1use futures::{task::Poll, Future, TryFutureExt as _};
22use log::{debug, info};
23use parking_lot::Mutex;
24use sc_client_api::{Backend, CallExecutor};
25use sc_network::{
26 config::{MultiaddrWithPeerId, NetworkConfiguration, TransportConfig},
27 multiaddr, NetworkBlock, NetworkPeers, NetworkStateInfo,
28};
29use sc_network_sync::SyncingService;
30use sc_service::{
31 client::Client,
32 config::{
33 BasePath, DatabaseSource, ExecutorConfiguration, KeystoreConfig, RpcBatchRequestConfig,
34 RpcConfiguration,
35 },
36 BlocksPruning, ChainSpecExtension, Configuration, Error, GenericChainSpec, Role,
37 SpawnTaskHandle, TaskManager,
38};
39use sc_transaction_pool_api::TransactionPool;
40use sp_blockchain::HeaderBackend;
41use sp_runtime::traits::Block as BlockT;
42use std::{iter, net::Ipv4Addr, pin::Pin, sync::Arc, task::Context, time::Duration};
43use tempfile::TempDir;
44use tokio::{runtime::Runtime, time};
45
46#[cfg(test)]
47mod client;
48
/// Longest time a single `run_until_all_full` call may wait before it panics (3 minutes).
const MAX_WAIT_TIME: Duration = Duration::from_secs(60 * 3);
51
/// A set of locally running test nodes that all share one tokio runtime.
///
/// `E` is the chain-spec extension type, `F` the node service type and `U` arbitrary
/// per-node user data returned by the node builder.
struct TestNet<E, F, U> {
    /// Runtime on which node futures are spawned and on which waits block.
    runtime: Runtime,
    /// Authority nodes as `(index, service, user data, listen address with peer id)`.
    authority_nodes: Vec<(usize, F, U, MultiaddrWithPeerId)>,
    /// Full nodes as `(index, service, user data, listen address with peer id)`.
    full_nodes: Vec<(usize, F, U, MultiaddrWithPeerId)>,
    /// Chain specification cloned into the configuration of every node.
    chain_spec: GenericChainSpec<E>,
    /// First TCP port of the network; the node with index `i` listens on `base_port + i`.
    base_port: u16,
    /// Number of nodes created so far; also used as the index of the next node.
    nodes: usize,
}
60
61impl<E, F, U> Drop for TestNet<E, F, U> {
62 fn drop(&mut self) {
63 self.full_nodes.drain(..);
66 self.authority_nodes.drain(..);
67 }
68}
69
/// Interface a node service must provide to be driven by the test network.
///
/// An implementor is a cloneable, sendable future with output `Result<(), Error>`.
pub trait TestNetNode: Clone + Future<Output = Result<(), Error>> + Send + 'static {
    /// Block type used by the node.
    type Block: BlockT;
    /// Client backend type.
    type Backend: Backend<Self::Block>;
    /// Call executor type used by the client.
    type Executor: CallExecutor<Self::Block> + Send + Sync;
    /// Runtime API type parameter of the client.
    type RuntimeApi: Send + Sync;
    /// Transaction pool type operating on `Self::Block`.
    type TransactionPool: TransactionPool<Block = Self::Block>;

    /// Returns a shared handle to the node's client.
    fn client(&self) -> Arc<Client<Self::Backend, Self::Executor, Self::Block, Self::RuntimeApi>>;
    /// Returns a shared handle to the node's transaction pool.
    fn transaction_pool(&self) -> Arc<Self::TransactionPool>;
    /// Returns a shared handle to the node's network service.
    fn network(&self) -> Arc<dyn sc_network::service::traits::NetworkService>;
    /// Returns a reference to the node's syncing service handle.
    fn sync(&self) -> &Arc<SyncingService<Self::Block>>;
    /// Returns a handle that can be used to spawn tasks for this node.
    fn spawn_handle(&self) -> SpawnTaskHandle;
}
83
/// Bundle of per-node components; every field sits behind an `Arc`, so clones share
/// the same underlying instances.
pub struct TestNetComponents<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> {
    /// Task manager of the node, behind a mutex because it is shared between clones.
    task_manager: Arc<Mutex<TaskManager>>,
    /// Client of the node.
    client: Arc<Client<TBackend, TExec, TBl, TRtApi>>,
    /// Transaction pool of the node.
    transaction_pool: Arc<TExPool>,
    /// Network service of the node.
    network: Arc<dyn sc_network::service::traits::NetworkService>,
    /// Syncing service of the node.
    sync: Arc<SyncingService<TBl>>,
}
91
92impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool>
93 TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
94{
95 pub fn new(
96 task_manager: TaskManager,
97 client: Arc<Client<TBackend, TExec, TBl, TRtApi>>,
98 network: Arc<dyn sc_network::service::traits::NetworkService>,
99 sync: Arc<SyncingService<TBl>>,
100 transaction_pool: Arc<TExPool>,
101 ) -> Self {
102 Self {
103 client,
104 sync,
105 transaction_pool,
106 network,
107 task_manager: Arc::new(Mutex::new(task_manager)),
108 }
109 }
110}
111
112impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> Clone
113 for TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
114{
115 fn clone(&self) -> Self {
116 Self {
117 task_manager: self.task_manager.clone(),
118 client: self.client.clone(),
119 transaction_pool: self.transaction_pool.clone(),
120 network: self.network.clone(),
121 sync: self.sync.clone(),
122 }
123 }
124}
125
126impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> Future
127 for TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
128{
129 type Output = Result<(), Error>;
130
131 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
132 Pin::new(&mut self.task_manager.lock().future()).poll(cx)
133 }
134}
135
136impl<TBl, TBackend, TExec, TRtApi, TExPool> TestNetNode
137 for TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
138where
139 TBl: BlockT,
140 TBackend: sc_client_api::Backend<TBl> + Send + Sync + 'static,
141 TExec: CallExecutor<TBl> + Send + Sync + 'static,
142 TRtApi: Send + Sync + 'static,
143 TExPool: TransactionPool<Block = TBl> + Send + Sync + 'static,
144{
145 type Block = TBl;
146 type Backend = TBackend;
147 type Executor = TExec;
148 type RuntimeApi = TRtApi;
149 type TransactionPool = TExPool;
150
151 fn client(&self) -> Arc<Client<Self::Backend, Self::Executor, Self::Block, Self::RuntimeApi>> {
152 self.client.clone()
153 }
154 fn transaction_pool(&self) -> Arc<Self::TransactionPool> {
155 self.transaction_pool.clone()
156 }
157 fn network(&self) -> Arc<dyn sc_network::service::traits::NetworkService> {
158 self.network.clone()
159 }
160 fn sync(&self) -> &Arc<SyncingService<Self::Block>> {
161 &self.sync
162 }
163 fn spawn_handle(&self) -> SpawnTaskHandle {
164 self.task_manager.lock().spawn_handle()
165 }
166}
167
168impl<E, F, U> TestNet<E, F, U>
169where
170 F: Clone + Send + 'static,
171 U: Clone + Send + 'static,
172{
173 pub fn run_until_all_full<FP>(&mut self, full_predicate: FP)
174 where
175 FP: Send + Fn(usize, &F) -> bool + 'static,
176 {
177 let full_nodes = self.full_nodes.clone();
178 let future = async move {
179 let mut interval = time::interval(Duration::from_millis(100));
180 loop {
181 interval.tick().await;
182
183 if full_nodes.iter().all(|(id, service, _, _)| full_predicate(*id, service)) {
184 break
185 }
186 }
187 };
188
189 if self
190 .runtime
191 .block_on(async move { time::timeout(MAX_WAIT_TIME, future).await })
192 .is_err()
193 {
194 panic!("Waited for too long");
195 }
196 }
197}
198
199fn node_config<E: ChainSpecExtension + Clone + 'static + Send + Sync>(
200 index: usize,
201 spec: &GenericChainSpec<E>,
202 role: Role,
203 tokio_handle: tokio::runtime::Handle,
204 key_seed: Option<String>,
205 base_port: u16,
206 root: &TempDir,
207) -> Configuration {
208 let root = root.path().join(format!("node-{}", index));
209
210 let mut network_config = NetworkConfiguration::new(
211 format!("Node {}", index),
212 "network/test/0.1",
213 Default::default(),
214 None,
215 );
216
217 network_config.allow_non_globals_in_dht = true;
218
219 network_config.listen_addresses.push(
220 iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
221 .chain(iter::once(multiaddr::Protocol::Tcp(base_port + index as u16)))
222 .collect(),
223 );
224
225 network_config.transport =
226 TransportConfig::Normal { enable_mdns: false, allow_private_ip: true };
227
228 Configuration {
229 impl_name: String::from("network-test-impl"),
230 impl_version: String::from("0.1"),
231 role,
232 tokio_handle,
233 transaction_pool: Default::default(),
234 network: network_config,
235 keystore: KeystoreConfig::Path { path: root.join("key"), password: None },
236 database: DatabaseSource::RocksDb { path: root.join("db"), cache_size: 128 },
237 trie_cache_maximum_size: Some(16 * 1024 * 1024),
238 warm_up_trie_cache: None,
239 state_pruning: Default::default(),
240 blocks_pruning: BlocksPruning::KeepFinalized,
241 chain_spec: Box::new((*spec).clone()),
242 executor: ExecutorConfiguration::default(),
243 wasm_runtime_overrides: Default::default(),
244 rpc: RpcConfiguration {
245 addr: Default::default(),
246 max_connections: Default::default(),
247 cors: None,
248 methods: Default::default(),
249 max_request_size: Default::default(),
250 max_response_size: Default::default(),
251 id_provider: Default::default(),
252 max_subs_per_conn: Default::default(),
253 port: 9944,
254 message_buffer_capacity: Default::default(),
255 batch_config: RpcBatchRequestConfig::Unlimited,
256 rate_limit: None,
257 rate_limit_whitelisted_ips: Default::default(),
258 rate_limit_trust_proxy_headers: Default::default(),
259 },
260 prometheus_config: None,
261 telemetry_endpoints: None,
262 offchain_worker: Default::default(),
263 force_authoring: false,
264 disable_grandpa: false,
265 dev_key_seed: key_seed,
266 tracing_targets: None,
267 tracing_receiver: Default::default(),
268 announce_block: true,
269 base_path: BasePath::new(root.clone()),
270 data_path: root,
271 }
272}
273
274impl<E, F, U> TestNet<E, F, U>
275where
276 F: TestNetNode,
277 E: ChainSpecExtension + Clone + 'static + Send + Sync,
278{
279 fn new(
280 temp: &TempDir,
281 spec: GenericChainSpec<E>,
282 full: impl Iterator<Item = impl FnOnce(Configuration) -> Result<(F, U), Error>>,
283 authorities: impl Iterator<Item = (String, impl FnOnce(Configuration) -> Result<(F, U), Error>)>,
284 base_port: u16,
285 ) -> TestNet<E, F, U> {
286 sp_tracing::try_init_simple();
287 fdlimit::raise_fd_limit().unwrap();
288 let runtime = Runtime::new().expect("Error creating tokio runtime");
289 let mut net = TestNet {
290 runtime,
291 authority_nodes: Default::default(),
292 full_nodes: Default::default(),
293 chain_spec: spec,
294 base_port,
295 nodes: 0,
296 };
297 net.insert_nodes(temp, full, authorities);
298 net
299 }
300
301 fn insert_nodes(
302 &mut self,
303 temp: &TempDir,
304 full: impl Iterator<Item = impl FnOnce(Configuration) -> Result<(F, U), Error>>,
305 authorities: impl Iterator<Item = (String, impl FnOnce(Configuration) -> Result<(F, U), Error>)>,
306 ) {
307 self.runtime.block_on(async {
308 let handle = self.runtime.handle().clone();
309
310 for (key, authority) in authorities {
311 let node_config = node_config(
312 self.nodes,
313 &self.chain_spec,
314 Role::Authority,
315 handle.clone(),
316 Some(key),
317 self.base_port,
318 temp,
319 );
320 let addr = node_config.network.listen_addresses.first().unwrap().clone();
321 let (service, user_data) =
322 authority(node_config).expect("Error creating test node service");
323
324 handle.spawn(service.clone().map_err(|_| ()));
325 let addr = MultiaddrWithPeerId {
326 multiaddr: addr,
327 peer_id: service.network().local_peer_id(),
328 };
329 self.authority_nodes.push((self.nodes, service, user_data, addr));
330 self.nodes += 1;
331 }
332
333 for full in full {
334 let node_config = node_config(
335 self.nodes,
336 &self.chain_spec,
337 Role::Full,
338 handle.clone(),
339 None,
340 self.base_port,
341 temp,
342 );
343 let addr = node_config.network.listen_addresses.first().unwrap().clone();
344 let (service, user_data) =
345 full(node_config).expect("Error creating test node service");
346
347 handle.spawn(service.clone().map_err(|_| ()));
348 let addr = MultiaddrWithPeerId {
349 multiaddr: addr,
350 peer_id: service.network().local_peer_id(),
351 };
352 self.full_nodes.push((self.nodes, service, user_data, addr));
353 self.nodes += 1;
354 }
355 });
356 }
357}
358
359fn tempdir_with_prefix(prefix: &str) -> TempDir {
360 tempfile::Builder::new()
361 .prefix(prefix)
362 .tempdir()
363 .expect("Error creating test dir")
364}
365
366pub fn connectivity<E, Fb, F>(spec: GenericChainSpec<E>, full_builder: Fb)
367where
368 E: ChainSpecExtension + Clone + 'static + Send + Sync,
369 Fb: Fn(Configuration) -> Result<F, Error>,
370 F: TestNetNode,
371{
372 const NUM_FULL_NODES: usize = 5;
373
374 let expected_full_connections = NUM_FULL_NODES - 1;
375
376 {
377 let temp = tempdir_with_prefix("substrate-connectivity-test");
378 {
379 let mut network = TestNet::new(
380 &temp,
381 spec.clone(),
382 (0..NUM_FULL_NODES).map(|_| |cfg| full_builder(cfg).map(|s| (s, ()))),
383 (0..0).map(|_| (String::new(), { |cfg| full_builder(cfg).map(|s| (s, ())) })),
386 30400,
387 );
388 info!("Checking star topology");
389 let first_address = network.full_nodes[0].3.clone();
390 for (_, service, _, _) in network.full_nodes.iter().skip(1) {
391 service
392 .network()
393 .add_reserved_peer(first_address.clone())
394 .expect("Error adding reserved peer");
395 }
396
397 network.run_until_all_full(move |_index, service| {
398 let connected = service.network().sync_num_connected();
399 debug!("Got {}/{} full connections...", connected, expected_full_connections);
400 connected == expected_full_connections
401 });
402 };
403
404 temp.close().expect("Error removing temp dir");
405 }
406 {
407 let temp = tempdir_with_prefix("substrate-connectivity-test");
408 {
409 let mut network = TestNet::new(
410 &temp,
411 spec,
412 (0..NUM_FULL_NODES).map(|_| |cfg| full_builder(cfg).map(|s| (s, ()))),
413 (0..0).map(|_| (String::new(), { |cfg| full_builder(cfg).map(|s| (s, ())) })),
416 30400,
417 );
418 info!("Checking linked topology");
419 let mut address = network.full_nodes[0].3.clone();
420 for i in 0..NUM_FULL_NODES {
421 if i != 0 {
422 if let Some((_, service, _, node_id)) = network.full_nodes.get(i) {
423 service
424 .network()
425 .add_reserved_peer(address)
426 .expect("Error adding reserved peer");
427 address = node_id.clone();
428 }
429 }
430 }
431
432 network.run_until_all_full(move |_index, service| {
433 let connected = service.network().sync_num_connected();
434 debug!("Got {}/{} full connections...", connected, expected_full_connections);
435 connected == expected_full_connections
436 });
437 }
438 temp.close().expect("Error removing temp dir");
439 }
440}
441
442pub fn sync<E, Fb, F, B, ExF, U>(
443 spec: GenericChainSpec<E>,
444 full_builder: Fb,
445 mut make_block_and_import: B,
446 mut extrinsic_factory: ExF,
447) where
448 Fb: Fn(Configuration) -> Result<(F, U), Error>,
449 F: TestNetNode,
450 B: FnMut(&F, &mut U),
451 ExF: FnMut(&F, &U) -> <F::Block as BlockT>::Extrinsic,
452 U: Clone + Send + 'static,
453 E: ChainSpecExtension + Clone + 'static + Send + Sync,
454{
455 const NUM_FULL_NODES: usize = 10;
456 const NUM_BLOCKS: usize = 512;
457 let temp = tempdir_with_prefix("substrate-sync-test");
458 let mut network = TestNet::new(
459 &temp,
460 spec,
461 (0..NUM_FULL_NODES).map(|_| |cfg| full_builder(cfg)),
462 (0..0).map(|_| (String::new(), { |cfg| full_builder(cfg) })),
465 30500,
466 );
467 info!("Checking block sync");
468 let first_address = {
469 let &mut (_, ref first_service, ref mut first_user_data, _) = &mut network.full_nodes[0];
470 for i in 0..NUM_BLOCKS {
471 if i % 128 == 0 {
472 info!("Generating #{}", i + 1);
473 }
474
475 make_block_and_import(first_service, first_user_data);
476 }
477 let info = network.full_nodes[0].1.client().info();
478 network.full_nodes[0]
479 .1
480 .sync()
481 .new_best_block_imported(info.best_hash, info.best_number);
482 network.full_nodes[0].3.clone()
483 };
484
485 info!("Running sync");
486 for (_, service, _, _) in network.full_nodes.iter().skip(1) {
487 service
488 .network()
489 .add_reserved_peer(first_address.clone())
490 .expect("Error adding reserved peer");
491 }
492
493 network.run_until_all_full(|_index, service| {
494 service.client().info().best_number == (NUM_BLOCKS as u32).into()
495 });
496
497 info!("Checking extrinsic propagation");
498 let first_service = network.full_nodes[0].1.clone();
499 let first_user_data = &network.full_nodes[0].2;
500 let best_block = first_service.client().info().best_hash;
501 let extrinsic = extrinsic_factory(&first_service, first_user_data);
502 let source = sc_transaction_pool_api::TransactionSource::External;
503
504 futures::executor::block_on(
505 first_service.transaction_pool().submit_one(best_block, source, extrinsic),
506 )
507 .expect("failed to submit extrinsic");
508
509 network.run_until_all_full(|_index, service| service.transaction_pool().ready().count() == 1);
510}
511
512pub fn consensus<E, Fb, F>(
513 spec: GenericChainSpec<E>,
514 full_builder: Fb,
515 authorities: impl IntoIterator<Item = String>,
516) where
517 Fb: Fn(Configuration) -> Result<F, Error>,
518 F: TestNetNode,
519 E: ChainSpecExtension + Clone + 'static + Send + Sync,
520{
521 const NUM_FULL_NODES: usize = 10;
522 const NUM_BLOCKS: usize = 10; let temp = tempdir_with_prefix("substrate-consensus-test");
524 let mut network = TestNet::new(
525 &temp,
526 spec,
527 (0..NUM_FULL_NODES / 2).map(|_| |cfg| full_builder(cfg).map(|s| (s, ()))),
528 authorities
529 .into_iter()
530 .map(|key| (key, { |cfg| full_builder(cfg).map(|s| (s, ())) })),
531 30600,
532 );
533
534 info!("Checking consensus");
535 let first_address = network.authority_nodes[0].3.clone();
536 for (_, service, _, _) in network.full_nodes.iter() {
537 service
538 .network()
539 .add_reserved_peer(first_address.clone())
540 .expect("Error adding reserved peer");
541 }
542 for (_, service, _, _) in network.authority_nodes.iter().skip(1) {
543 service
544 .network()
545 .add_reserved_peer(first_address.clone())
546 .expect("Error adding reserved peer");
547 }
548 network.run_until_all_full(|_index, service| {
549 service.client().info().finalized_number >= (NUM_BLOCKS as u32 / 2).into()
550 });
551
552 info!("Adding more peers");
553 network.insert_nodes(
554 &temp,
555 (0..NUM_FULL_NODES / 2).map(|_| |cfg| full_builder(cfg).map(|s| (s, ()))),
556 (0..0).map(|_| (String::new(), { |cfg| full_builder(cfg).map(|s| (s, ())) })),
559 );
560 for (_, service, _, _) in network.full_nodes.iter() {
561 service
562 .network()
563 .add_reserved_peer(first_address.clone())
564 .expect("Error adding reserved peer");
565 }
566
567 network.run_until_all_full(|_index, service| {
568 service.client().info().finalized_number >= (NUM_BLOCKS as u32).into()
569 });
570}