referrerpolicy=no-referrer-when-downgrade

cumulus_test_service/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3
4// Cumulus is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Cumulus is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Crate used for testing with Cumulus.
18
19#![warn(missing_docs)]
20
21/// Utilities used for benchmarking
22pub mod bench_utils;
23
24pub mod chain_spec;
25
26use cumulus_client_collator::service::CollatorService;
27use cumulus_client_consensus_aura::{
28	collators::{
29		lookahead::{self as aura, Params as AuraParams},
30		slot_based::{
31			self as slot_based, Params as SlotBasedParams, SlotBasedBlockImport,
32			SlotBasedBlockImportHandle,
33		},
34	},
35	ImportQueueParams,
36};
37use prometheus::Registry;
38use runtime::AccountId;
39use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
40use sp_consensus_aura::sr25519::AuthorityPair;
41use std::{
42	collections::HashSet,
43	future::Future,
44	net::{Ipv4Addr, SocketAddr, SocketAddrV4},
45	time::Duration,
46};
47use url::Url;
48
49use crate::runtime::Weight;
50use cumulus_client_cli::{CollatorOptions, RelayChainMode};
51use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
52use cumulus_client_pov_recovery::{RecoveryDelayRange, RecoveryHandle};
53use cumulus_client_service::{
54	build_network, prepare_node_config, start_relay_chain_tasks, BuildNetworkParams,
55	CollatorSybilResistance, DARecoveryProfile, StartRelayChainTasksParams,
56};
57use cumulus_primitives_core::{relay_chain::ValidationCode, GetParachainInfo, ParaId};
58use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface;
59use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
60use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
61
62use cumulus_test_runtime::{Hash, NodeBlock as Block, RuntimeApi};
63
64use frame_system_rpc_runtime_api::AccountNonceApi;
65use polkadot_node_subsystem::{errors::RecoveryError, messages::AvailabilityRecoveryMessage};
66use polkadot_overseer::Handle as OverseerHandle;
67use polkadot_primitives::{CandidateHash, CollatorPair};
68use polkadot_service::ProvideRuntimeApi;
69use sc_consensus::ImportQueue;
70use sc_network::{
71	config::{FullNetworkConfiguration, TransportConfig},
72	multiaddr,
73	service::traits::NetworkService,
74	NetworkBackend, NetworkBlock, NetworkStateInfo,
75};
76use sc_service::{
77	config::{
78		BlocksPruning, DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
79		NetworkConfiguration, OffchainWorkerConfig, PruningMode, RpcBatchRequestConfig,
80		RpcConfiguration, RpcEndpoint, WasmExecutionMethod,
81	},
82	BasePath, ChainSpec as ChainSpecService, Configuration, Error as ServiceError,
83	PartialComponents, Role, RpcHandlers, TFullBackend, TFullClient, TaskManager,
84};
85use sp_arithmetic::traits::SaturatedConversion;
86use sp_blockchain::HeaderBackend;
87use sp_core::Pair;
88use sp_keyring::Sr25519Keyring;
89use sp_runtime::{codec::Encode, generic, MultiAddress};
90use sp_state_machine::BasicExternalities;
91use std::sync::Arc;
92use substrate_test_client::{
93	BlockchainEventsExt, RpcHandlersExt, RpcTransactionError, RpcTransactionOutput,
94};
95
96pub use chain_spec::*;
97pub use cumulus_test_runtime as runtime;
98pub use sp_keyring::Sr25519Keyring as Keyring;
99
/// Log target passed to the `tracing` macros emitted by this crate.
const LOG_TARGET: &str = "cumulus-test-service";

/// The signature of the announce block fn.
///
/// The function receives the block hash and an optional byte payload. By default both are
/// forwarded to `announce_block` of the sync service (see `start_node_impl`).
pub type AnnounceBlockFn = Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>;

/// Host functions used by the [`WasmExecutor`] of the test [`Client`]: the Substrate host
/// functions plus the storage-proof-size host functions from `cumulus_client_service`.
type HostFunctions =
	(sp_io::SubstrateHostFunctions, cumulus_client_service::storage_proof_size::HostFunctions);
/// The client type being used by the test service.
pub type Client = TFullClient<runtime::NodeBlock, runtime::RuntimeApi, WasmExecutor<HostFunctions>>;

/// The backend type being used by the test service.
pub type Backend = TFullBackend<Block>;

/// The block-import type being used by the test service.
///
/// This is a [`SlotBasedBlockImport`] wrapped into the Cumulus parachain block import.
pub type ParachainBlockImport =
	TParachainBlockImport<Block, SlotBasedBlockImport<Block, Arc<Client>, Client>, Backend>;

/// Transaction pool type used by the test service
pub type TransactionPool = Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>;
119
/// Recovery handle that fails regularly to simulate unavailable povs.
///
/// Requests that are not failed are forwarded unchanged to the wrapped overseer handle.
pub struct FailingRecoveryHandle {
	/// Overseer handle that receives every recovery message which is not failed.
	overseer_handle: OverseerHandle,
	/// Number of recovery messages processed so far; used to pick every 3rd one.
	counter: u32,
	/// Candidates that have already been failed once and must not be failed again.
	failed_hashes: HashSet<CandidateHash>,
}
126
127impl FailingRecoveryHandle {
128	/// Create a new FailingRecoveryHandle
129	pub fn new(overseer_handle: OverseerHandle) -> Self {
130		Self { overseer_handle, counter: 0, failed_hashes: Default::default() }
131	}
132}
133
134#[async_trait::async_trait]
135impl RecoveryHandle for FailingRecoveryHandle {
136	async fn send_recovery_msg(
137		&mut self,
138		message: AvailabilityRecoveryMessage,
139		origin: &'static str,
140	) {
141		let AvailabilityRecoveryMessage::RecoverAvailableData(ref receipt, _, _, _, _) = message;
142		let candidate_hash = receipt.hash();
143
144		// For every 3rd block we immediately signal unavailability to trigger
145		// a retry. The same candidate is never failed multiple times to ensure progress.
146		if self.counter % 3 == 0 && self.failed_hashes.insert(candidate_hash) {
147			tracing::info!(target: LOG_TARGET, ?candidate_hash, "Failing pov recovery.");
148
149			let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, _, back_sender) =
150				message;
151			back_sender
152				.send(Err(RecoveryError::Unavailable))
153				.expect("Return channel should work here.");
154		} else {
155			self.overseer_handle.send_msg(message, origin).await;
156		}
157		self.counter += 1;
158	}
159}
160
/// Assembly of PartialComponents (enough to run chain ops subcommands)
///
/// There is no select chain (`()`), and the `other` slot carries the parachain block import
/// together with the slot based block import handle.
pub type Service = PartialComponents<
	Client,
	Backend,
	(),
	sc_consensus::import_queue::BasicQueue<Block>,
	sc_transaction_pool::TransactionPoolHandle<Block, Client>,
	(ParachainBlockImport, SlotBasedBlockImportHandle<Block>),
>;
170
/// Builds the partial components ([`Service`]) of the test node.
///
/// Use this function if you don't actually need the full service, but just the components in
/// order to be able to perform chain operations.
///
/// * `config` - The node configuration; executor, transaction pool and role settings are read
///   from it.
/// * `enable_import_proof_record` - Forwarded to `sc_service::new_full_parts_record_import`.
///
/// Returns the client, backend, import queue, keystore container, task manager and transaction
/// pool. The `other` field holds the block import and the slot based block import handle.
pub fn new_partial(
	config: &mut Configuration,
	enable_import_proof_record: bool,
) -> Result<Service, sc_service::Error> {
	// A configured number of heap pages is used as static extra pages, otherwise the default
	// heap allocation strategy applies.
	let heap_pages = config
		.executor
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static { extra_pages: h as _ });

	let executor = WasmExecutor::builder()
		.with_execution_method(config.executor.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_pages)
		.with_offchain_heap_alloc_strategy(heap_pages)
		.with_max_runtime_instances(config.executor.max_runtime_instances)
		.with_runtime_cache_size(config.executor.runtime_cache_size)
		.build();

	// No telemetry is passed (`None`).
	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
			config,
			None,
			executor,
			enable_import_proof_record,
		)?;
	let client = Arc::new(client);

	// The slot based block import wraps the client; the returned handle is handed out via
	// `other` so that a slot based collator can use it later.
	let (block_import, slot_based_handle) =
		SlotBasedBlockImport::new(client.clone(), client.clone());
	let block_import = ParachainBlockImport::new(block_import, backend.clone());

	let transaction_pool = Arc::from(
		sc_transaction_pool::Builder::new(
			task_manager.spawn_essential_handle(),
			client.clone(),
			config.role.is_authority().into(),
		)
		.with_options(config.transaction_pool.clone())
		.with_prometheus(config.prometheus_registry())
		.build(),
	);

	let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
	let import_queue = cumulus_client_consensus_aura::import_queue::<AuthorityPair, _, _, _, _, _>(
		ImportQueueParams {
			block_import: block_import.clone(),
			client: client.clone(),
			// Provide the timestamp taken from the system time and the aura slot derived
			// from that timestamp and the slot duration.
			create_inherent_data_providers: move |_, ()| async move {
				let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

				let slot =
					sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
						*timestamp,
						slot_duration,
					);

				Ok((slot, timestamp))
			},
			spawner: &task_manager.spawn_essential_handle(),
			registry: None,
			telemetry: None,
		},
	)?;

	let params = PartialComponents {
		backend,
		client,
		import_queue,
		keystore_container,
		task_manager,
		transaction_pool,
		select_chain: (),
		other: (block_import, slot_based_handle),
	};

	Ok(params)
}
251
252async fn build_relay_chain_interface(
253	relay_chain_config: Configuration,
254	parachain_prometheus_registry: Option<&Registry>,
255	collator_key: Option<CollatorPair>,
256	collator_options: CollatorOptions,
257	task_manager: &mut TaskManager,
258) -> RelayChainResult<Arc<dyn RelayChainInterface + 'static>> {
259	let relay_chain_node = match collator_options.relay_chain_mode {
260		cumulus_client_cli::RelayChainMode::Embedded => polkadot_test_service::new_full(
261			relay_chain_config,
262			if let Some(ref key) = collator_key {
263				polkadot_service::IsParachainNode::Collator(key.clone())
264			} else {
265				polkadot_service::IsParachainNode::Collator(CollatorPair::generate().0)
266			},
267			None,
268			polkadot_service::CollatorOverseerGen,
269			Some("Relaychain"),
270		)
271		.map_err(|e| RelayChainError::Application(Box::new(e) as Box<_>))?,
272		cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
273			return build_minimal_relay_chain_node_with_rpc(
274				relay_chain_config,
275				parachain_prometheus_registry,
276				task_manager,
277				rpc_target_urls,
278			)
279			.await
280			.map(|r| r.0),
281	};
282
283	task_manager.add_child(relay_chain_node.task_manager);
284	tracing::info!("Using inprocess node.");
285	Ok(Arc::new(RelayChainInProcessInterface::new(
286		relay_chain_node.client.clone(),
287		relay_chain_node.backend.clone(),
288		relay_chain_node.sync_service.clone(),
289		relay_chain_node.overseer_handle.ok_or(RelayChainError::GenericError(
290			"Overseer should be running in full node.".to_string(),
291		))?,
292	)))
293}
294
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is generic over the RPC extension builder (`RB`) and
/// the network backend (`Net`).
///
/// * `parachain_config` - Configuration of the parachain node.
/// * `collator_key` - If `Some`, block authoring (collation) is started with this key.
/// * `relay_chain_config` - Configuration used to build the relay chain interface.
/// * `wrap_announce_block` - Optional wrapper around the default announce block function.
/// * `fail_pov_recovery` - If `true`, a [`FailingRecoveryHandle`] is used for pov recovery.
/// * `rpc_ext_builder` - Builds the RPC module from the client.
/// * `collator_options` - Selects, among others, the relay chain mode.
/// * `proof_recording_during_import` - Forwarded to [`new_partial`].
/// * `use_slot_based_collator` - Use the slot based collator instead of the lookahead collator.
///
/// Returns the task manager, client, network service, RPC handlers, transaction pool and
/// backend of the started node.
#[sc_tracing::logging::prefix_logs_with("Parachain")]
pub async fn start_node_impl<RB, Net: NetworkBackend<Block, Hash>>(
	parachain_config: Configuration,
	collator_key: Option<CollatorPair>,
	relay_chain_config: Configuration,
	wrap_announce_block: Option<Box<dyn FnOnce(AnnounceBlockFn) -> AnnounceBlockFn>>,
	fail_pov_recovery: bool,
	rpc_ext_builder: RB,
	collator_options: CollatorOptions,
	proof_recording_during_import: bool,
	use_slot_based_collator: bool,
) -> sc_service::error::Result<(
	TaskManager,
	Arc<Client>,
	Arc<dyn NetworkService>,
	RpcHandlers,
	TransactionPool,
	Arc<Backend>,
)>
where
	RB: Fn(Arc<Client>) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error> + Send + 'static,
{
	let mut parachain_config = prepare_node_config(parachain_config);

	let params = new_partial(&mut parachain_config, proof_recording_during_import)?;

	let transaction_pool = params.transaction_pool.clone();
	let mut task_manager = params.task_manager;

	let client = params.client.clone();
	let backend = params.backend.clone();

	let block_import = params.other.0;
	let slot_based_handle = params.other.1;
	// The relay chain interface is either an embedded full node or a minimal RPC based node,
	// depending on `collator_options`.
	let relay_chain_interface = build_relay_chain_interface(
		relay_chain_config,
		parachain_config.prometheus_registry(),
		collator_key.clone(),
		collator_options.clone(),
		&mut task_manager,
	)
	.await
	.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

	let import_queue_service = params.import_queue.service();
	let prometheus_registry = parachain_config.prometheus_registry().cloned();
	let net_config = FullNetworkConfiguration::<Block, Hash, Net>::new(
		&parachain_config.network,
		prometheus_registry.clone(),
	);

	// The para id is read from the runtime at the current best block.
	let best_hash = client.chain_info().best_hash;
	let para_id = client
		.runtime_api()
		.parachain_id(best_hash)
		.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
	tracing::info!("Parachain id: {:?}", para_id);

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		build_network(BuildNetworkParams {
			parachain_config: &parachain_config,
			net_config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			para_id,
			spawn_handle: task_manager.spawn_handle(),
			relay_chain_interface: relay_chain_interface.clone(),
			import_queue: params.import_queue,
			metrics: Net::register_notification_metrics(
				parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
			),
			sybil_resistance_level: CollatorSybilResistance::Resistant,
		})
		.await?;

	let keystore = params.keystore_container.keystore();
	// The argument handed to the RPC builder is ignored; only the client is passed on.
	let rpc_builder = {
		let client = client.clone();
		Box::new(move |_| rpc_ext_builder(client.clone()))
	};

	let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		rpc_builder,
		client: client.clone(),
		transaction_pool: transaction_pool.clone(),
		task_manager: &mut task_manager,
		config: parachain_config,
		keystore: keystore.clone(),
		backend: backend.clone(),
		network: network.clone(),
		sync_service: sync_service.clone(),
		system_rpc_tx,
		tx_handler_controller,
		telemetry: None,
	})?;

	// Default announce function: announce the block through the sync service.
	let announce_block = {
		let sync_service = sync_service.clone();
		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
	};

	// Let the caller wrap the default announce function, if requested.
	let announce_block = wrap_announce_block
		.map(|w| (w)(announce_block.clone()))
		.unwrap_or_else(|| announce_block);

	let overseer_handle = relay_chain_interface
		.overseer_handle()
		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;

	// Either the plain overseer handle or the handle that fails some of the recoveries.
	let recovery_handle: Box<dyn RecoveryHandle> = if fail_pov_recovery {
		Box::new(FailingRecoveryHandle::new(overseer_handle.clone()))
	} else {
		Box::new(overseer_handle.clone())
	};
	let relay_chain_slot_duration = Duration::from_secs(6);

	start_relay_chain_tasks(StartRelayChainTasksParams {
		client: client.clone(),
		announce_block: announce_block.clone(),
		para_id,
		relay_chain_interface: relay_chain_interface.clone(),
		task_manager: &mut task_manager,
		// Increase speed of recovery for testing purposes.
		da_recovery_profile: DARecoveryProfile::Other(RecoveryDelayRange {
			min: Duration::from_secs(1),
			max: Duration::from_secs(5),
		}),
		import_queue: import_queue_service,
		relay_chain_slot_duration,
		recovery_handle,
		sync_service: sync_service.clone(),
		prometheus_registry: None,
	})?;

	// Block authoring is only started when a collator key is given.
	if let Some(collator_key) = collator_key {
		let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			None,
		);

		let collator_service = CollatorService::new(
			client.clone(),
			Arc::new(task_manager.spawn_handle()),
			announce_block,
			client.clone(),
		);

		let client_for_aura = client.clone();

		if use_slot_based_collator {
			tracing::info!(target: LOG_TARGET, "Starting block authoring with slot based authoring.");
			let params = SlotBasedParams {
				create_inherent_data_providers: move |_, ()| async move { Ok(()) },
				block_import,
				para_client: client.clone(),
				para_backend: backend.clone(),
				relay_client: relay_chain_interface,
				// Hash of the validation code found at the given block, if it can be read.
				code_hash_provider: move |block_hash| {
					client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
				},
				keystore,
				collator_key,
				relay_chain_slot_duration,
				para_id,
				proposer,
				collator_service,
				authoring_duration: Duration::from_millis(2000),
				reinitialize: false,
				slot_offset: Duration::from_secs(1),
				block_import_handle: slot_based_handle,
				spawner: task_manager.spawn_handle(),
				export_pov: None,
				max_pov_percentage: None,
			};

			// The slot based collator receives a spawner via `params`; the value returned by
			// `run` is not used here.
			slot_based::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _, _>(params);
		} else {
			tracing::info!(target: LOG_TARGET, "Starting block authoring with lookahead collator.");
			let params = AuraParams {
				create_inherent_data_providers: move |_, ()| async move { Ok(()) },
				block_import,
				para_client: client.clone(),
				para_backend: backend.clone(),
				relay_client: relay_chain_interface,
				// Hash of the validation code found at the given block, if it can be read.
				code_hash_provider: move |block_hash| {
					client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
				},
				keystore,
				collator_key,
				para_id,
				overseer_handle,
				relay_chain_slot_duration,
				proposer,
				collator_service,
				authoring_duration: Duration::from_millis(2000),
				reinitialize: false,
				max_pov_percentage: None,
			};

			// The lookahead collator is a future that is spawned as essential task.
			let fut = aura::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _>(params);
			task_manager.spawn_essential_handle().spawn("aura", None, fut);
		}
	}

	Ok((task_manager, client, network, rpc_handlers, transaction_pool, backend))
}
507
/// A Cumulus test node instance used for testing.
///
/// Instances are created through [`TestNodeBuilder::build`].
pub struct TestNode {
	/// TaskManager's instance.
	pub task_manager: TaskManager,
	/// Client's instance.
	pub client: Arc<Client>,
	/// Node's network.
	pub network: Arc<dyn NetworkService>,
	/// The `MultiaddrWithPeerId` to this node. This is useful if you want to pass it as "boot
	/// node" to other nodes.
	pub addr: MultiaddrWithPeerId,
	/// RPCHandlers to make RPC queries.
	pub rpc_handlers: RpcHandlers,
	/// Node's transaction pool
	pub transaction_pool: TransactionPool,
	/// Node's backend
	pub backend: Arc<Backend>,
}
526
/// A builder to create a [`TestNode`].
pub struct TestNodeBuilder {
	/// The parachain id the node is running for.
	para_id: ParaId,
	/// Tokio handle passed to the parachain and the relay chain configuration.
	tokio_handle: tokio::runtime::Handle,
	/// Key passed to the parachain and the relay chain node configuration.
	key: Sr25519Keyring,
	/// Collator key; `Some` makes the node a collator.
	collator_key: Option<CollatorPair>,
	/// Parachain nodes to connect to.
	parachain_nodes: Vec<MultiaddrWithPeerId>,
	/// Whether to connect exclusively to `parachain_nodes`.
	parachain_nodes_exclusive: bool,
	/// Relay chain nodes to connect to.
	relay_chain_nodes: Vec<MultiaddrWithPeerId>,
	/// Optional wrapper around the announce block function.
	wrap_announce_block: Option<Box<dyn FnOnce(AnnounceBlockFn) -> AnnounceBlockFn>>,
	/// Optional function to adjust the parachain genesis storage.
	storage_update_func_parachain: Option<Box<dyn Fn()>>,
	/// Optional function handed to the relay chain node configuration as storage updater.
	storage_update_func_relay_chain: Option<Box<dyn Fn()>>,
	/// How the relay chain is accessed (embedded node or external RPC).
	relay_chain_mode: RelayChainMode,
	/// Additional accounts that get an initial balance.
	endowed_accounts: Vec<AccountId>,
	/// Whether proofs are recorded during block import.
	record_proof_during_import: bool,
}
543
impl TestNodeBuilder {
	/// Create a new instance of `Self`.
	///
	/// `para_id` - The parachain id this node is running for.
	/// `tokio_handle` - The tokio handle to use.
	/// `key` - The key that will be used to generate the name and that will be passed as
	/// `dev_seed`.
	///
	/// By default the node is not a collator, is not connected to any node, uses an embedded
	/// relay chain node and records proofs during import.
	pub fn new(para_id: ParaId, tokio_handle: tokio::runtime::Handle, key: Sr25519Keyring) -> Self {
		TestNodeBuilder {
			key,
			para_id,
			tokio_handle,
			collator_key: None,
			parachain_nodes: Vec::new(),
			parachain_nodes_exclusive: false,
			relay_chain_nodes: Vec::new(),
			wrap_announce_block: None,
			storage_update_func_parachain: None,
			storage_update_func_relay_chain: None,
			endowed_accounts: Default::default(),
			relay_chain_mode: RelayChainMode::Embedded,
			record_proof_during_import: true,
		}
	}

	/// Enable collator for this node.
	///
	/// A fresh collator key is generated on every call.
	pub fn enable_collator(mut self) -> Self {
		let collator_key = CollatorPair::generate().0;
		self.collator_key = Some(collator_key);
		self
	}

	/// Instruct the node to exclusively connect to registered parachain nodes.
	///
	/// Parachain nodes can be registered using [`Self::connect_to_parachain_node`] and
	/// [`Self::connect_to_parachain_nodes`].
	pub fn exclusively_connect_to_registered_parachain_nodes(mut self) -> Self {
		self.parachain_nodes_exclusive = true;
		self
	}

	/// Make the node connect to the given parachain node.
	///
	/// By default the node will not be connected to any node, nor will it be able to discover
	/// any other node.
	pub fn connect_to_parachain_node(mut self, node: &TestNode) -> Self {
		self.parachain_nodes.push(node.addr.clone());
		self
	}

	/// Make the node connect to the given parachain nodes.
	///
	/// By default the node will not be connected to any node, nor will it be able to discover
	/// any other node.
	pub fn connect_to_parachain_nodes<'a>(
		mut self,
		nodes: impl IntoIterator<Item = &'a TestNode>,
	) -> Self {
		self.parachain_nodes.extend(nodes.into_iter().map(|n| n.addr.clone()));
		self
	}

	/// Make the node connect to the given relay chain node.
	///
	/// By default the node will not be connected to any node, nor will it be able to discover
	/// any other node.
	pub fn connect_to_relay_chain_node(
		mut self,
		node: &polkadot_test_service::PolkadotTestNode,
	) -> Self {
		self.relay_chain_nodes.push(node.addr.clone());
		self
	}

	/// Make the node connect to the given relay chain nodes.
	///
	/// By default the node will not be connected to any node, nor will it be able to discover
	/// any other node.
	pub fn connect_to_relay_chain_nodes<'a>(
		mut self,
		nodes: impl IntoIterator<Item = &'a polkadot_test_service::PolkadotTestNode>,
	) -> Self {
		self.relay_chain_nodes.extend(nodes.into_iter().map(|n| n.addr.clone()));
		self
	}

	/// Wrap the announce block function of this node.
	///
	/// `wrap` receives the default announce block function and returns the one to use.
	pub fn wrap_announce_block(
		mut self,
		wrap: impl FnOnce(AnnounceBlockFn) -> AnnounceBlockFn + 'static,
	) -> Self {
		self.wrap_announce_block = Some(Box::new(wrap));
		self
	}

	/// Allows accessing the parachain storage before the test node is built.
	pub fn update_storage_parachain(mut self, updater: impl Fn() + 'static) -> Self {
		self.storage_update_func_parachain = Some(Box::new(updater));
		self
	}

	/// Allows accessing the relay chain storage before the test node is built.
	pub fn update_storage_relay_chain(mut self, updater: impl Fn() + 'static) -> Self {
		self.storage_update_func_relay_chain = Some(Box::new(updater));
		self
	}

	/// Connect to a full relay chain node via RPC at the given url.
	pub fn use_external_relay_chain_node_at_url(mut self, network_address: Url) -> Self {
		self.relay_chain_mode = RelayChainMode::ExternalRpc(vec![network_address]);
		self
	}

	/// Connect to a full relay chain node via RPC at `ws://localhost` and the given `port`.
	pub fn use_external_relay_chain_node_at_port(mut self, port: u16) -> Self {
		let mut localhost_url =
			Url::parse("ws://localhost").expect("Should be able to parse localhost Url");
		localhost_url.set_port(Some(port)).expect("Should be able to set port");
		self.relay_chain_mode = RelayChainMode::ExternalRpc(vec![localhost_url]);
		self
	}

	/// Accounts which will have an initial balance.
	///
	/// Replaces any previously configured list.
	pub fn endowed_accounts(mut self, accounts: Vec<AccountId>) -> TestNodeBuilder {
		self.endowed_accounts = accounts;
		self
	}

	/// Record proofs during import.
	pub fn import_proof_recording(mut self, should_record_proof: bool) -> TestNodeBuilder {
		self.record_proof_during_import = should_record_proof;
		self
	}

	/// Build the [`TestNode`].
	///
	/// The node is started with the lookahead collator (if collation is enabled) and without
	/// failing pov recovery.
	///
	/// # Panics
	///
	/// Panics if the parachain configuration cannot be generated or the service cannot be
	/// started.
	pub async fn build(self) -> TestNode {
		let parachain_config = node_config(
			self.storage_update_func_parachain.unwrap_or_else(|| Box::new(|| ())),
			self.tokio_handle.clone(),
			self.key,
			self.parachain_nodes,
			self.parachain_nodes_exclusive,
			self.para_id,
			self.collator_key.is_some(),
			self.endowed_accounts,
		)
		.expect("could not generate Configuration");

		let mut relay_chain_config = polkadot_test_service::node_config(
			self.storage_update_func_relay_chain.unwrap_or_else(|| Box::new(|| ())),
			self.tokio_handle,
			self.key,
			self.relay_chain_nodes,
			false,
		);

		let collator_options = CollatorOptions {
			relay_chain_mode: self.relay_chain_mode,
			embedded_dht_bootnode: true,
			dht_bootnode_discovery: true,
		};

		relay_chain_config.network.node_name =
			format!("{} (relay chain)", relay_chain_config.network.node_name);

		// NOTE(review): the network backend is selected based on the relay chain config, not
		// the parachain config - confirm this is intended.
		let (task_manager, client, network, rpc_handlers, transaction_pool, backend) =
			match relay_chain_config.network.network_backend {
				sc_network::config::NetworkBackendType::Libp2p =>
					start_node_impl::<_, sc_network::NetworkWorker<_, _>>(
						parachain_config,
						self.collator_key,
						relay_chain_config,
						self.wrap_announce_block,
						false,
						|_| Ok(jsonrpsee::RpcModule::new(())),
						collator_options,
						self.record_proof_during_import,
						false,
					)
					.await
					.expect("could not create Cumulus test service"),
				sc_network::config::NetworkBackendType::Litep2p =>
					start_node_impl::<_, sc_network::Litep2pNetworkBackend>(
						parachain_config,
						self.collator_key,
						relay_chain_config,
						self.wrap_announce_block,
						false,
						|_| Ok(jsonrpsee::RpcModule::new(())),
						collator_options,
						self.record_proof_during_import,
						false,
					)
					.await
					.expect("could not create Cumulus test service"),
			};
		// The address other nodes can use to connect to this node.
		let peer_id = network.local_peer_id();
		let multiaddr = polkadot_test_service::get_listen_address(network.clone()).await;
		let addr = MultiaddrWithPeerId { multiaddr, peer_id };

		TestNode { task_manager, client, network, addr, rpc_handlers, transaction_pool, backend }
	}
}
747
/// Create a Cumulus `Configuration`.
///
/// By default a TCP socket will be used, therefore you need to provide nodes if you want the
/// node to be connected to other nodes.
///
/// If `nodes_exclusive` is `true`, the node will only connect to the given `nodes` and not to any
/// other node.
///
/// The `storage_update_func` can be used to make adjustments to the runtime genesis.
///
/// `key` is used for the database path, the node name and the dev key seed. `is_collator` selects
/// between the `Authority` and the `Full` role. `endowed_accounts` are passed to the chain spec as
/// extra endowed accounts.
///
/// # Errors
///
/// Returns an error if the temporary base path cannot be created.
///
/// # Panics
///
/// Panics if the runtime WASM binary is not available or the genesis storage cannot be built.
pub fn node_config(
	storage_update_func: impl Fn(),
	tokio_handle: tokio::runtime::Handle,
	key: Sr25519Keyring,
	nodes: Vec<MultiaddrWithPeerId>,
	nodes_exclusive: bool,
	para_id: ParaId,
	is_collator: bool,
	endowed_accounts: Vec<AccountId>,
) -> Result<Configuration, ServiceError> {
	let base_path = BasePath::new_temp_dir()?;
	let root = base_path.path().join(format!("cumulus_test_service_{}", key));
	let role = if is_collator { Role::Authority } else { Role::Full };
	let key_seed = key.to_seed();
	let mut spec = Box::new(chain_spec::get_chain_spec_with_extra_endowed(
		Some(para_id),
		endowed_accounts,
		cumulus_test_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"),
	));

	// Build the genesis storage, let the caller adjust it and write it back into the spec.
	let mut storage = spec.as_storage_builder().build_storage().expect("could not build storage");

	BasicExternalities::execute_with_storage(&mut storage, storage_update_func);
	spec.set_storage(storage);

	let mut network_config = NetworkConfiguration::new(
		format!("{} (parachain)", key_seed),
		"network/test/0.1",
		Default::default(),
		None,
	);

	// Exclusive mode: the nodes become reserved nodes and all other peers are denied.
	// Otherwise the nodes are only used as boot nodes.
	if nodes_exclusive {
		network_config.default_peers_set.reserved_nodes = nodes;
		network_config.default_peers_set.non_reserved_mode =
			sc_network::config::NonReservedPeerMode::Deny;
	} else {
		network_config.boot_nodes = nodes;
	}

	network_config.allow_non_globals_in_dht = true;

	// Listen on localhost with TCP port 0; mDNS is disabled and private IPs are allowed.
	let addr: multiaddr::Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().expect("valid address; qed");
	network_config.listen_addresses.push(addr.clone());
	network_config.transport =
		TransportConfig::Normal { enable_mdns: false, allow_private_ip: true };

	Ok(Configuration {
		impl_name: "cumulus-test-node".to_string(),
		impl_version: "0.1".to_string(),
		role,
		tokio_handle,
		transaction_pool: Default::default(),
		network: network_config,
		keystore: KeystoreConfig::InMemory,
		// The database lives below the temporary base path.
		database: DatabaseSource::RocksDb { path: root.join("db"), cache_size: 128 },
		trie_cache_maximum_size: Some(64 * 1024 * 1024),
		warm_up_trie_cache: None,
		// Archive node: neither state nor blocks are pruned.
		state_pruning: Some(PruningMode::ArchiveAll),
		blocks_pruning: BlocksPruning::KeepAll,
		chain_spec: spec,
		executor: ExecutorConfiguration {
			wasm_method: WasmExecutionMethod::Compiled {
				instantiation_strategy:
					sc_executor_wasmtime::InstantiationStrategy::PoolingCopyOnWrite,
			},
			..ExecutorConfiguration::default()
		},
		rpc: RpcConfiguration {
			addr: None,
			max_connections: Default::default(),
			cors: None,
			methods: Default::default(),
			max_request_size: Default::default(),
			max_response_size: Default::default(),
			id_provider: None,
			max_subs_per_conn: Default::default(),
			port: 9945,
			message_buffer_capacity: Default::default(),
			batch_config: RpcBatchRequestConfig::Unlimited,
			rate_limit: None,
			rate_limit_whitelisted_ips: Default::default(),
			rate_limit_trust_proxy_headers: Default::default(),
		},
		prometheus_config: None,
		telemetry_endpoints: None,
		offchain_worker: OffchainWorkerConfig { enabled: true, indexing_enabled: false },
		force_authoring: false,
		disable_grandpa: false,
		dev_key_seed: Some(key_seed),
		tracing_targets: None,
		tracing_receiver: Default::default(),
		announce_block: true,
		data_path: root,
		base_path,
		wasm_runtime_overrides: None,
	})
}
855
856impl TestNode {
857	/// Wait for `count` blocks to be imported in the node and then exit. This function will not
858	/// return if no blocks are ever created, thus you should restrict the maximum amount of time of
859	/// the test execution.
860	pub fn wait_for_blocks(&self, count: usize) -> impl Future<Output = ()> {
861		self.client.wait_for_blocks(count)
862	}
863
864	/// Send an extrinsic to this node.
865	pub async fn send_extrinsic(
866		&self,
867		function: impl Into<runtime::RuntimeCall>,
868		caller: Sr25519Keyring,
869	) -> Result<RpcTransactionOutput, RpcTransactionError> {
870		let extrinsic = construct_extrinsic(&self.client, function, caller.pair(), Some(0));
871
872		self.rpc_handlers.send_transaction(extrinsic.into()).await
873	}
874
875	/// Register a parachain at this relay chain.
876	pub async fn schedule_upgrade(&self, validation: Vec<u8>) -> Result<(), RpcTransactionError> {
877		let call = frame_system::Call::set_code { code: validation };
878
879		self.send_extrinsic(
880			runtime::SudoCall::sudo_unchecked_weight {
881				call: Box::new(call.into()),
882				weight: Weight::from_parts(1_000, 0),
883			},
884			Sr25519Keyring::Alice,
885		)
886		.await
887		.map(drop)
888	}
889}
890
891/// Fetch account nonce for key pair
892pub fn fetch_nonce(client: &Client, account: sp_core::sr25519::Public) -> u32 {
893	let best_hash = client.chain_info().best_hash;
894	client
895		.runtime_api()
896		.account_nonce(best_hash, account.into())
897		.expect("Fetching account nonce works; qed")
898}
899
900/// Construct an extrinsic that can be applied to the test runtime.
901pub fn construct_extrinsic(
902	client: &Client,
903	function: impl Into<runtime::RuntimeCall>,
904	caller: sp_core::sr25519::Pair,
905	nonce: Option<u32>,
906) -> runtime::UncheckedExtrinsic {
907	let function = function.into();
908	let current_block_hash = client.info().best_hash;
909	let current_block = client.info().best_number.saturated_into();
910	let genesis_block = client.hash(0).unwrap().unwrap();
911	let nonce = nonce.unwrap_or_else(|| fetch_nonce(client, caller.public()));
912	let period = runtime::BlockHashCount::get()
913		.checked_next_power_of_two()
914		.map(|c| c / 2)
915		.unwrap_or(2) as u64;
916	let tip = 0;
917	let tx_ext: runtime::TxExtension = (
918		frame_system::AuthorizeCall::<runtime::Runtime>::new(),
919		frame_system::CheckNonZeroSender::<runtime::Runtime>::new(),
920		frame_system::CheckSpecVersion::<runtime::Runtime>::new(),
921		frame_system::CheckGenesis::<runtime::Runtime>::new(),
922		frame_system::CheckEra::<runtime::Runtime>::from(generic::Era::mortal(
923			period,
924			current_block,
925		)),
926		frame_system::CheckNonce::<runtime::Runtime>::from(nonce),
927		frame_system::CheckWeight::<runtime::Runtime>::new(),
928		pallet_transaction_payment::ChargeTransactionPayment::<runtime::Runtime>::from(tip),
929	)
930		.into();
931	let raw_payload = runtime::SignedPayload::from_raw(
932		function.clone(),
933		tx_ext.clone(),
934		((), (), runtime::VERSION.spec_version, genesis_block, current_block_hash, (), (), ()),
935	);
936	let signature = raw_payload.using_encoded(|e| caller.sign(e));
937	runtime::UncheckedExtrinsic::new_signed(
938		function,
939		MultiAddress::Id(caller.public().into()),
940		runtime::Signature::Sr25519(signature),
941		tx_ext,
942	)
943}
944
945/// Run a relay-chain validator node.
946///
947/// This is essentially a wrapper around
948/// [`run_validator_node`](polkadot_test_service::run_validator_node).
949pub fn run_relay_chain_validator_node(
950	tokio_handle: tokio::runtime::Handle,
951	key: Sr25519Keyring,
952	storage_update_func: impl Fn(),
953	boot_nodes: Vec<MultiaddrWithPeerId>,
954	port: Option<u16>,
955) -> polkadot_test_service::PolkadotTestNode {
956	let mut config = polkadot_test_service::node_config(
957		storage_update_func,
958		tokio_handle.clone(),
959		key,
960		boot_nodes,
961		true,
962	);
963
964	if let Some(port) = port {
965		config.rpc.addr = Some(vec![RpcEndpoint {
966			batch_config: config.rpc.batch_config,
967			cors: config.rpc.cors.clone(),
968			listen_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)),
969			max_connections: config.rpc.max_connections,
970			max_payload_in_mb: config.rpc.max_request_size,
971			max_payload_out_mb: config.rpc.max_response_size,
972			max_subscriptions_per_connection: config.rpc.max_subs_per_conn,
973			max_buffer_capacity_per_connection: config.rpc.message_buffer_capacity,
974			rpc_methods: config.rpc.methods,
975			rate_limit: config.rpc.rate_limit,
976			rate_limit_trust_proxy_headers: config.rpc.rate_limit_trust_proxy_headers,
977			rate_limit_whitelisted_ips: config.rpc.rate_limit_whitelisted_ips.clone(),
978			retry_random_port: true,
979			is_optional: false,
980		}]);
981	}
982
983	let mut workers_path = std::env::current_exe().unwrap();
984	workers_path.pop();
985	workers_path.pop();
986
987	tokio_handle.block_on(async move {
988		polkadot_test_service::run_validator_node(config, Some(workers_path)).await
989	})
990}