referrerpolicy=no-referrer-when-downgrade

cumulus_test_service/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3
4// Cumulus is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Cumulus is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Crate used for testing with Cumulus.
18
19#![warn(missing_docs)]
20
21/// Utilities used for benchmarking
22pub mod bench_utils;
23
24pub mod chain_spec;
25
26use cumulus_client_collator::service::CollatorService;
27use cumulus_client_consensus_aura::{
28	collators::{
29		lookahead::{self as aura, Params as AuraParams},
30		slot_based::{
31			self as slot_based, Params as SlotBasedParams, SlotBasedBlockImport,
32			SlotBasedBlockImportHandle,
33		},
34	},
35	ImportQueueParams,
36};
37use prometheus::Registry;
38use runtime::AccountId;
39use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
40use sp_consensus_aura::sr25519::AuthorityPair;
41use std::{
42	collections::HashSet,
43	future::Future,
44	net::{Ipv4Addr, SocketAddr, SocketAddrV4},
45	time::Duration,
46};
47use url::Url;
48
49use crate::runtime::Weight;
50use cumulus_client_cli::{CollatorOptions, RelayChainMode};
51use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
52use cumulus_client_pov_recovery::{RecoveryDelayRange, RecoveryHandle};
53use cumulus_client_service::{
54	build_network, prepare_node_config, start_relay_chain_tasks, BuildNetworkParams,
55	CollatorSybilResistance, DARecoveryProfile, ParachainTracingExecuteBlock,
56	StartRelayChainTasksParams,
57};
58use cumulus_primitives_core::{relay_chain::ValidationCode, GetParachainInfo, ParaId};
59use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface;
60use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
61use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
62
63use cumulus_test_runtime::{Hash, NodeBlock as Block, RuntimeApi};
64
65use frame_system_rpc_runtime_api::AccountNonceApi;
66use polkadot_node_subsystem::{errors::RecoveryError, messages::AvailabilityRecoveryMessage};
67use polkadot_overseer::Handle as OverseerHandle;
68use polkadot_primitives::{CandidateHash, CollatorPair};
69use polkadot_service::ProvideRuntimeApi;
70use sc_consensus::ImportQueue;
71use sc_network::{
72	config::{FullNetworkConfiguration, TransportConfig},
73	multiaddr,
74	service::traits::NetworkService,
75	NetworkBackend, NetworkBlock, NetworkStateInfo,
76};
77use sc_service::{
78	config::{
79		BlocksPruning, DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
80		NetworkConfiguration, OffchainWorkerConfig, PruningMode, RpcBatchRequestConfig,
81		RpcConfiguration, RpcEndpoint, WasmExecutionMethod,
82	},
83	BasePath, ChainSpec as ChainSpecService, Configuration, Error as ServiceError,
84	PartialComponents, Role, RpcHandlers, TFullBackend, TFullClient, TaskManager,
85};
86use sp_arithmetic::traits::SaturatedConversion;
87use sp_blockchain::HeaderBackend;
88use sp_core::Pair;
89use sp_keyring::Sr25519Keyring;
90use sp_runtime::{codec::Encode, generic, MultiAddress};
91use sp_state_machine::BasicExternalities;
92use std::sync::Arc;
93use substrate_test_client::{
94	BlockchainEventsExt, RpcHandlersExt, RpcTransactionError, RpcTransactionOutput,
95};
96
97pub use chain_spec::*;
98pub use cumulus_test_runtime as runtime;
99pub use sp_keyring::Sr25519Keyring as Keyring;
100
101const LOG_TARGET: &str = "cumulus-test-service";
102
103/// The signature of the announce block fn.
104pub type AnnounceBlockFn = Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>;
105
106type HostFunctions =
107	(sp_io::SubstrateHostFunctions, cumulus_client_service::storage_proof_size::HostFunctions);
108/// The client type being used by the test service.
109pub type Client = TFullClient<runtime::NodeBlock, runtime::RuntimeApi, WasmExecutor<HostFunctions>>;
110
111/// The backend type being used by the test service.
112pub type Backend = TFullBackend<Block>;
113
114/// The block-import type being used by the test service.
115pub type ParachainBlockImport =
116	TParachainBlockImport<Block, SlotBasedBlockImport<Block, Arc<Client>, Client>, Backend>;
117
118/// Transaction pool type used by the test service
119pub type TransactionPool = Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>;
120
121/// Recovery handle that fails regularly to simulate unavailable povs.
122pub struct FailingRecoveryHandle {
123	overseer_handle: OverseerHandle,
124	counter: u32,
125	failed_hashes: HashSet<CandidateHash>,
126}
127
128impl FailingRecoveryHandle {
129	/// Create a new FailingRecoveryHandle
130	pub fn new(overseer_handle: OverseerHandle) -> Self {
131		Self { overseer_handle, counter: 0, failed_hashes: Default::default() }
132	}
133}
134
135#[async_trait::async_trait]
136impl RecoveryHandle for FailingRecoveryHandle {
137	async fn send_recovery_msg(
138		&mut self,
139		message: AvailabilityRecoveryMessage,
140		origin: &'static str,
141	) {
142		let AvailabilityRecoveryMessage::RecoverAvailableData(ref receipt, _, _, _, _) = message;
143		let candidate_hash = receipt.hash();
144
145		// For every 3rd block we immediately signal unavailability to trigger
146		// a retry. The same candidate is never failed multiple times to ensure progress.
147		if self.counter.is_multiple_of(3) && self.failed_hashes.insert(candidate_hash) {
148			tracing::info!(target: LOG_TARGET, ?candidate_hash, "Failing pov recovery.");
149
150			let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, _, back_sender) =
151				message;
152			back_sender
153				.send(Err(RecoveryError::Unavailable))
154				.expect("Return channel should work here.");
155		} else {
156			self.overseer_handle.send_msg(message, origin).await;
157		}
158		self.counter += 1;
159	}
160}
161
162/// Assembly of PartialComponents (enough to run chain ops subcommands)
163pub type Service = PartialComponents<
164	Client,
165	Backend,
166	(),
167	sc_consensus::import_queue::BasicQueue<Block>,
168	sc_transaction_pool::TransactionPoolHandle<Block, Client>,
169	(ParachainBlockImport, SlotBasedBlockImportHandle<Block>),
170>;
171
172/// Starts a `ServiceBuilder` for a full service.
173///
174/// Use this macro if you don't actually need the full service, but just the builder in order to
175/// be able to perform chain operations.
176pub fn new_partial(
177	config: &mut Configuration,
178	enable_import_proof_record: bool,
179) -> Result<Service, sc_service::Error> {
180	let heap_pages = config
181		.executor
182		.default_heap_pages
183		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static { extra_pages: h as _ });
184
185	let executor = WasmExecutor::builder()
186		.with_execution_method(config.executor.wasm_method)
187		.with_onchain_heap_alloc_strategy(heap_pages)
188		.with_offchain_heap_alloc_strategy(heap_pages)
189		.with_max_runtime_instances(config.executor.max_runtime_instances)
190		.with_runtime_cache_size(config.executor.runtime_cache_size)
191		.build();
192
193	let (client, backend, keystore_container, task_manager) =
194		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
195			config,
196			None,
197			executor,
198			enable_import_proof_record,
199			Default::default(),
200		)?;
201	let client = Arc::new(client);
202
203	let (block_import, block_import_handle) =
204		SlotBasedBlockImport::new(client.clone(), client.clone());
205	let block_import = ParachainBlockImport::new(block_import, backend.clone());
206
207	let transaction_pool = Arc::from(
208		sc_transaction_pool::Builder::new(
209			task_manager.spawn_essential_handle(),
210			client.clone(),
211			config.role.is_authority().into(),
212		)
213		.with_options(config.transaction_pool.clone())
214		.with_prometheus(config.prometheus_registry())
215		.build(),
216	);
217
218	let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
219	let import_queue = cumulus_client_consensus_aura::import_queue::<AuthorityPair, _, _, _, _, _>(
220		ImportQueueParams {
221			block_import: block_import.clone(),
222			client: client.clone(),
223			create_inherent_data_providers: move |_, ()| async move {
224				let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
225
226				let slot =
227					sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
228						*timestamp,
229						slot_duration,
230					);
231
232				Ok((slot, timestamp))
233			},
234			spawner: &task_manager.spawn_essential_handle(),
235			registry: None,
236			telemetry: None,
237		},
238	)?;
239
240	let params = PartialComponents {
241		backend,
242		client,
243		import_queue,
244		keystore_container,
245		task_manager,
246		transaction_pool,
247		select_chain: (),
248		other: (block_import, block_import_handle),
249	};
250
251	Ok(params)
252}
253
254async fn build_relay_chain_interface(
255	relay_chain_config: Configuration,
256	parachain_prometheus_registry: Option<&Registry>,
257	collator_key: Option<CollatorPair>,
258	collator_options: CollatorOptions,
259	task_manager: &mut TaskManager,
260) -> RelayChainResult<Arc<dyn RelayChainInterface + 'static>> {
261	let relay_chain_node = match collator_options.relay_chain_mode {
262		cumulus_client_cli::RelayChainMode::Embedded => polkadot_test_service::new_full(
263			relay_chain_config,
264			if let Some(ref key) = collator_key {
265				polkadot_service::IsParachainNode::Collator(key.clone())
266			} else {
267				polkadot_service::IsParachainNode::Collator(CollatorPair::generate().0)
268			},
269			None,
270			polkadot_service::CollatorOverseerGen,
271			Some("Relaychain"),
272		)
273		.map_err(|e| RelayChainError::Application(Box::new(e) as Box<_>))?,
274		cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) => {
275			return build_minimal_relay_chain_node_with_rpc(
276				relay_chain_config,
277				parachain_prometheus_registry,
278				task_manager,
279				rpc_target_urls,
280			)
281			.await
282			.map(|r| r.0)
283		},
284	};
285
286	task_manager.add_child(relay_chain_node.task_manager);
287	tracing::info!("Using inprocess node.");
288	Ok(Arc::new(RelayChainInProcessInterface::new(
289		relay_chain_node.client.clone(),
290		relay_chain_node.backend.clone(),
291		relay_chain_node.sync_service.clone(),
292		relay_chain_node.overseer_handle.ok_or(RelayChainError::GenericError(
293			"Overseer should be running in full node.".to_string(),
294		))?,
295	)))
296}
297
298/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
299///
300/// This is the actual implementation that is abstract over the executor and the runtime api.
301#[sc_tracing::logging::prefix_logs_with("Parachain")]
302pub async fn start_node_impl<RB, Net: NetworkBackend<Block, Hash>>(
303	parachain_config: Configuration,
304	collator_key: Option<CollatorPair>,
305	relay_chain_config: Configuration,
306	wrap_announce_block: Option<Box<dyn FnOnce(AnnounceBlockFn) -> AnnounceBlockFn>>,
307	fail_pov_recovery: bool,
308	rpc_ext_builder: RB,
309	collator_options: CollatorOptions,
310	proof_recording_during_import: bool,
311	use_slot_based_collator: bool,
312	collator_reserved_slots: usize,
313) -> sc_service::error::Result<(
314	TaskManager,
315	Arc<Client>,
316	Arc<dyn NetworkService>,
317	RpcHandlers,
318	TransactionPool,
319	Arc<Backend>,
320)>
321where
322	RB: Fn(Arc<Client>) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error> + Send + 'static,
323{
324	let mut parachain_config = prepare_node_config(parachain_config);
325
326	let params = new_partial(&mut parachain_config, proof_recording_during_import)?;
327
328	let transaction_pool = params.transaction_pool.clone();
329	let mut task_manager = params.task_manager;
330
331	let client = params.client.clone();
332	let backend = params.backend.clone();
333
334	let (block_import, block_import_handle) = params.other;
335	let relay_chain_interface = build_relay_chain_interface(
336		relay_chain_config,
337		parachain_config.prometheus_registry(),
338		collator_key.clone(),
339		collator_options.clone(),
340		&mut task_manager,
341	)
342	.await
343	.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
344
345	let import_queue_service = params.import_queue.service();
346	let prometheus_registry = parachain_config.prometheus_registry().cloned();
347	let net_config = FullNetworkConfiguration::<Block, Hash, Net>::new(
348		&parachain_config.network,
349		prometheus_registry.clone(),
350	);
351
352	let best_hash = client.chain_info().best_hash;
353	let para_id = client
354		.runtime_api()
355		.parachain_id(best_hash)
356		.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
357	tracing::info!("Parachain id: {:?}", para_id);
358
359	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
360		build_network(BuildNetworkParams {
361			parachain_config: &parachain_config,
362			net_config,
363			client: client.clone(),
364			transaction_pool: transaction_pool.clone(),
365			para_id,
366			spawn_handle: task_manager.spawn_handle(),
367			spawn_essential_handle: task_manager.spawn_essential_handle(),
368			relay_chain_interface: relay_chain_interface.clone(),
369			import_queue: params.import_queue,
370			metrics: Net::register_notification_metrics(
371				parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
372			),
373			sybil_resistance_level: CollatorSybilResistance::Resistant,
374		})
375		.await?;
376
377	let keystore = params.keystore_container.keystore();
378
379	if collator_key.is_some() && collator_reserved_slots > 0 {
380		cumulus_client_collator_discovery::start_collator_discovery(
381			cumulus_client_collator_discovery::StartCollatorDiscoveryParams {
382				max_reserved: collator_reserved_slots,
383				client: client.clone(),
384				authority_discovery: client.clone(),
385				network: network.clone(),
386				sync_service: sync_service.clone(),
387				network_event_stream: network.event_stream("para-authority-discovery"),
388				keystore: keystore.clone(),
389				genesis_hash: client.chain_info().genesis_hash,
390				fork_id: parachain_config.chain_spec.fork_id().map(ToString::to_string),
391				publish_non_global_ips: parachain_config.network.allow_non_globals_in_dht,
392				public_addresses: parachain_config.network.public_addresses.clone(),
393				persisted_cache_directory: parachain_config.network.net_config_path.clone(),
394				prometheus_registry: prometheus_registry.clone(),
395				spawn_handle: task_manager.spawn_handle(),
396			},
397		)
398		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
399	}
400
401	let rpc_builder = {
402		let client = client.clone();
403		Box::new(move |_| rpc_ext_builder(client.clone()))
404	};
405
406	let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
407		rpc_builder,
408		client: client.clone(),
409		transaction_pool: transaction_pool.clone(),
410		task_manager: &mut task_manager,
411		config: parachain_config,
412		keystore: keystore.clone(),
413		backend: backend.clone(),
414		network: network.clone(),
415		sync_service: sync_service.clone(),
416		system_rpc_tx,
417		tx_handler_controller,
418		telemetry: None,
419		tracing_execute_block: Some(Arc::new(ParachainTracingExecuteBlock::new(client.clone()))),
420	})?;
421
422	let announce_block = {
423		let sync_service = sync_service.clone();
424		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
425	};
426
427	let announce_block = wrap_announce_block
428		.map(|w| (w)(announce_block.clone()))
429		.unwrap_or_else(|| announce_block);
430
431	let overseer_handle = relay_chain_interface
432		.overseer_handle()
433		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
434
435	let recovery_handle: Box<dyn RecoveryHandle> = if fail_pov_recovery {
436		Box::new(FailingRecoveryHandle::new(overseer_handle.clone()))
437	} else {
438		Box::new(overseer_handle.clone())
439	};
440	let relay_chain_slot_duration = Duration::from_secs(6);
441
442	start_relay_chain_tasks(StartRelayChainTasksParams {
443		client: client.clone(),
444		announce_block: announce_block.clone(),
445		para_id,
446		relay_chain_interface: relay_chain_interface.clone(),
447		task_manager: &mut task_manager,
448		// Increase speed of recovery for testing purposes.
449		da_recovery_profile: DARecoveryProfile::Other(RecoveryDelayRange {
450			min: Duration::from_secs(1),
451			max: Duration::from_secs(5),
452		}),
453		import_queue: import_queue_service,
454		relay_chain_slot_duration,
455		recovery_handle,
456		sync_service: sync_service.clone(),
457		prometheus_registry: None,
458	})?;
459
460	let collator_peer_id = network.local_peer_id();
461	if let Some(collator_key) = collator_key {
462		let proposer = sc_basic_authorship::ProposerFactory::new(
463			task_manager.spawn_handle(),
464			client.clone(),
465			transaction_pool.clone(),
466			prometheus_registry.as_ref(),
467			None,
468		);
469
470		let collator_service = CollatorService::new(
471			client.clone(),
472			Arc::new(task_manager.spawn_handle()),
473			announce_block,
474			client.clone(),
475		);
476
477		let client_for_aura = client.clone();
478
479		if use_slot_based_collator {
480			tracing::info!(target: LOG_TARGET, "Starting block authoring with slot based authoring.");
481			let params = SlotBasedParams {
482				create_inherent_data_providers: move |_, ()| async move { Ok(()) },
483				block_import,
484				para_client: client.clone(),
485				para_backend: backend.clone(),
486				relay_client: relay_chain_interface,
487				code_hash_provider: move |block_hash| {
488					client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
489				},
490				keystore,
491				collator_key,
492				relay_chain_slot_duration,
493				para_id,
494				proposer,
495				collator_service,
496				reinitialize: false,
497				slot_offset: Duration::from_secs(1),
498				block_import_handle,
499				spawner: task_manager.spawn_essential_handle(),
500				export_pov: None,
501				max_pov_percentage: None,
502				collator_peer_id,
503			};
504
505			slot_based::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _, _>(params);
506		} else {
507			tracing::info!(target: LOG_TARGET, "Starting block authoring with lookahead collator.");
508			let params = AuraParams {
509				create_inherent_data_providers: move |_, ()| async move { Ok(()) },
510				block_import,
511				para_client: client.clone(),
512				para_backend: backend.clone(),
513				relay_client: relay_chain_interface,
514				code_hash_provider: move |block_hash| {
515					client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
516				},
517				keystore,
518				collator_key,
519				collator_peer_id,
520				para_id,
521				overseer_handle,
522				relay_chain_slot_duration,
523				proposer,
524				collator_service,
525				authoring_duration: Duration::from_millis(2000),
526				reinitialize: false,
527				max_pov_percentage: None,
528			};
529
530			let fut = aura::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _>(params);
531			task_manager.spawn_essential_handle().spawn("aura", None, fut);
532		}
533	}
534
535	Ok((task_manager, client, network, rpc_handlers, transaction_pool, backend))
536}
537
538/// A Cumulus test node instance used for testing.
539pub struct TestNode {
540	/// TaskManager's instance.
541	pub task_manager: TaskManager,
542	/// Client's instance.
543	pub client: Arc<Client>,
544	/// Node's network.
545	pub network: Arc<dyn NetworkService>,
546	/// The `MultiaddrWithPeerId` to this node. This is useful if you want to pass it as "boot
547	/// node" to other nodes.
548	pub addr: MultiaddrWithPeerId,
549	/// RPCHandlers to make RPC queries.
550	pub rpc_handlers: RpcHandlers,
551	/// Node's transaction pool
552	pub transaction_pool: TransactionPool,
553	/// Node's backend
554	pub backend: Arc<Backend>,
555}
556
557/// A builder to create a [`TestNode`].
558pub struct TestNodeBuilder {
559	para_id: ParaId,
560	tokio_handle: tokio::runtime::Handle,
561	key: Sr25519Keyring,
562	collator_key: Option<CollatorPair>,
563	parachain_nodes: Vec<MultiaddrWithPeerId>,
564	parachain_nodes_exclusive: bool,
565	relay_chain_nodes: Vec<MultiaddrWithPeerId>,
566	wrap_announce_block: Option<Box<dyn FnOnce(AnnounceBlockFn) -> AnnounceBlockFn>>,
567	storage_update_func_parachain: Option<Box<dyn Fn()>>,
568	storage_update_func_relay_chain: Option<Box<dyn Fn()>>,
569	relay_chain_mode: RelayChainMode,
570	endowed_accounts: Vec<AccountId>,
571	record_proof_during_import: bool,
572}
573
574impl TestNodeBuilder {
575	/// Create a new instance of `Self`.
576	///
577	/// `para_id` - The parachain id this node is running for.
578	/// `tokio_handle` - The tokio handler to use.
579	/// `key` - The key that will be used to generate the name and that will be passed as
580	/// `dev_seed`.
581	pub fn new(para_id: ParaId, tokio_handle: tokio::runtime::Handle, key: Sr25519Keyring) -> Self {
582		TestNodeBuilder {
583			key,
584			para_id,
585			tokio_handle,
586			collator_key: None,
587			parachain_nodes: Vec::new(),
588			parachain_nodes_exclusive: false,
589			relay_chain_nodes: Vec::new(),
590			wrap_announce_block: None,
591			storage_update_func_parachain: None,
592			storage_update_func_relay_chain: None,
593			endowed_accounts: Default::default(),
594			relay_chain_mode: RelayChainMode::Embedded,
595			record_proof_during_import: true,
596		}
597	}
598
599	/// Enable collator for this node.
600	pub fn enable_collator(mut self) -> Self {
601		let collator_key = CollatorPair::generate().0;
602		self.collator_key = Some(collator_key);
603		self
604	}
605
606	/// Instruct the node to exclusively connect to registered parachain nodes.
607	///
608	/// Parachain nodes can be registered using [`Self::connect_to_parachain_node`] and
609	/// [`Self::connect_to_parachain_nodes`].
610	pub fn exclusively_connect_to_registered_parachain_nodes(mut self) -> Self {
611		self.parachain_nodes_exclusive = true;
612		self
613	}
614
615	/// Make the node connect to the given parachain node.
616	///
617	/// By default the node will not be connected to any node or will be able to discover any other
618	/// node.
619	pub fn connect_to_parachain_node(mut self, node: &TestNode) -> Self {
620		self.parachain_nodes.push(node.addr.clone());
621		self
622	}
623
624	/// Make the node connect to the given parachain nodes.
625	///
626	/// By default the node will not be connected to any node or will be able to discover any other
627	/// node.
628	pub fn connect_to_parachain_nodes<'a>(
629		mut self,
630		nodes: impl IntoIterator<Item = &'a TestNode>,
631	) -> Self {
632		self.parachain_nodes.extend(nodes.into_iter().map(|n| n.addr.clone()));
633		self
634	}
635
636	/// Make the node connect to the given relay chain node.
637	///
638	/// By default the node will not be connected to any node or will be able to discover any other
639	/// node.
640	pub fn connect_to_relay_chain_node(
641		mut self,
642		node: &polkadot_test_service::PolkadotTestNode,
643	) -> Self {
644		self.relay_chain_nodes.push(node.addr.clone());
645		self
646	}
647
648	/// Make the node connect to the given relay chain nodes.
649	///
650	/// By default the node will not be connected to any node or will be able to discover any other
651	/// node.
652	pub fn connect_to_relay_chain_nodes<'a>(
653		mut self,
654		nodes: impl IntoIterator<Item = &'a polkadot_test_service::PolkadotTestNode>,
655	) -> Self {
656		self.relay_chain_nodes.extend(nodes.into_iter().map(|n| n.addr.clone()));
657		self
658	}
659
660	/// Wrap the announce block function of this node.
661	pub fn wrap_announce_block(
662		mut self,
663		wrap: impl FnOnce(AnnounceBlockFn) -> AnnounceBlockFn + 'static,
664	) -> Self {
665		self.wrap_announce_block = Some(Box::new(wrap));
666		self
667	}
668
669	/// Allows accessing the parachain storage before the test node is built.
670	pub fn update_storage_parachain(mut self, updater: impl Fn() + 'static) -> Self {
671		self.storage_update_func_parachain = Some(Box::new(updater));
672		self
673	}
674
675	/// Allows accessing the relay chain storage before the test node is built.
676	pub fn update_storage_relay_chain(mut self, updater: impl Fn() + 'static) -> Self {
677		self.storage_update_func_relay_chain = Some(Box::new(updater));
678		self
679	}
680
681	/// Connect to full node via RPC.
682	pub fn use_external_relay_chain_node_at_url(mut self, network_address: Url) -> Self {
683		self.relay_chain_mode = RelayChainMode::ExternalRpc(vec![network_address]);
684		self
685	}
686
687	/// Connect to full node via RPC.
688	pub fn use_external_relay_chain_node_at_port(mut self, port: u16) -> Self {
689		let mut localhost_url =
690			Url::parse("ws://localhost").expect("Should be able to parse localhost Url");
691		localhost_url.set_port(Some(port)).expect("Should be able to set port");
692		self.relay_chain_mode = RelayChainMode::ExternalRpc(vec![localhost_url]);
693		self
694	}
695
696	/// Accounts which will have an initial balance.
697	pub fn endowed_accounts(mut self, accounts: Vec<AccountId>) -> TestNodeBuilder {
698		self.endowed_accounts = accounts;
699		self
700	}
701
702	/// Record proofs during import.
703	pub fn import_proof_recording(mut self, should_record_proof: bool) -> TestNodeBuilder {
704		self.record_proof_during_import = should_record_proof;
705		self
706	}
707
708	/// Build the [`TestNode`].
709	pub async fn build(self) -> TestNode {
710		let parachain_config = node_config(
711			self.storage_update_func_parachain.unwrap_or_else(|| Box::new(|| ())),
712			self.tokio_handle.clone(),
713			self.key,
714			self.parachain_nodes,
715			self.parachain_nodes_exclusive,
716			self.para_id,
717			self.collator_key.is_some(),
718			self.endowed_accounts,
719		)
720		.expect("could not generate Configuration");
721
722		let mut relay_chain_config = polkadot_test_service::node_config(
723			self.storage_update_func_relay_chain.unwrap_or_else(|| Box::new(|| ())),
724			self.tokio_handle,
725			self.key,
726			self.relay_chain_nodes,
727			false,
728		);
729
730		let collator_options = CollatorOptions {
731			relay_chain_mode: self.relay_chain_mode,
732			embedded_dht_bootnode: true,
733			dht_bootnode_discovery: true,
734		};
735
736		relay_chain_config.network.node_name =
737			format!("{} (relay chain)", relay_chain_config.network.node_name);
738
739		let (task_manager, client, network, rpc_handlers, transaction_pool, backend) =
740			match relay_chain_config.network.network_backend {
741				sc_network::config::NetworkBackendType::Libp2p => {
742					start_node_impl::<_, sc_network::NetworkWorker<_, _>>(
743						parachain_config,
744						self.collator_key,
745						relay_chain_config,
746						self.wrap_announce_block,
747						false,
748						|_| Ok(jsonrpsee::RpcModule::new(())),
749						collator_options,
750						self.record_proof_during_import,
751						false,
752						0,
753					)
754					.await
755					.expect("could not create Cumulus test service")
756				},
757				sc_network::config::NetworkBackendType::Litep2p => {
758					start_node_impl::<_, sc_network::Litep2pNetworkBackend>(
759						parachain_config,
760						self.collator_key,
761						relay_chain_config,
762						self.wrap_announce_block,
763						false,
764						|_| Ok(jsonrpsee::RpcModule::new(())),
765						collator_options,
766						self.record_proof_during_import,
767						false,
768						0,
769					)
770					.await
771					.expect("could not create Cumulus test service")
772				},
773			};
774		let peer_id = network.local_peer_id();
775		let multiaddr = polkadot_test_service::get_listen_address(network.clone()).await;
776		let addr = MultiaddrWithPeerId { multiaddr, peer_id };
777
778		TestNode { task_manager, client, network, addr, rpc_handlers, transaction_pool, backend }
779	}
780}
781
782/// Create a Cumulus `Configuration`.
783///
784/// By default a TCP socket will be used, therefore you need to provide nodes if you want the
785/// node to be connected to other nodes.
786///
787/// If `nodes_exclusive` is `true`, the node will only connect to the given `nodes` and not to any
788/// other node.
789///
790/// The `storage_update_func` can be used to make adjustments to the runtime genesis.
791pub fn node_config(
792	storage_update_func: impl Fn(),
793	tokio_handle: tokio::runtime::Handle,
794	key: Sr25519Keyring,
795	nodes: Vec<MultiaddrWithPeerId>,
796	nodes_exclusive: bool,
797	para_id: ParaId,
798	is_collator: bool,
799	endowed_accounts: Vec<AccountId>,
800) -> Result<Configuration, ServiceError> {
801	let base_path = BasePath::new_temp_dir()?;
802	let root = base_path.path().join(format!("cumulus_test_service_{}", key));
803	let role = if is_collator { Role::Authority } else { Role::Full };
804	let key_seed = key.to_seed();
805	let mut spec = Box::new(chain_spec::get_chain_spec_with_extra_endowed(
806		Some(para_id),
807		endowed_accounts,
808		cumulus_test_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"),
809	));
810
811	let mut storage = spec.as_storage_builder().build_storage().expect("could not build storage");
812
813	BasicExternalities::execute_with_storage(&mut storage, storage_update_func);
814	spec.set_storage(storage);
815
816	let mut network_config = NetworkConfiguration::new(
817		format!("{} (parachain)", key_seed),
818		"network/test/0.1",
819		Default::default(),
820		None,
821	);
822
823	if nodes_exclusive {
824		network_config.default_peers_set.reserved_nodes = nodes;
825		network_config.default_peers_set.non_reserved_mode =
826			sc_network::config::NonReservedPeerMode::Deny;
827	} else {
828		network_config.boot_nodes = nodes;
829	}
830
831	network_config.allow_non_globals_in_dht = true;
832
833	let addr: multiaddr::Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().expect("valid address; qed");
834	network_config.listen_addresses.push(addr.clone());
835	network_config.transport =
836		TransportConfig::Normal { enable_mdns: false, allow_private_ip: true };
837
838	Ok(Configuration {
839		impl_name: "cumulus-test-node".to_string(),
840		impl_version: "0.1".to_string(),
841		role,
842		tokio_handle,
843		transaction_pool: Default::default(),
844		network: network_config,
845		keystore: KeystoreConfig::InMemory,
846		database: DatabaseSource::RocksDb { path: root.join("db"), cache_size: 128 },
847		trie_cache_maximum_size: Some(64 * 1024 * 1024),
848		warm_up_trie_cache: None,
849		state_pruning: Some(PruningMode::ArchiveAll),
850		blocks_pruning: BlocksPruning::KeepAll,
851		chain_spec: spec,
852		executor: ExecutorConfiguration {
853			wasm_method: WasmExecutionMethod::Compiled {
854				instantiation_strategy:
855					sc_executor_wasmtime::InstantiationStrategy::PoolingCopyOnWrite,
856			},
857			..ExecutorConfiguration::default()
858		},
859		rpc: RpcConfiguration {
860			addr: None,
861			max_connections: Default::default(),
862			cors: None,
863			methods: Default::default(),
864			max_request_size: Default::default(),
865			max_response_size: Default::default(),
866			id_provider: None,
867			max_subs_per_conn: Default::default(),
868			port: 9945,
869			message_buffer_capacity: Default::default(),
870			batch_config: RpcBatchRequestConfig::Unlimited,
871			rate_limit: None,
872			rate_limit_whitelisted_ips: Default::default(),
873			rate_limit_trust_proxy_headers: Default::default(),
874			request_logger_limit: 1024,
875		},
876		prometheus_config: None,
877		telemetry_endpoints: None,
878		offchain_worker: OffchainWorkerConfig { enabled: true, indexing_enabled: false },
879		force_authoring: false,
880		disable_grandpa: false,
881		dev_key_seed: Some(key_seed),
882		tracing_targets: None,
883		tracing_receiver: Default::default(),
884		announce_block: true,
885		data_path: root,
886		base_path,
887		wasm_runtime_overrides: None,
888	})
889}
890
891impl TestNode {
892	/// Wait for `count` blocks to be imported in the node and then exit. This function will not
893	/// return if no blocks are ever created, thus you should restrict the maximum amount of time of
894	/// the test execution.
895	pub fn wait_for_blocks(&self, count: usize) -> impl Future<Output = ()> {
896		self.client.wait_for_blocks(count)
897	}
898
899	/// Send an extrinsic to this node.
900	pub async fn send_extrinsic(
901		&self,
902		function: impl Into<runtime::RuntimeCall>,
903		caller: Sr25519Keyring,
904	) -> Result<RpcTransactionOutput, RpcTransactionError> {
905		let extrinsic = construct_extrinsic(&self.client, function, caller.pair(), Some(0));
906
907		self.rpc_handlers.send_transaction(extrinsic.into()).await
908	}
909
910	/// Register a parachain at this relay chain.
911	pub async fn schedule_upgrade(&self, validation: Vec<u8>) -> Result<(), RpcTransactionError> {
912		let call = frame_system::Call::set_code { code: validation };
913
914		self.send_extrinsic(
915			runtime::SudoCall::sudo_unchecked_weight {
916				call: Box::new(call.into()),
917				weight: Weight::from_parts(1_000, 0),
918			},
919			Sr25519Keyring::Alice,
920		)
921		.await
922		.map(drop)
923	}
924}
925
926/// Fetch account nonce for key pair
927pub fn fetch_nonce(client: &Client, account: sp_core::sr25519::Public) -> u32 {
928	let best_hash = client.chain_info().best_hash;
929	client
930		.runtime_api()
931		.account_nonce(best_hash, account.into())
932		.expect("Fetching account nonce works; qed")
933}
934
935/// Construct an extrinsic that can be applied to the test runtime.
936pub fn construct_extrinsic(
937	client: &Client,
938	function: impl Into<runtime::RuntimeCall>,
939	caller: sp_core::sr25519::Pair,
940	nonce: Option<u32>,
941) -> runtime::UncheckedExtrinsic {
942	let function = function.into();
943	let current_block_hash = client.info().best_hash;
944	let current_block = client.info().best_number.saturated_into();
945	let genesis_block = client.hash(0).unwrap().unwrap();
946	let nonce = nonce.unwrap_or_else(|| fetch_nonce(client, caller.public()));
947	let period = runtime::BlockHashCount::get()
948		.checked_next_power_of_two()
949		.map(|c| c / 2)
950		.unwrap_or(2) as u64;
951	let tip = 0;
952	let tx_ext: runtime::TxExtension = cumulus_pallet_weight_reclaim::StorageWeightReclaim::from((
953		frame_system::AuthorizeCall::<runtime::Runtime>::new(),
954		frame_system::CheckNonZeroSender::<runtime::Runtime>::new(),
955		frame_system::CheckSpecVersion::<runtime::Runtime>::new(),
956		frame_system::CheckGenesis::<runtime::Runtime>::new(),
957		frame_system::CheckEra::<runtime::Runtime>::from(generic::Era::mortal(
958			period,
959			current_block,
960		)),
961		frame_system::CheckNonce::<runtime::Runtime>::from(nonce),
962		frame_system::CheckWeight::<runtime::Runtime>::new(),
963		pallet_transaction_payment::ChargeTransactionPayment::<runtime::Runtime>::from(tip),
964		runtime::TestTransactionExtension::<runtime::Runtime>::default(),
965	))
966	.into();
967	let raw_payload = runtime::SignedPayload::from_raw(
968		function.clone(),
969		tx_ext.clone(),
970		((), (), runtime::VERSION.spec_version, genesis_block, current_block_hash, (), (), (), ()),
971	);
972	let signature = raw_payload.using_encoded(|e| caller.sign(e));
973	runtime::UncheckedExtrinsic::new_signed(
974		function,
975		MultiAddress::Id(caller.public().into()),
976		runtime::Signature::Sr25519(signature),
977		tx_ext,
978	)
979}
980
981/// Run a relay-chain validator node.
982///
983/// This is essentially a wrapper around
984/// [`run_validator_node`](polkadot_test_service::run_validator_node).
985pub fn run_relay_chain_validator_node(
986	tokio_handle: tokio::runtime::Handle,
987	key: Sr25519Keyring,
988	storage_update_func: impl Fn(),
989	boot_nodes: Vec<MultiaddrWithPeerId>,
990	port: Option<u16>,
991) -> polkadot_test_service::PolkadotTestNode {
992	let mut config = polkadot_test_service::node_config(
993		storage_update_func,
994		tokio_handle.clone(),
995		key,
996		boot_nodes,
997		true,
998	);
999
1000	if let Some(port) = port {
1001		config.rpc.addr = Some(vec![RpcEndpoint {
1002			batch_config: config.rpc.batch_config,
1003			cors: config.rpc.cors.clone(),
1004			listen_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)),
1005			max_connections: config.rpc.max_connections,
1006			max_payload_in_mb: config.rpc.max_request_size,
1007			max_payload_out_mb: config.rpc.max_response_size,
1008			max_subscriptions_per_connection: config.rpc.max_subs_per_conn,
1009			max_buffer_capacity_per_connection: config.rpc.message_buffer_capacity,
1010			rpc_methods: config.rpc.methods,
1011			rate_limit: config.rpc.rate_limit,
1012			rate_limit_trust_proxy_headers: config.rpc.rate_limit_trust_proxy_headers,
1013			rate_limit_whitelisted_ips: config.rpc.rate_limit_whitelisted_ips.clone(),
1014			retry_random_port: true,
1015			is_optional: false,
1016		}]);
1017	}
1018
1019	let mut workers_path = std::env::current_exe().unwrap();
1020	workers_path.pop();
1021	workers_path.pop();
1022
1023	tokio_handle.block_on(async move {
1024		polkadot_test_service::run_validator_node(config, Some(workers_path)).await
1025	})
1026}