referrerpolicy=no-referrer-when-downgrade

sc_service_test/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Service integration test utils.
20
21use futures::{task::Poll, Future, TryFutureExt as _};
22use log::{debug, info};
23use parking_lot::Mutex;
24use sc_client_api::{Backend, CallExecutor};
25use sc_network::{
26	config::{MultiaddrWithPeerId, NetworkConfiguration, TransportConfig},
27	multiaddr, NetworkBlock, NetworkPeers, NetworkStateInfo,
28};
29use sc_network_sync::SyncingService;
30use sc_service::{
31	client::Client,
32	config::{
33		BasePath, DatabaseSource, ExecutorConfiguration, KeystoreConfig, RpcBatchRequestConfig,
34		RpcConfiguration,
35	},
36	BlocksPruning, ChainSpecExtension, Configuration, Error, GenericChainSpec, Role,
37	SpawnTaskHandle, TaskManager,
38};
39use sc_transaction_pool_api::TransactionPool;
40use sp_blockchain::HeaderBackend;
41use sp_runtime::traits::Block as BlockT;
42use std::{iter, net::Ipv4Addr, pin::Pin, sync::Arc, task::Context, time::Duration};
43use tempfile::TempDir;
44use tokio::{runtime::Runtime, time};
45
46#[cfg(test)]
47mod client;
48
49/// Maximum duration of single wait call.
50const MAX_WAIT_TIME: Duration = Duration::from_secs(60 * 3);
51
52struct TestNet<E, F, U> {
53	runtime: Runtime,
54	authority_nodes: Vec<(usize, F, U, MultiaddrWithPeerId)>,
55	full_nodes: Vec<(usize, F, U, MultiaddrWithPeerId)>,
56	chain_spec: GenericChainSpec<E>,
57	base_port: u16,
58	nodes: usize,
59}
60
61impl<E, F, U> Drop for TestNet<E, F, U> {
62	fn drop(&mut self) {
63		// Drop the nodes before dropping the runtime, as the runtime otherwise waits for all
64		// futures to be ended and we run into a dead lock.
65		self.full_nodes.drain(..);
66		self.authority_nodes.drain(..);
67	}
68}
69
70pub trait TestNetNode: Clone + Future<Output = Result<(), Error>> + Send + 'static {
71	type Block: BlockT;
72	type Backend: Backend<Self::Block>;
73	type Executor: CallExecutor<Self::Block> + Send + Sync;
74	type RuntimeApi: Send + Sync;
75	type TransactionPool: TransactionPool<Block = Self::Block>;
76
77	fn client(&self) -> Arc<Client<Self::Backend, Self::Executor, Self::Block, Self::RuntimeApi>>;
78	fn transaction_pool(&self) -> Arc<Self::TransactionPool>;
79	fn network(&self) -> Arc<dyn sc_network::service::traits::NetworkService>;
80	fn sync(&self) -> &Arc<SyncingService<Self::Block>>;
81	fn spawn_handle(&self) -> SpawnTaskHandle;
82}
83
84pub struct TestNetComponents<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> {
85	task_manager: Arc<Mutex<TaskManager>>,
86	client: Arc<Client<TBackend, TExec, TBl, TRtApi>>,
87	transaction_pool: Arc<TExPool>,
88	network: Arc<dyn sc_network::service::traits::NetworkService>,
89	sync: Arc<SyncingService<TBl>>,
90}
91
92impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool>
93	TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
94{
95	pub fn new(
96		task_manager: TaskManager,
97		client: Arc<Client<TBackend, TExec, TBl, TRtApi>>,
98		network: Arc<dyn sc_network::service::traits::NetworkService>,
99		sync: Arc<SyncingService<TBl>>,
100		transaction_pool: Arc<TExPool>,
101	) -> Self {
102		Self {
103			client,
104			sync,
105			transaction_pool,
106			network,
107			task_manager: Arc::new(Mutex::new(task_manager)),
108		}
109	}
110}
111
112impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> Clone
113	for TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
114{
115	fn clone(&self) -> Self {
116		Self {
117			task_manager: self.task_manager.clone(),
118			client: self.client.clone(),
119			transaction_pool: self.transaction_pool.clone(),
120			network: self.network.clone(),
121			sync: self.sync.clone(),
122		}
123	}
124}
125
126impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> Future
127	for TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
128{
129	type Output = Result<(), Error>;
130
131	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
132		Pin::new(&mut self.task_manager.lock().future()).poll(cx)
133	}
134}
135
136impl<TBl, TBackend, TExec, TRtApi, TExPool> TestNetNode
137	for TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
138where
139	TBl: BlockT,
140	TBackend: sc_client_api::Backend<TBl> + Send + Sync + 'static,
141	TExec: CallExecutor<TBl> + Send + Sync + 'static,
142	TRtApi: Send + Sync + 'static,
143	TExPool: TransactionPool<Block = TBl> + Send + Sync + 'static,
144{
145	type Block = TBl;
146	type Backend = TBackend;
147	type Executor = TExec;
148	type RuntimeApi = TRtApi;
149	type TransactionPool = TExPool;
150
151	fn client(&self) -> Arc<Client<Self::Backend, Self::Executor, Self::Block, Self::RuntimeApi>> {
152		self.client.clone()
153	}
154	fn transaction_pool(&self) -> Arc<Self::TransactionPool> {
155		self.transaction_pool.clone()
156	}
157	fn network(&self) -> Arc<dyn sc_network::service::traits::NetworkService> {
158		self.network.clone()
159	}
160	fn sync(&self) -> &Arc<SyncingService<Self::Block>> {
161		&self.sync
162	}
163	fn spawn_handle(&self) -> SpawnTaskHandle {
164		self.task_manager.lock().spawn_handle()
165	}
166}
167
168impl<E, F, U> TestNet<E, F, U>
169where
170	F: Clone + Send + 'static,
171	U: Clone + Send + 'static,
172{
173	pub fn run_until_all_full<FP>(&mut self, full_predicate: FP)
174	where
175		FP: Send + Fn(usize, &F) -> bool + 'static,
176	{
177		let full_nodes = self.full_nodes.clone();
178		let future = async move {
179			let mut interval = time::interval(Duration::from_millis(100));
180			loop {
181				interval.tick().await;
182
183				if full_nodes.iter().all(|(id, service, _, _)| full_predicate(*id, service)) {
184					break
185				}
186			}
187		};
188
189		if self
190			.runtime
191			.block_on(async move { time::timeout(MAX_WAIT_TIME, future).await })
192			.is_err()
193		{
194			panic!("Waited for too long");
195		}
196	}
197}
198
199fn node_config<E: ChainSpecExtension + Clone + 'static + Send + Sync>(
200	index: usize,
201	spec: &GenericChainSpec<E>,
202	role: Role,
203	tokio_handle: tokio::runtime::Handle,
204	key_seed: Option<String>,
205	base_port: u16,
206	root: &TempDir,
207) -> Configuration {
208	let root = root.path().join(format!("node-{}", index));
209
210	let mut network_config = NetworkConfiguration::new(
211		format!("Node {}", index),
212		"network/test/0.1",
213		Default::default(),
214		None,
215	);
216
217	network_config.allow_non_globals_in_dht = true;
218
219	network_config.listen_addresses.push(
220		iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
221			.chain(iter::once(multiaddr::Protocol::Tcp(base_port + index as u16)))
222			.collect(),
223	);
224
225	network_config.transport =
226		TransportConfig::Normal { enable_mdns: false, allow_private_ip: true };
227
228	Configuration {
229		impl_name: String::from("network-test-impl"),
230		impl_version: String::from("0.1"),
231		role,
232		tokio_handle,
233		transaction_pool: Default::default(),
234		network: network_config,
235		keystore: KeystoreConfig::Path { path: root.join("key"), password: None },
236		database: DatabaseSource::RocksDb { path: root.join("db"), cache_size: 128 },
237		trie_cache_maximum_size: Some(16 * 1024 * 1024),
238		warm_up_trie_cache: None,
239		state_pruning: Default::default(),
240		blocks_pruning: BlocksPruning::KeepFinalized,
241		chain_spec: Box::new((*spec).clone()),
242		executor: ExecutorConfiguration::default(),
243		wasm_runtime_overrides: Default::default(),
244		rpc: RpcConfiguration {
245			addr: Default::default(),
246			max_connections: Default::default(),
247			cors: None,
248			methods: Default::default(),
249			max_request_size: Default::default(),
250			max_response_size: Default::default(),
251			id_provider: Default::default(),
252			max_subs_per_conn: Default::default(),
253			port: 9944,
254			message_buffer_capacity: Default::default(),
255			batch_config: RpcBatchRequestConfig::Unlimited,
256			rate_limit: None,
257			rate_limit_whitelisted_ips: Default::default(),
258			rate_limit_trust_proxy_headers: Default::default(),
259		},
260		prometheus_config: None,
261		telemetry_endpoints: None,
262		offchain_worker: Default::default(),
263		force_authoring: false,
264		disable_grandpa: false,
265		dev_key_seed: key_seed,
266		tracing_targets: None,
267		tracing_receiver: Default::default(),
268		announce_block: true,
269		base_path: BasePath::new(root.clone()),
270		data_path: root,
271	}
272}
273
274impl<E, F, U> TestNet<E, F, U>
275where
276	F: TestNetNode,
277	E: ChainSpecExtension + Clone + 'static + Send + Sync,
278{
279	fn new(
280		temp: &TempDir,
281		spec: GenericChainSpec<E>,
282		full: impl Iterator<Item = impl FnOnce(Configuration) -> Result<(F, U), Error>>,
283		authorities: impl Iterator<Item = (String, impl FnOnce(Configuration) -> Result<(F, U), Error>)>,
284		base_port: u16,
285	) -> TestNet<E, F, U> {
286		sp_tracing::try_init_simple();
287		fdlimit::raise_fd_limit().unwrap();
288		let runtime = Runtime::new().expect("Error creating tokio runtime");
289		let mut net = TestNet {
290			runtime,
291			authority_nodes: Default::default(),
292			full_nodes: Default::default(),
293			chain_spec: spec,
294			base_port,
295			nodes: 0,
296		};
297		net.insert_nodes(temp, full, authorities);
298		net
299	}
300
301	fn insert_nodes(
302		&mut self,
303		temp: &TempDir,
304		full: impl Iterator<Item = impl FnOnce(Configuration) -> Result<(F, U), Error>>,
305		authorities: impl Iterator<Item = (String, impl FnOnce(Configuration) -> Result<(F, U), Error>)>,
306	) {
307		self.runtime.block_on(async {
308			let handle = self.runtime.handle().clone();
309
310			for (key, authority) in authorities {
311				let node_config = node_config(
312					self.nodes,
313					&self.chain_spec,
314					Role::Authority,
315					handle.clone(),
316					Some(key),
317					self.base_port,
318					temp,
319				);
320				let addr = node_config.network.listen_addresses.first().unwrap().clone();
321				let (service, user_data) =
322					authority(node_config).expect("Error creating test node service");
323
324				handle.spawn(service.clone().map_err(|_| ()));
325				let addr = MultiaddrWithPeerId {
326					multiaddr: addr,
327					peer_id: service.network().local_peer_id(),
328				};
329				self.authority_nodes.push((self.nodes, service, user_data, addr));
330				self.nodes += 1;
331			}
332
333			for full in full {
334				let node_config = node_config(
335					self.nodes,
336					&self.chain_spec,
337					Role::Full,
338					handle.clone(),
339					None,
340					self.base_port,
341					temp,
342				);
343				let addr = node_config.network.listen_addresses.first().unwrap().clone();
344				let (service, user_data) =
345					full(node_config).expect("Error creating test node service");
346
347				handle.spawn(service.clone().map_err(|_| ()));
348				let addr = MultiaddrWithPeerId {
349					multiaddr: addr,
350					peer_id: service.network().local_peer_id(),
351				};
352				self.full_nodes.push((self.nodes, service, user_data, addr));
353				self.nodes += 1;
354			}
355		});
356	}
357}
358
359fn tempdir_with_prefix(prefix: &str) -> TempDir {
360	tempfile::Builder::new()
361		.prefix(prefix)
362		.tempdir()
363		.expect("Error creating test dir")
364}
365
366pub fn connectivity<E, Fb, F>(spec: GenericChainSpec<E>, full_builder: Fb)
367where
368	E: ChainSpecExtension + Clone + 'static + Send + Sync,
369	Fb: Fn(Configuration) -> Result<F, Error>,
370	F: TestNetNode,
371{
372	const NUM_FULL_NODES: usize = 5;
373
374	let expected_full_connections = NUM_FULL_NODES - 1;
375
376	{
377		let temp = tempdir_with_prefix("substrate-connectivity-test");
378		{
379			let mut network = TestNet::new(
380				&temp,
381				spec.clone(),
382				(0..NUM_FULL_NODES).map(|_| |cfg| full_builder(cfg).map(|s| (s, ()))),
383				// Note: this iterator is empty but we can't just use `iter::empty()`, otherwise
384				// the type of the closure cannot be inferred.
385				(0..0).map(|_| (String::new(), { |cfg| full_builder(cfg).map(|s| (s, ())) })),
386				30400,
387			);
388			info!("Checking star topology");
389			let first_address = network.full_nodes[0].3.clone();
390			for (_, service, _, _) in network.full_nodes.iter().skip(1) {
391				service
392					.network()
393					.add_reserved_peer(first_address.clone())
394					.expect("Error adding reserved peer");
395			}
396
397			network.run_until_all_full(move |_index, service| {
398				let connected = service.network().sync_num_connected();
399				debug!("Got {}/{} full connections...", connected, expected_full_connections);
400				connected == expected_full_connections
401			});
402		};
403
404		temp.close().expect("Error removing temp dir");
405	}
406	{
407		let temp = tempdir_with_prefix("substrate-connectivity-test");
408		{
409			let mut network = TestNet::new(
410				&temp,
411				spec,
412				(0..NUM_FULL_NODES).map(|_| |cfg| full_builder(cfg).map(|s| (s, ()))),
413				// Note: this iterator is empty but we can't just use `iter::empty()`, otherwise
414				// the type of the closure cannot be inferred.
415				(0..0).map(|_| (String::new(), { |cfg| full_builder(cfg).map(|s| (s, ())) })),
416				30400,
417			);
418			info!("Checking linked topology");
419			let mut address = network.full_nodes[0].3.clone();
420			for i in 0..NUM_FULL_NODES {
421				if i != 0 {
422					if let Some((_, service, _, node_id)) = network.full_nodes.get(i) {
423						service
424							.network()
425							.add_reserved_peer(address)
426							.expect("Error adding reserved peer");
427						address = node_id.clone();
428					}
429				}
430			}
431
432			network.run_until_all_full(move |_index, service| {
433				let connected = service.network().sync_num_connected();
434				debug!("Got {}/{} full connections...", connected, expected_full_connections);
435				connected == expected_full_connections
436			});
437		}
438		temp.close().expect("Error removing temp dir");
439	}
440}
441
442pub fn sync<E, Fb, F, B, ExF, U>(
443	spec: GenericChainSpec<E>,
444	full_builder: Fb,
445	mut make_block_and_import: B,
446	mut extrinsic_factory: ExF,
447) where
448	Fb: Fn(Configuration) -> Result<(F, U), Error>,
449	F: TestNetNode,
450	B: FnMut(&F, &mut U),
451	ExF: FnMut(&F, &U) -> <F::Block as BlockT>::Extrinsic,
452	U: Clone + Send + 'static,
453	E: ChainSpecExtension + Clone + 'static + Send + Sync,
454{
455	const NUM_FULL_NODES: usize = 10;
456	const NUM_BLOCKS: usize = 512;
457	let temp = tempdir_with_prefix("substrate-sync-test");
458	let mut network = TestNet::new(
459		&temp,
460		spec,
461		(0..NUM_FULL_NODES).map(|_| |cfg| full_builder(cfg)),
462		// Note: this iterator is empty but we can't just use `iter::empty()`, otherwise
463		// the type of the closure cannot be inferred.
464		(0..0).map(|_| (String::new(), { |cfg| full_builder(cfg) })),
465		30500,
466	);
467	info!("Checking block sync");
468	let first_address = {
469		let &mut (_, ref first_service, ref mut first_user_data, _) = &mut network.full_nodes[0];
470		for i in 0..NUM_BLOCKS {
471			if i % 128 == 0 {
472				info!("Generating #{}", i + 1);
473			}
474
475			make_block_and_import(first_service, first_user_data);
476		}
477		let info = network.full_nodes[0].1.client().info();
478		network.full_nodes[0]
479			.1
480			.sync()
481			.new_best_block_imported(info.best_hash, info.best_number);
482		network.full_nodes[0].3.clone()
483	};
484
485	info!("Running sync");
486	for (_, service, _, _) in network.full_nodes.iter().skip(1) {
487		service
488			.network()
489			.add_reserved_peer(first_address.clone())
490			.expect("Error adding reserved peer");
491	}
492
493	network.run_until_all_full(|_index, service| {
494		service.client().info().best_number == (NUM_BLOCKS as u32).into()
495	});
496
497	info!("Checking extrinsic propagation");
498	let first_service = network.full_nodes[0].1.clone();
499	let first_user_data = &network.full_nodes[0].2;
500	let best_block = first_service.client().info().best_hash;
501	let extrinsic = extrinsic_factory(&first_service, first_user_data);
502	let source = sc_transaction_pool_api::TransactionSource::External;
503
504	futures::executor::block_on(
505		first_service.transaction_pool().submit_one(best_block, source, extrinsic),
506	)
507	.expect("failed to submit extrinsic");
508
509	network.run_until_all_full(|_index, service| service.transaction_pool().ready().count() == 1);
510}
511
512pub fn consensus<E, Fb, F>(
513	spec: GenericChainSpec<E>,
514	full_builder: Fb,
515	authorities: impl IntoIterator<Item = String>,
516) where
517	Fb: Fn(Configuration) -> Result<F, Error>,
518	F: TestNetNode,
519	E: ChainSpecExtension + Clone + 'static + Send + Sync,
520{
521	const NUM_FULL_NODES: usize = 10;
522	const NUM_BLOCKS: usize = 10; // 10 * 2 sec block production time = ~20 seconds
523	let temp = tempdir_with_prefix("substrate-consensus-test");
524	let mut network = TestNet::new(
525		&temp,
526		spec,
527		(0..NUM_FULL_NODES / 2).map(|_| |cfg| full_builder(cfg).map(|s| (s, ()))),
528		authorities
529			.into_iter()
530			.map(|key| (key, { |cfg| full_builder(cfg).map(|s| (s, ())) })),
531		30600,
532	);
533
534	info!("Checking consensus");
535	let first_address = network.authority_nodes[0].3.clone();
536	for (_, service, _, _) in network.full_nodes.iter() {
537		service
538			.network()
539			.add_reserved_peer(first_address.clone())
540			.expect("Error adding reserved peer");
541	}
542	for (_, service, _, _) in network.authority_nodes.iter().skip(1) {
543		service
544			.network()
545			.add_reserved_peer(first_address.clone())
546			.expect("Error adding reserved peer");
547	}
548	network.run_until_all_full(|_index, service| {
549		service.client().info().finalized_number >= (NUM_BLOCKS as u32 / 2).into()
550	});
551
552	info!("Adding more peers");
553	network.insert_nodes(
554		&temp,
555		(0..NUM_FULL_NODES / 2).map(|_| |cfg| full_builder(cfg).map(|s| (s, ()))),
556		// Note: this iterator is empty but we can't just use `iter::empty()`, otherwise
557		// the type of the closure cannot be inferred.
558		(0..0).map(|_| (String::new(), { |cfg| full_builder(cfg).map(|s| (s, ())) })),
559	);
560	for (_, service, _, _) in network.full_nodes.iter() {
561		service
562			.network()
563			.add_reserved_peer(first_address.clone())
564			.expect("Error adding reserved peer");
565	}
566
567	network.run_until_all_full(|_index, service| {
568		service.client().info().finalized_number >= (NUM_BLOCKS as u32).into()
569	});
570}