referrerpolicy=no-referrer-when-downgrade

cumulus_test_service/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3
4// Cumulus is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Cumulus is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Crate used for testing with Cumulus.
18
19#![warn(missing_docs)]
20
21/// Utilities used for benchmarking
22pub mod bench_utils;
23
24pub mod chain_spec;
25
26use cumulus_client_collator::service::CollatorService;
27use cumulus_client_consensus_aura::{
28	collators::{
29		lookahead::{self as aura, Params as AuraParams},
30		slot_based::{
31			self as slot_based, Params as SlotBasedParams, SlotBasedBlockImport,
32			SlotBasedBlockImportHandle,
33		},
34	},
35	ImportQueueParams,
36};
37use prometheus::Registry;
38use runtime::AccountId;
39use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
40use sp_consensus_aura::sr25519::AuthorityPair;
41use std::{
42	collections::HashSet,
43	future::Future,
44	net::{Ipv4Addr, SocketAddr, SocketAddrV4},
45	time::Duration,
46};
47use url::Url;
48
49use crate::runtime::Weight;
50use cumulus_client_cli::{CollatorOptions, RelayChainMode};
51use cumulus_client_consensus_common::{
52	ParachainBlockImport as TParachainBlockImport, ParachainCandidate, ParachainConsensus,
53};
54use cumulus_client_pov_recovery::{RecoveryDelayRange, RecoveryHandle};
55#[allow(deprecated)]
56use cumulus_client_service::old_consensus;
57use cumulus_client_service::{
58	build_network, prepare_node_config, start_relay_chain_tasks, BuildNetworkParams,
59	CollatorSybilResistance, DARecoveryProfile, StartRelayChainTasksParams,
60};
61use cumulus_primitives_core::{relay_chain::ValidationCode, GetParachainInfo, ParaId};
62use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface;
63use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
64use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
65
66use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi};
67
68use frame_system_rpc_runtime_api::AccountNonceApi;
69use polkadot_node_subsystem::{errors::RecoveryError, messages::AvailabilityRecoveryMessage};
70use polkadot_overseer::Handle as OverseerHandle;
71use polkadot_primitives::{CandidateHash, CollatorPair, Hash as PHash, PersistedValidationData};
72use polkadot_service::ProvideRuntimeApi;
73use sc_consensus::ImportQueue;
74use sc_network::{
75	config::{FullNetworkConfiguration, TransportConfig},
76	multiaddr,
77	service::traits::NetworkService,
78	NetworkBackend, NetworkBlock, NetworkStateInfo,
79};
80use sc_service::{
81	config::{
82		BlocksPruning, DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
83		NetworkConfiguration, OffchainWorkerConfig, PruningMode, RpcBatchRequestConfig,
84		RpcConfiguration, RpcEndpoint, WasmExecutionMethod,
85	},
86	BasePath, ChainSpec as ChainSpecService, Configuration, Error as ServiceError,
87	PartialComponents, Role, RpcHandlers, TFullBackend, TFullClient, TaskManager,
88};
89use sp_arithmetic::traits::SaturatedConversion;
90use sp_blockchain::HeaderBackend;
91use sp_core::Pair;
92use sp_keyring::Sr25519Keyring;
93use sp_runtime::{codec::Encode, generic, MultiAddress};
94use sp_state_machine::BasicExternalities;
95use std::sync::Arc;
96use substrate_test_client::{
97	BlockchainEventsExt, RpcHandlersExt, RpcTransactionError, RpcTransactionOutput,
98};
99
100pub use chain_spec::*;
101pub use cumulus_test_runtime as runtime;
102pub use sp_keyring::Sr25519Keyring as Keyring;
103
/// Tracing target attached to the log lines this crate emits with an explicit `target:`.
const LOG_TARGET: &str = "cumulus-test-service";
105
106/// A consensus that will never produce any block.
107#[derive(Clone)]
108struct NullConsensus;
109
110#[async_trait::async_trait]
111impl ParachainConsensus<Block> for NullConsensus {
112	async fn produce_candidate(
113		&mut self,
114		_: &Header,
115		_: PHash,
116		_: &PersistedValidationData,
117	) -> Option<ParachainCandidate<Block>> {
118		None
119	}
120}
121
122/// The signature of the announce block fn.
123pub type AnnounceBlockFn = Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>;
124
125type HostFunctions =
126	(sp_io::SubstrateHostFunctions, cumulus_client_service::storage_proof_size::HostFunctions);
127/// The client type being used by the test service.
128pub type Client = TFullClient<runtime::NodeBlock, runtime::RuntimeApi, WasmExecutor<HostFunctions>>;
129
130/// The backend type being used by the test service.
131pub type Backend = TFullBackend<Block>;
132
133/// The block-import type being used by the test service.
134pub type ParachainBlockImport =
135	TParachainBlockImport<Block, SlotBasedBlockImport<Block, Arc<Client>, Client>, Backend>;
136
137/// Transaction pool type used by the test service
138pub type TransactionPool = Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>;
139
140/// Recovery handle that fails regularly to simulate unavailable povs.
141pub struct FailingRecoveryHandle {
142	overseer_handle: OverseerHandle,
143	counter: u32,
144	failed_hashes: HashSet<CandidateHash>,
145}
146
147impl FailingRecoveryHandle {
148	/// Create a new FailingRecoveryHandle
149	pub fn new(overseer_handle: OverseerHandle) -> Self {
150		Self { overseer_handle, counter: 0, failed_hashes: Default::default() }
151	}
152}
153
154#[async_trait::async_trait]
155impl RecoveryHandle for FailingRecoveryHandle {
156	async fn send_recovery_msg(
157		&mut self,
158		message: AvailabilityRecoveryMessage,
159		origin: &'static str,
160	) {
161		let AvailabilityRecoveryMessage::RecoverAvailableData(ref receipt, _, _, _, _) = message;
162		let candidate_hash = receipt.hash();
163
164		// For every 3rd block we immediately signal unavailability to trigger
165		// a retry. The same candidate is never failed multiple times to ensure progress.
166		if self.counter % 3 == 0 && self.failed_hashes.insert(candidate_hash) {
167			tracing::info!(target: LOG_TARGET, ?candidate_hash, "Failing pov recovery.");
168
169			let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, _, back_sender) =
170				message;
171			back_sender
172				.send(Err(RecoveryError::Unavailable))
173				.expect("Return channel should work here.");
174		} else {
175			self.overseer_handle.send_msg(message, origin).await;
176		}
177		self.counter += 1;
178	}
179}
180
181/// Assembly of PartialComponents (enough to run chain ops subcommands)
182pub type Service = PartialComponents<
183	Client,
184	Backend,
185	(),
186	sc_consensus::import_queue::BasicQueue<Block>,
187	sc_transaction_pool::TransactionPoolHandle<Block, Client>,
188	(ParachainBlockImport, SlotBasedBlockImportHandle<Block>),
189>;
190
191/// Starts a `ServiceBuilder` for a full service.
192///
193/// Use this macro if you don't actually need the full service, but just the builder in order to
194/// be able to perform chain operations.
195pub fn new_partial(
196	config: &mut Configuration,
197	enable_import_proof_record: bool,
198) -> Result<Service, sc_service::Error> {
199	let heap_pages = config
200		.executor
201		.default_heap_pages
202		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static { extra_pages: h as _ });
203
204	let executor = WasmExecutor::builder()
205		.with_execution_method(config.executor.wasm_method)
206		.with_onchain_heap_alloc_strategy(heap_pages)
207		.with_offchain_heap_alloc_strategy(heap_pages)
208		.with_max_runtime_instances(config.executor.max_runtime_instances)
209		.with_runtime_cache_size(config.executor.runtime_cache_size)
210		.build();
211
212	let (client, backend, keystore_container, task_manager) =
213		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
214			config,
215			None,
216			executor,
217			enable_import_proof_record,
218		)?;
219	let client = Arc::new(client);
220
221	let (block_import, slot_based_handle) =
222		SlotBasedBlockImport::new(client.clone(), client.clone());
223	let block_import = ParachainBlockImport::new(block_import, backend.clone());
224
225	let transaction_pool = Arc::from(
226		sc_transaction_pool::Builder::new(
227			task_manager.spawn_essential_handle(),
228			client.clone(),
229			config.role.is_authority().into(),
230		)
231		.with_options(config.transaction_pool.clone())
232		.with_prometheus(config.prometheus_registry())
233		.build(),
234	);
235
236	let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
237	let import_queue = cumulus_client_consensus_aura::import_queue::<AuthorityPair, _, _, _, _, _>(
238		ImportQueueParams {
239			block_import: block_import.clone(),
240			client: client.clone(),
241			create_inherent_data_providers: move |_, ()| async move {
242				let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
243
244				let slot =
245					sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
246						*timestamp,
247						slot_duration,
248					);
249
250				Ok((slot, timestamp))
251			},
252			spawner: &task_manager.spawn_essential_handle(),
253			registry: None,
254			telemetry: None,
255		},
256	)?;
257
258	let params = PartialComponents {
259		backend,
260		client,
261		import_queue,
262		keystore_container,
263		task_manager,
264		transaction_pool,
265		select_chain: (),
266		other: (block_import, slot_based_handle),
267	};
268
269	Ok(params)
270}
271
272async fn build_relay_chain_interface(
273	relay_chain_config: Configuration,
274	parachain_prometheus_registry: Option<&Registry>,
275	collator_key: Option<CollatorPair>,
276	collator_options: CollatorOptions,
277	task_manager: &mut TaskManager,
278) -> RelayChainResult<Arc<dyn RelayChainInterface + 'static>> {
279	let relay_chain_node = match collator_options.relay_chain_mode {
280		cumulus_client_cli::RelayChainMode::Embedded => polkadot_test_service::new_full(
281			relay_chain_config,
282			if let Some(ref key) = collator_key {
283				polkadot_service::IsParachainNode::Collator(key.clone())
284			} else {
285				polkadot_service::IsParachainNode::Collator(CollatorPair::generate().0)
286			},
287			None,
288			polkadot_service::CollatorOverseerGen,
289			Some("Relaychain"),
290		)
291		.map_err(|e| RelayChainError::Application(Box::new(e) as Box<_>))?,
292		cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
293			return build_minimal_relay_chain_node_with_rpc(
294				relay_chain_config,
295				parachain_prometheus_registry,
296				task_manager,
297				rpc_target_urls,
298			)
299			.await
300			.map(|r| r.0),
301	};
302
303	task_manager.add_child(relay_chain_node.task_manager);
304	tracing::info!("Using inprocess node.");
305	Ok(Arc::new(RelayChainInProcessInterface::new(
306		relay_chain_node.client.clone(),
307		relay_chain_node.backend.clone(),
308		relay_chain_node.sync_service.clone(),
309		relay_chain_node.overseer_handle.ok_or(RelayChainError::GenericError(
310			"Overseer should be running in full node.".to_string(),
311		))?,
312	)))
313}
314
315/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
316///
317/// This is the actual implementation that is abstract over the executor and the runtime api.
318#[sc_tracing::logging::prefix_logs_with("Parachain")]
319pub async fn start_node_impl<RB, Net: NetworkBackend<Block, Hash>>(
320	parachain_config: Configuration,
321	collator_key: Option<CollatorPair>,
322	relay_chain_config: Configuration,
323	wrap_announce_block: Option<Box<dyn FnOnce(AnnounceBlockFn) -> AnnounceBlockFn>>,
324	fail_pov_recovery: bool,
325	rpc_ext_builder: RB,
326	consensus: Consensus,
327	collator_options: CollatorOptions,
328	proof_recording_during_import: bool,
329	use_slot_based_collator: bool,
330) -> sc_service::error::Result<(
331	TaskManager,
332	Arc<Client>,
333	Arc<dyn NetworkService>,
334	RpcHandlers,
335	TransactionPool,
336	Arc<Backend>,
337)>
338where
339	RB: Fn(Arc<Client>) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error> + Send + 'static,
340{
341	let mut parachain_config = prepare_node_config(parachain_config);
342
343	let params = new_partial(&mut parachain_config, proof_recording_during_import)?;
344
345	let transaction_pool = params.transaction_pool.clone();
346	let mut task_manager = params.task_manager;
347
348	let client = params.client.clone();
349	let backend = params.backend.clone();
350
351	let block_import = params.other.0;
352	let slot_based_handle = params.other.1;
353	let relay_chain_interface = build_relay_chain_interface(
354		relay_chain_config,
355		parachain_config.prometheus_registry(),
356		collator_key.clone(),
357		collator_options.clone(),
358		&mut task_manager,
359	)
360	.await
361	.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
362
363	let import_queue_service = params.import_queue.service();
364	let prometheus_registry = parachain_config.prometheus_registry().cloned();
365	let net_config = FullNetworkConfiguration::<Block, Hash, Net>::new(
366		&parachain_config.network,
367		prometheus_registry.clone(),
368	);
369
370	let best_hash = client.chain_info().best_hash;
371	let para_id = client
372		.runtime_api()
373		.parachain_id(best_hash)
374		.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
375	tracing::info!("Parachain id: {:?}", para_id);
376
377	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
378		build_network(BuildNetworkParams {
379			parachain_config: &parachain_config,
380			net_config,
381			client: client.clone(),
382			transaction_pool: transaction_pool.clone(),
383			para_id,
384			spawn_handle: task_manager.spawn_handle(),
385			relay_chain_interface: relay_chain_interface.clone(),
386			import_queue: params.import_queue,
387			metrics: Net::register_notification_metrics(
388				parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
389			),
390			sybil_resistance_level: CollatorSybilResistance::Resistant, /* Either Aura that is
391			                                                             * resistant or null that
392			                                                             * is not producing any
393			                                                             * blocks at all. */
394		})
395		.await?;
396
397	let keystore = params.keystore_container.keystore();
398	let rpc_builder = {
399		let client = client.clone();
400		Box::new(move |_| rpc_ext_builder(client.clone()))
401	};
402
403	let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
404		rpc_builder,
405		client: client.clone(),
406		transaction_pool: transaction_pool.clone(),
407		task_manager: &mut task_manager,
408		config: parachain_config,
409		keystore: keystore.clone(),
410		backend: backend.clone(),
411		network: network.clone(),
412		sync_service: sync_service.clone(),
413		system_rpc_tx,
414		tx_handler_controller,
415		telemetry: None,
416	})?;
417
418	let announce_block = {
419		let sync_service = sync_service.clone();
420		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
421	};
422
423	let announce_block = wrap_announce_block
424		.map(|w| (w)(announce_block.clone()))
425		.unwrap_or_else(|| announce_block);
426
427	let overseer_handle = relay_chain_interface
428		.overseer_handle()
429		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
430
431	let recovery_handle: Box<dyn RecoveryHandle> = if fail_pov_recovery {
432		Box::new(FailingRecoveryHandle::new(overseer_handle.clone()))
433	} else {
434		Box::new(overseer_handle.clone())
435	};
436	let relay_chain_slot_duration = Duration::from_secs(6);
437
438	start_relay_chain_tasks(StartRelayChainTasksParams {
439		client: client.clone(),
440		announce_block: announce_block.clone(),
441		para_id,
442		relay_chain_interface: relay_chain_interface.clone(),
443		task_manager: &mut task_manager,
444		// Increase speed of recovery for testing purposes.
445		da_recovery_profile: DARecoveryProfile::Other(RecoveryDelayRange {
446			min: Duration::from_secs(1),
447			max: Duration::from_secs(5),
448		}),
449		import_queue: import_queue_service,
450		relay_chain_slot_duration,
451		recovery_handle,
452		sync_service: sync_service.clone(),
453		prometheus_registry: None,
454	})?;
455
456	if let Some(collator_key) = collator_key {
457		if let Consensus::Null = consensus {
458			#[allow(deprecated)]
459			old_consensus::start_collator(old_consensus::StartCollatorParams {
460				block_status: client.clone(),
461				announce_block,
462				runtime_api: client.clone(),
463				spawner: task_manager.spawn_handle(),
464				para_id,
465				parachain_consensus: Box::new(NullConsensus) as Box<_>,
466				key: collator_key,
467				overseer_handle,
468			})
469			.await;
470		} else {
471			let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
472				task_manager.spawn_handle(),
473				client.clone(),
474				transaction_pool.clone(),
475				prometheus_registry.as_ref(),
476				None,
477			);
478
479			let collator_service = CollatorService::new(
480				client.clone(),
481				Arc::new(task_manager.spawn_handle()),
482				announce_block,
483				client.clone(),
484			);
485
486			let client_for_aura = client.clone();
487
488			if use_slot_based_collator {
489				tracing::info!(target: LOG_TARGET, "Starting block authoring with slot based authoring.");
490				let params = SlotBasedParams {
491					create_inherent_data_providers: move |_, ()| async move { Ok(()) },
492					block_import,
493					para_client: client.clone(),
494					para_backend: backend.clone(),
495					relay_client: relay_chain_interface,
496					code_hash_provider: move |block_hash| {
497						client_for_aura
498							.code_at(block_hash)
499							.ok()
500							.map(|c| ValidationCode::from(c).hash())
501					},
502					keystore,
503					collator_key,
504					relay_chain_slot_duration,
505					para_id,
506					proposer,
507					collator_service,
508					authoring_duration: Duration::from_millis(2000),
509					reinitialize: false,
510					slot_offset: Duration::from_secs(1),
511					block_import_handle: slot_based_handle,
512					spawner: task_manager.spawn_handle(),
513					export_pov: None,
514					max_pov_percentage: None,
515				};
516
517				slot_based::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _, _>(params);
518			} else {
519				tracing::info!(target: LOG_TARGET, "Starting block authoring with lookahead collator.");
520				let params = AuraParams {
521					create_inherent_data_providers: move |_, ()| async move { Ok(()) },
522					block_import,
523					para_client: client.clone(),
524					para_backend: backend.clone(),
525					relay_client: relay_chain_interface,
526					code_hash_provider: move |block_hash| {
527						client_for_aura
528							.code_at(block_hash)
529							.ok()
530							.map(|c| ValidationCode::from(c).hash())
531					},
532					keystore,
533					collator_key,
534					para_id,
535					overseer_handle,
536					relay_chain_slot_duration,
537					proposer,
538					collator_service,
539					authoring_duration: Duration::from_millis(2000),
540					reinitialize: false,
541					max_pov_percentage: None,
542				};
543
544				let fut = aura::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _>(params);
545				task_manager.spawn_essential_handle().spawn("aura", None, fut);
546			}
547		}
548	}
549
550	Ok((task_manager, client, network, rpc_handlers, transaction_pool, backend))
551}
552
553/// A Cumulus test node instance used for testing.
554pub struct TestNode {
555	/// TaskManager's instance.
556	pub task_manager: TaskManager,
557	/// Client's instance.
558	pub client: Arc<Client>,
559	/// Node's network.
560	pub network: Arc<dyn NetworkService>,
561	/// The `MultiaddrWithPeerId` to this node. This is useful if you want to pass it as "boot
562	/// node" to other nodes.
563	pub addr: MultiaddrWithPeerId,
564	/// RPCHandlers to make RPC queries.
565	pub rpc_handlers: RpcHandlers,
566	/// Node's transaction pool
567	pub transaction_pool: TransactionPool,
568	/// Node's backend
569	pub backend: Arc<Backend>,
570}
571
/// The consensus a collating test node uses to author blocks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Consensus {
	/// Use Aura consensus.
	Aura,
	/// Use the null consensus that will never produce any block.
	Null,
}
579
580/// A builder to create a [`TestNode`].
581pub struct TestNodeBuilder {
582	para_id: ParaId,
583	tokio_handle: tokio::runtime::Handle,
584	key: Sr25519Keyring,
585	collator_key: Option<CollatorPair>,
586	parachain_nodes: Vec<MultiaddrWithPeerId>,
587	parachain_nodes_exclusive: bool,
588	relay_chain_nodes: Vec<MultiaddrWithPeerId>,
589	wrap_announce_block: Option<Box<dyn FnOnce(AnnounceBlockFn) -> AnnounceBlockFn>>,
590	storage_update_func_parachain: Option<Box<dyn Fn()>>,
591	storage_update_func_relay_chain: Option<Box<dyn Fn()>>,
592	consensus: Consensus,
593	relay_chain_mode: RelayChainMode,
594	endowed_accounts: Vec<AccountId>,
595	record_proof_during_import: bool,
596}
597
598impl TestNodeBuilder {
599	/// Create a new instance of `Self`.
600	///
601	/// `para_id` - The parachain id this node is running for.
602	/// `tokio_handle` - The tokio handler to use.
603	/// `key` - The key that will be used to generate the name and that will be passed as
604	/// `dev_seed`.
605	pub fn new(para_id: ParaId, tokio_handle: tokio::runtime::Handle, key: Sr25519Keyring) -> Self {
606		TestNodeBuilder {
607			key,
608			para_id,
609			tokio_handle,
610			collator_key: None,
611			parachain_nodes: Vec::new(),
612			parachain_nodes_exclusive: false,
613			relay_chain_nodes: Vec::new(),
614			wrap_announce_block: None,
615			storage_update_func_parachain: None,
616			storage_update_func_relay_chain: None,
617			consensus: Consensus::Aura,
618			endowed_accounts: Default::default(),
619			relay_chain_mode: RelayChainMode::Embedded,
620			record_proof_during_import: true,
621		}
622	}
623
624	/// Enable collator for this node.
625	pub fn enable_collator(mut self) -> Self {
626		let collator_key = CollatorPair::generate().0;
627		self.collator_key = Some(collator_key);
628		self
629	}
630
631	/// Instruct the node to exclusively connect to registered parachain nodes.
632	///
633	/// Parachain nodes can be registered using [`Self::connect_to_parachain_node`] and
634	/// [`Self::connect_to_parachain_nodes`].
635	pub fn exclusively_connect_to_registered_parachain_nodes(mut self) -> Self {
636		self.parachain_nodes_exclusive = true;
637		self
638	}
639
640	/// Make the node connect to the given parachain node.
641	///
642	/// By default the node will not be connected to any node or will be able to discover any other
643	/// node.
644	pub fn connect_to_parachain_node(mut self, node: &TestNode) -> Self {
645		self.parachain_nodes.push(node.addr.clone());
646		self
647	}
648
649	/// Make the node connect to the given parachain nodes.
650	///
651	/// By default the node will not be connected to any node or will be able to discover any other
652	/// node.
653	pub fn connect_to_parachain_nodes<'a>(
654		mut self,
655		nodes: impl IntoIterator<Item = &'a TestNode>,
656	) -> Self {
657		self.parachain_nodes.extend(nodes.into_iter().map(|n| n.addr.clone()));
658		self
659	}
660
661	/// Make the node connect to the given relay chain node.
662	///
663	/// By default the node will not be connected to any node or will be able to discover any other
664	/// node.
665	pub fn connect_to_relay_chain_node(
666		mut self,
667		node: &polkadot_test_service::PolkadotTestNode,
668	) -> Self {
669		self.relay_chain_nodes.push(node.addr.clone());
670		self
671	}
672
673	/// Make the node connect to the given relay chain nodes.
674	///
675	/// By default the node will not be connected to any node or will be able to discover any other
676	/// node.
677	pub fn connect_to_relay_chain_nodes<'a>(
678		mut self,
679		nodes: impl IntoIterator<Item = &'a polkadot_test_service::PolkadotTestNode>,
680	) -> Self {
681		self.relay_chain_nodes.extend(nodes.into_iter().map(|n| n.addr.clone()));
682		self
683	}
684
685	/// Wrap the announce block function of this node.
686	pub fn wrap_announce_block(
687		mut self,
688		wrap: impl FnOnce(AnnounceBlockFn) -> AnnounceBlockFn + 'static,
689	) -> Self {
690		self.wrap_announce_block = Some(Box::new(wrap));
691		self
692	}
693
694	/// Allows accessing the parachain storage before the test node is built.
695	pub fn update_storage_parachain(mut self, updater: impl Fn() + 'static) -> Self {
696		self.storage_update_func_parachain = Some(Box::new(updater));
697		self
698	}
699
700	/// Allows accessing the relay chain storage before the test node is built.
701	pub fn update_storage_relay_chain(mut self, updater: impl Fn() + 'static) -> Self {
702		self.storage_update_func_relay_chain = Some(Box::new(updater));
703		self
704	}
705
706	/// Use the null consensus that will never author any block.
707	pub fn use_null_consensus(mut self) -> Self {
708		self.consensus = Consensus::Null;
709		self
710	}
711
712	/// Connect to full node via RPC.
713	pub fn use_external_relay_chain_node_at_url(mut self, network_address: Url) -> Self {
714		self.relay_chain_mode = RelayChainMode::ExternalRpc(vec![network_address]);
715		self
716	}
717
718	/// Connect to full node via RPC.
719	pub fn use_external_relay_chain_node_at_port(mut self, port: u16) -> Self {
720		let mut localhost_url =
721			Url::parse("ws://localhost").expect("Should be able to parse localhost Url");
722		localhost_url.set_port(Some(port)).expect("Should be able to set port");
723		self.relay_chain_mode = RelayChainMode::ExternalRpc(vec![localhost_url]);
724		self
725	}
726
727	/// Accounts which will have an initial balance.
728	pub fn endowed_accounts(mut self, accounts: Vec<AccountId>) -> TestNodeBuilder {
729		self.endowed_accounts = accounts;
730		self
731	}
732
733	/// Record proofs during import.
734	pub fn import_proof_recording(mut self, should_record_proof: bool) -> TestNodeBuilder {
735		self.record_proof_during_import = should_record_proof;
736		self
737	}
738
739	/// Build the [`TestNode`].
740	pub async fn build(self) -> TestNode {
741		let parachain_config = node_config(
742			self.storage_update_func_parachain.unwrap_or_else(|| Box::new(|| ())),
743			self.tokio_handle.clone(),
744			self.key,
745			self.parachain_nodes,
746			self.parachain_nodes_exclusive,
747			self.para_id,
748			self.collator_key.is_some(),
749			self.endowed_accounts,
750		)
751		.expect("could not generate Configuration");
752
753		let mut relay_chain_config = polkadot_test_service::node_config(
754			self.storage_update_func_relay_chain.unwrap_or_else(|| Box::new(|| ())),
755			self.tokio_handle,
756			self.key,
757			self.relay_chain_nodes,
758			false,
759		);
760
761		let collator_options = CollatorOptions {
762			relay_chain_mode: self.relay_chain_mode,
763			embedded_dht_bootnode: true,
764			dht_bootnode_discovery: true,
765		};
766
767		relay_chain_config.network.node_name =
768			format!("{} (relay chain)", relay_chain_config.network.node_name);
769
770		let (task_manager, client, network, rpc_handlers, transaction_pool, backend) =
771			match relay_chain_config.network.network_backend {
772				sc_network::config::NetworkBackendType::Libp2p =>
773					start_node_impl::<_, sc_network::NetworkWorker<_, _>>(
774						parachain_config,
775						self.collator_key,
776						relay_chain_config,
777						self.wrap_announce_block,
778						false,
779						|_| Ok(jsonrpsee::RpcModule::new(())),
780						self.consensus,
781						collator_options,
782						self.record_proof_during_import,
783						false,
784					)
785					.await
786					.expect("could not create Cumulus test service"),
787				sc_network::config::NetworkBackendType::Litep2p =>
788					start_node_impl::<_, sc_network::Litep2pNetworkBackend>(
789						parachain_config,
790						self.collator_key,
791						relay_chain_config,
792						self.wrap_announce_block,
793						false,
794						|_| Ok(jsonrpsee::RpcModule::new(())),
795						self.consensus,
796						collator_options,
797						self.record_proof_during_import,
798						false,
799					)
800					.await
801					.expect("could not create Cumulus test service"),
802			};
803		let peer_id = network.local_peer_id();
804		let multiaddr = polkadot_test_service::get_listen_address(network.clone()).await;
805		let addr = MultiaddrWithPeerId { multiaddr, peer_id };
806
807		TestNode { task_manager, client, network, addr, rpc_handlers, transaction_pool, backend }
808	}
809}
810
811/// Create a Cumulus `Configuration`.
812///
813/// By default a TCP socket will be used, therefore you need to provide nodes if you want the
814/// node to be connected to other nodes.
815///
816/// If `nodes_exclusive` is `true`, the node will only connect to the given `nodes` and not to any
817/// other node.
818///
819/// The `storage_update_func` can be used to make adjustments to the runtime genesis.
820pub fn node_config(
821	storage_update_func: impl Fn(),
822	tokio_handle: tokio::runtime::Handle,
823	key: Sr25519Keyring,
824	nodes: Vec<MultiaddrWithPeerId>,
825	nodes_exclusive: bool,
826	para_id: ParaId,
827	is_collator: bool,
828	endowed_accounts: Vec<AccountId>,
829) -> Result<Configuration, ServiceError> {
830	let base_path = BasePath::new_temp_dir()?;
831	let root = base_path.path().join(format!("cumulus_test_service_{}", key));
832	let role = if is_collator { Role::Authority } else { Role::Full };
833	let key_seed = key.to_seed();
834	let mut spec = Box::new(chain_spec::get_chain_spec_with_extra_endowed(
835		Some(para_id),
836		endowed_accounts,
837		cumulus_test_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"),
838	));
839
840	let mut storage = spec.as_storage_builder().build_storage().expect("could not build storage");
841
842	BasicExternalities::execute_with_storage(&mut storage, storage_update_func);
843	spec.set_storage(storage);
844
845	let mut network_config = NetworkConfiguration::new(
846		format!("{} (parachain)", key_seed),
847		"network/test/0.1",
848		Default::default(),
849		None,
850	);
851
852	if nodes_exclusive {
853		network_config.default_peers_set.reserved_nodes = nodes;
854		network_config.default_peers_set.non_reserved_mode =
855			sc_network::config::NonReservedPeerMode::Deny;
856	} else {
857		network_config.boot_nodes = nodes;
858	}
859
860	network_config.allow_non_globals_in_dht = true;
861
862	let addr: multiaddr::Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().expect("valid address; qed");
863	network_config.listen_addresses.push(addr.clone());
864	network_config.transport =
865		TransportConfig::Normal { enable_mdns: false, allow_private_ip: true };
866
867	Ok(Configuration {
868		impl_name: "cumulus-test-node".to_string(),
869		impl_version: "0.1".to_string(),
870		role,
871		tokio_handle,
872		transaction_pool: Default::default(),
873		network: network_config,
874		keystore: KeystoreConfig::InMemory,
875		database: DatabaseSource::RocksDb { path: root.join("db"), cache_size: 128 },
876		trie_cache_maximum_size: Some(64 * 1024 * 1024),
877		warm_up_trie_cache: None,
878		state_pruning: Some(PruningMode::ArchiveAll),
879		blocks_pruning: BlocksPruning::KeepAll,
880		chain_spec: spec,
881		executor: ExecutorConfiguration {
882			wasm_method: WasmExecutionMethod::Compiled {
883				instantiation_strategy:
884					sc_executor_wasmtime::InstantiationStrategy::PoolingCopyOnWrite,
885			},
886			..ExecutorConfiguration::default()
887		},
888		rpc: RpcConfiguration {
889			addr: None,
890			max_connections: Default::default(),
891			cors: None,
892			methods: Default::default(),
893			max_request_size: Default::default(),
894			max_response_size: Default::default(),
895			id_provider: None,
896			max_subs_per_conn: Default::default(),
897			port: 9945,
898			message_buffer_capacity: Default::default(),
899			batch_config: RpcBatchRequestConfig::Unlimited,
900			rate_limit: None,
901			rate_limit_whitelisted_ips: Default::default(),
902			rate_limit_trust_proxy_headers: Default::default(),
903		},
904		prometheus_config: None,
905		telemetry_endpoints: None,
906		offchain_worker: OffchainWorkerConfig { enabled: true, indexing_enabled: false },
907		force_authoring: false,
908		disable_grandpa: false,
909		dev_key_seed: Some(key_seed),
910		tracing_targets: None,
911		tracing_receiver: Default::default(),
912		announce_block: true,
913		data_path: root,
914		base_path,
915		wasm_runtime_overrides: None,
916	})
917}
918
919impl TestNode {
920	/// Wait for `count` blocks to be imported in the node and then exit. This function will not
921	/// return if no blocks are ever created, thus you should restrict the maximum amount of time of
922	/// the test execution.
923	pub fn wait_for_blocks(&self, count: usize) -> impl Future<Output = ()> {
924		self.client.wait_for_blocks(count)
925	}
926
927	/// Send an extrinsic to this node.
928	pub async fn send_extrinsic(
929		&self,
930		function: impl Into<runtime::RuntimeCall>,
931		caller: Sr25519Keyring,
932	) -> Result<RpcTransactionOutput, RpcTransactionError> {
933		let extrinsic = construct_extrinsic(&self.client, function, caller.pair(), Some(0));
934
935		self.rpc_handlers.send_transaction(extrinsic.into()).await
936	}
937
938	/// Register a parachain at this relay chain.
939	pub async fn schedule_upgrade(&self, validation: Vec<u8>) -> Result<(), RpcTransactionError> {
940		let call = frame_system::Call::set_code { code: validation };
941
942		self.send_extrinsic(
943			runtime::SudoCall::sudo_unchecked_weight {
944				call: Box::new(call.into()),
945				weight: Weight::from_parts(1_000, 0),
946			},
947			Sr25519Keyring::Alice,
948		)
949		.await
950		.map(drop)
951	}
952}
953
954/// Fetch account nonce for key pair
955pub fn fetch_nonce(client: &Client, account: sp_core::sr25519::Public) -> u32 {
956	let best_hash = client.chain_info().best_hash;
957	client
958		.runtime_api()
959		.account_nonce(best_hash, account.into())
960		.expect("Fetching account nonce works; qed")
961}
962
963/// Construct an extrinsic that can be applied to the test runtime.
964pub fn construct_extrinsic(
965	client: &Client,
966	function: impl Into<runtime::RuntimeCall>,
967	caller: sp_core::sr25519::Pair,
968	nonce: Option<u32>,
969) -> runtime::UncheckedExtrinsic {
970	let function = function.into();
971	let current_block_hash = client.info().best_hash;
972	let current_block = client.info().best_number.saturated_into();
973	let genesis_block = client.hash(0).unwrap().unwrap();
974	let nonce = nonce.unwrap_or_else(|| fetch_nonce(client, caller.public()));
975	let period = runtime::BlockHashCount::get()
976		.checked_next_power_of_two()
977		.map(|c| c / 2)
978		.unwrap_or(2) as u64;
979	let tip = 0;
980	let tx_ext: runtime::TxExtension = (
981		frame_system::AuthorizeCall::<runtime::Runtime>::new(),
982		frame_system::CheckNonZeroSender::<runtime::Runtime>::new(),
983		frame_system::CheckSpecVersion::<runtime::Runtime>::new(),
984		frame_system::CheckGenesis::<runtime::Runtime>::new(),
985		frame_system::CheckEra::<runtime::Runtime>::from(generic::Era::mortal(
986			period,
987			current_block,
988		)),
989		frame_system::CheckNonce::<runtime::Runtime>::from(nonce),
990		frame_system::CheckWeight::<runtime::Runtime>::new(),
991		pallet_transaction_payment::ChargeTransactionPayment::<runtime::Runtime>::from(tip),
992	)
993		.into();
994	let raw_payload = runtime::SignedPayload::from_raw(
995		function.clone(),
996		tx_ext.clone(),
997		((), (), runtime::VERSION.spec_version, genesis_block, current_block_hash, (), (), ()),
998	);
999	let signature = raw_payload.using_encoded(|e| caller.sign(e));
1000	runtime::UncheckedExtrinsic::new_signed(
1001		function,
1002		MultiAddress::Id(caller.public().into()),
1003		runtime::Signature::Sr25519(signature),
1004		tx_ext,
1005	)
1006}
1007
1008/// Run a relay-chain validator node.
1009///
1010/// This is essentially a wrapper around
1011/// [`run_validator_node`](polkadot_test_service::run_validator_node).
1012pub fn run_relay_chain_validator_node(
1013	tokio_handle: tokio::runtime::Handle,
1014	key: Sr25519Keyring,
1015	storage_update_func: impl Fn(),
1016	boot_nodes: Vec<MultiaddrWithPeerId>,
1017	port: Option<u16>,
1018) -> polkadot_test_service::PolkadotTestNode {
1019	let mut config = polkadot_test_service::node_config(
1020		storage_update_func,
1021		tokio_handle.clone(),
1022		key,
1023		boot_nodes,
1024		true,
1025	);
1026
1027	if let Some(port) = port {
1028		config.rpc.addr = Some(vec![RpcEndpoint {
1029			batch_config: config.rpc.batch_config,
1030			cors: config.rpc.cors.clone(),
1031			listen_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)),
1032			max_connections: config.rpc.max_connections,
1033			max_payload_in_mb: config.rpc.max_request_size,
1034			max_payload_out_mb: config.rpc.max_response_size,
1035			max_subscriptions_per_connection: config.rpc.max_subs_per_conn,
1036			max_buffer_capacity_per_connection: config.rpc.message_buffer_capacity,
1037			rpc_methods: config.rpc.methods,
1038			rate_limit: config.rpc.rate_limit,
1039			rate_limit_trust_proxy_headers: config.rpc.rate_limit_trust_proxy_headers,
1040			rate_limit_whitelisted_ips: config.rpc.rate_limit_whitelisted_ips.clone(),
1041			retry_random_port: true,
1042			is_optional: false,
1043		}]);
1044	}
1045
1046	let mut workers_path = std::env::current_exe().unwrap();
1047	workers_path.pop();
1048	workers_path.pop();
1049
1050	tokio_handle.block_on(async move {
1051		polkadot_test_service::run_validator_node(config, Some(workers_path)).await
1052	})
1053}