referrerpolicy=no-referrer-when-downgrade

sc_network_test/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18#![allow(missing_docs)]
19
20#[cfg(test)]
21mod block_import;
22#[cfg(test)]
23mod conformance;
24#[cfg(test)]
25mod fuzz;
26#[cfg(test)]
27mod service;
28#[cfg(test)]
29mod sync;
30
31use std::{
32	collections::HashMap,
33	pin::Pin,
34	sync::Arc,
35	task::{Context as FutureContext, Poll},
36	time::Duration,
37};
38
39use futures::{future::BoxFuture, pin_mut, prelude::*};
40use libp2p::PeerId;
41use log::trace;
42use parking_lot::Mutex;
43use sc_block_builder::{BlockBuilder, BlockBuilderBuilder};
44use sc_client_api::{
45	backend::{AuxStore, Backend, Finalizer},
46	BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
47	FinalityNotifications, ImportNotifications,
48};
49use sc_consensus::{
50	BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport,
51	ForkChoiceStrategy, ImportQueue, ImportResult, JustificationImport, JustificationSyncLink,
52	LongestChain, Verifier,
53};
54use sc_network::{
55	config::{
56		FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig,
57		NonReservedPeerMode, ProtocolId, Role, SyncMode, TransportConfig,
58	},
59	peer_store::PeerStore,
60	request_responses::ProtocolConfig as RequestResponseConfig,
61	types::ProtocolName,
62	NetworkBlock, NetworkService, NetworkStateInfo, NetworkSyncForkRequest, NetworkWorker,
63	NotificationMetrics, NotificationService,
64};
65use sc_network_common::role::Roles;
66use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
67use sc_network_sync::{
68	block_request_handler::BlockRequestHandler,
69	service::{network::NetworkServiceProvider, syncing_service::SyncingService},
70	state_request_handler::StateRequestHandler,
71	strategy::{
72		polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
73		warp::{
74			AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncConfig,
75			WarpSyncProvider,
76		},
77	},
78	warp_request_handler,
79};
80use sc_network_types::{build_multiaddr, multiaddr::Multiaddr};
81use sc_service::client::Client;
82use sp_blockchain::{
83	Backend as BlockchainBackend, HeaderBackend, Info as BlockchainInfo, Result as ClientResult,
84};
85use sp_consensus::{
86	block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator},
87	BlockOrigin, Error as ConsensusError, SyncOracle,
88};
89use sp_core::H256;
90use sp_runtime::{
91	codec::{Decode, Encode},
92	generic::BlockId,
93	traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero},
94	Justification, Justifications,
95};
96use substrate_test_runtime_client::Sr25519Keyring;
97pub use substrate_test_runtime_client::{
98	runtime::{Block, ExtrinsicBuilder, Hash, Header, Transfer},
99	TestClient, TestClientBuilder, TestClientBuilderExt,
100};
101use tokio::time::timeout;
102
/// A Verifier that accepts all blocks and passes them on with the configured
/// finality to be imported.
#[derive(Clone)]
pub struct PassThroughVerifier {
	/// Value written into `BlockImportParams::finalized` for every block
	/// passed through `verify`.
	finalized: bool,
}
109
110impl PassThroughVerifier {
111	/// Create a new instance.
112	///
113	/// Every verified block will use `finalized` for the `BlockImportParams`.
114	pub fn new(finalized: bool) -> Self {
115		Self { finalized }
116	}
117}
118
119/// This `Verifier` accepts all data as valid.
120#[async_trait::async_trait]
121impl<B: BlockT> Verifier<B> for PassThroughVerifier {
122	async fn verify(
123		&self,
124		mut block: BlockImportParams<B>,
125	) -> Result<BlockImportParams<B>, String> {
126		if block.fork_choice.is_none() {
127			block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
128		};
129		block.finalized = self.finalized;
130		Ok(block)
131	}
132}
133
/// Concrete client type used by every peer in the test network: the service
/// `Client` instantiated with the test runtime's backend, executor, block type
/// and runtime API.
pub type PeersFullClient = Client<
	substrate_test_runtime_client::Backend,
	substrate_test_runtime_client::ExecutorDispatch,
	Block,
	substrate_test_runtime_client::runtime::RuntimeApi,
>;
140
/// Cheaply clonable handle bundling a peer's full client with the backend it
/// was built on.
#[derive(Clone)]
pub struct PeersClient {
	/// Shared handle to the full client.
	client: Arc<PeersFullClient>,
	/// Shared handle to the backend; used directly for state availability checks.
	backend: Arc<substrate_test_runtime_client::Backend>,
}
146
147impl PeersClient {
148	pub fn as_client(&self) -> Arc<PeersFullClient> {
149		self.client.clone()
150	}
151
152	pub fn as_backend(&self) -> Arc<substrate_test_runtime_client::Backend> {
153		self.backend.clone()
154	}
155
156	pub fn as_block_import(&self) -> BlockImportAdapter<Self> {
157		BlockImportAdapter::new(self.clone())
158	}
159
160	pub fn get_aux(&self, key: &[u8]) -> ClientResult<Option<Vec<u8>>> {
161		self.client.get_aux(key)
162	}
163
164	pub fn info(&self) -> BlockchainInfo<Block> {
165		self.client.info()
166	}
167
168	pub fn header(
169		&self,
170		hash: <Block as BlockT>::Hash,
171	) -> ClientResult<Option<<Block as BlockT>::Header>> {
172		self.client.header(hash)
173	}
174
175	pub fn has_state_at(&self, block: &BlockId<Block>) -> bool {
176		let (number, hash) = match *block {
177			BlockId::Hash(h) => match self.as_client().number(h) {
178				Ok(Some(n)) => (n, h),
179				_ => return false,
180			},
181			BlockId::Number(n) => match self.as_client().hash(n) {
182				Ok(Some(h)) => (n, h),
183				_ => return false,
184			},
185		};
186		self.backend.have_state_at(hash, number)
187	}
188
189	pub fn justifications(
190		&self,
191		hash: <Block as BlockT>::Hash,
192	) -> ClientResult<Option<Justifications>> {
193		self.client.justifications(hash)
194	}
195
196	pub fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
197		self.client.finality_notification_stream()
198	}
199
200	pub fn import_notification_stream(&self) -> ImportNotifications<Block> {
201		self.client.import_notification_stream()
202	}
203
204	pub fn finalize_block(
205		&self,
206		hash: <Block as BlockT>::Hash,
207		justification: Option<Justification>,
208		notify: bool,
209	) -> ClientResult<()> {
210		self.client.finalize_block(hash, justification, notify)
211	}
212}
213
/// `BlockImport` for `PeersClient` is a plain delegation to the wrapped full
/// client; no parameters or results are altered on the way.
#[async_trait::async_trait]
impl BlockImport<Block> for PeersClient {
	type Error = ConsensusError;

	/// Forwards the check to the inner client and returns its result.
	async fn check_block(
		&self,
		block: BlockCheckParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		self.client.check_block(block).await
	}

	/// Forwards the import to the inner client and returns its result.
	async fn import_block(
		&self,
		block: BlockImportParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		self.client.import_block(block).await
	}
}
232
/// A single node of the test network: a client, its import pipeline and the
/// networking/syncing services attached to it.
pub struct Peer<D, BlockImport> {
	/// Test-specific data associated with this peer.
	pub data: D,
	/// Handle to this peer's client and backend.
	client: PeersClient,
	/// We keep a copy of the verifier so that we can invoke it for locally-generated blocks,
	/// instead of going through the import queue.
	verifier: VerifierAdapter<Block>,
	/// We keep a copy of the block_import so that we can invoke it for locally-generated blocks,
	/// instead of going through the import queue.
	block_import: BlockImportAdapter<BlockImport>,
	/// Longest-chain selector built together with the client, if any.
	select_chain: Option<LongestChain<substrate_test_runtime_client::Backend, Block>>,
	/// Backend used by the chain-inspection helpers; they return a default
	/// (`false` / `0`) when this is `None`.
	backend: Option<Arc<substrate_test_runtime_client::Backend>>,
	/// The network worker of this peer.
	network: NetworkWorker<Block, <Block as BlockT>::Hash>,
	/// Handle to the syncing service of this peer.
	sync_service: Arc<SyncingService<Block>>,
	/// Stream of block import notifications from this peer's client.
	imported_blocks_stream: Pin<Box<dyn Stream<Item = BlockImportNotification<Block>> + Send>>,
	/// Stream of finality notifications from this peer's client.
	finality_notification_stream: Pin<Box<dyn Stream<Item = FinalityNotification<Block>> + Send>>,
	/// Address the network worker listens on.
	listen_addr: Multiaddr,
	/// Notification service handles, keyed by protocol name, that have not yet
	/// been taken via `take_notification_service`.
	notification_services: HashMap<ProtocolName, Box<dyn NotificationService>>,
}
251
252impl<D, B> Peer<D, B>
253where
254	B: BlockImport<Block, Error = ConsensusError> + Send + Sync,
255{
256	/// Get this peer ID.
257	pub fn id(&self) -> PeerId {
258		self.network.service().local_peer_id().into()
259	}
260
261	/// Returns true if we're major syncing.
262	pub fn is_major_syncing(&self) -> bool {
263		self.sync_service.is_major_syncing()
264	}
265
266	// Returns a clone of the local SelectChain, only available on full nodes
267	pub fn select_chain(
268		&self,
269	) -> Option<LongestChain<substrate_test_runtime_client::Backend, Block>> {
270		self.select_chain.clone()
271	}
272
273	/// Returns the number of peers we're connected to.
274	pub async fn num_peers(&self) -> usize {
275		self.sync_service.num_connected_peers()
276	}
277
278	/// Returns the number of downloaded blocks.
279	pub async fn num_downloaded_blocks(&self) -> usize {
280		self.sync_service.num_downloaded_blocks().await.unwrap()
281	}
282
283	/// Returns true if we have no peer.
284	pub fn is_offline(&self) -> bool {
285		self.sync_service.is_offline()
286	}
287
288	/// Request a justification for the given block.
289	pub fn request_justification(&self, hash: &<Block as BlockT>::Hash, number: NumberFor<Block>) {
290		self.sync_service.request_justification(hash, number);
291	}
292
293	/// Announces an important block on the network.
294	pub fn announce_block(&self, hash: <Block as BlockT>::Hash, data: Option<Vec<u8>>) {
295		self.sync_service.announce_block(hash, data);
296	}
297
298	/// Request explicit fork sync.
299	pub fn set_sync_fork_request(
300		&self,
301		peers: Vec<PeerId>,
302		hash: <Block as BlockT>::Hash,
303		number: NumberFor<Block>,
304	) {
305		self.sync_service.set_sync_fork_request(
306			peers.into_iter().map(From::from).collect(),
307			hash,
308			number,
309		);
310	}
311
312	/// Add blocks to the peer -- edit the block before adding
313	pub fn generate_blocks<F>(
314		&mut self,
315		count: usize,
316		origin: BlockOrigin,
317		edit_block: F,
318	) -> Vec<H256>
319	where
320		F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block,
321	{
322		let best_hash = self.client.info().best_hash;
323		self.generate_blocks_at(
324			BlockId::Hash(best_hash),
325			count,
326			origin,
327			edit_block,
328			false,
329			true,
330			true,
331			ForkChoiceStrategy::LongestChain,
332		)
333	}
334
335	/// Add blocks to the peer -- edit the block before adding and use custom fork choice rule.
336	pub fn generate_blocks_with_fork_choice<F>(
337		&mut self,
338		count: usize,
339		origin: BlockOrigin,
340		edit_block: F,
341		fork_choice: ForkChoiceStrategy,
342	) -> Vec<H256>
343	where
344		F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block,
345	{
346		let best_hash = self.client.info().best_hash;
347		self.generate_blocks_at(
348			BlockId::Hash(best_hash),
349			count,
350			origin,
351			edit_block,
352			false,
353			true,
354			true,
355			fork_choice,
356		)
357	}
358
359	/// Add blocks to the peer -- edit the block before adding. The chain will
360	/// start at the given block iD.
361	pub fn generate_blocks_at<F>(
362		&mut self,
363		at: BlockId<Block>,
364		count: usize,
365		origin: BlockOrigin,
366		mut edit_block: F,
367		headers_only: bool,
368		inform_sync_about_new_best_block: bool,
369		announce_block: bool,
370		fork_choice: ForkChoiceStrategy,
371	) -> Vec<H256>
372	where
373		F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block,
374	{
375		let mut hashes = Vec::with_capacity(count);
376		let full_client = self.client.as_client();
377		let mut at = full_client.block_hash_from_id(&at).unwrap().unwrap();
378		for _ in 0..count {
379			let builder = BlockBuilderBuilder::new(&*full_client)
380				.on_parent_block(at)
381				.fetch_parent_block_number(&*full_client)
382				.unwrap()
383				.build()
384				.unwrap();
385			let block = edit_block(builder);
386			let hash = block.header.hash();
387			trace!(
388				target: "test_network",
389				"Generating {}, (#{}, parent={})",
390				hash,
391				block.header.number,
392				block.header.parent_hash,
393			);
394			let header = block.header.clone();
395			let mut import_block = BlockImportParams::new(origin, header.clone());
396			import_block.body = if headers_only { None } else { Some(block.extrinsics) };
397			import_block.fork_choice = Some(fork_choice);
398			let import_block =
399				futures::executor::block_on(self.verifier.verify(import_block)).unwrap();
400
401			futures::executor::block_on(self.block_import.import_block(import_block))
402				.expect("block_import failed");
403			hashes.push(hash);
404			at = hash;
405		}
406
407		if announce_block {
408			self.sync_service.announce_block(at, None);
409		}
410
411		if inform_sync_about_new_best_block {
412			self.sync_service.new_best_block_imported(
413				at,
414				*full_client.header(at).ok().flatten().unwrap().number(),
415			);
416		}
417		hashes
418	}
419
420	/// Push blocks to the peer (simplified: with or without a TX)
421	pub fn push_blocks(&mut self, count: usize, with_tx: bool) -> Vec<H256> {
422		let best_hash = self.client.info().best_hash;
423		self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx)
424	}
425
426	/// Push blocks to the peer (simplified: with or without a TX)
427	pub fn push_headers(&mut self, count: usize) -> Vec<H256> {
428		let best_hash = self.client.info().best_hash;
429		self.generate_tx_blocks_at(BlockId::Hash(best_hash), count, false, true, true, true)
430	}
431
432	/// Push blocks to the peer (simplified: with or without a TX) starting from
433	/// given hash.
434	pub fn push_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool) -> Vec<H256> {
435		self.generate_tx_blocks_at(at, count, with_tx, false, true, true)
436	}
437
438	/// Push blocks to the peer (simplified: with or without a TX) starting from
439	/// given hash without informing the sync protocol about the new best block.
440	pub fn push_blocks_at_without_informing_sync(
441		&mut self,
442		at: BlockId<Block>,
443		count: usize,
444		with_tx: bool,
445		announce_block: bool,
446	) -> Vec<H256> {
447		self.generate_tx_blocks_at(at, count, with_tx, false, false, announce_block)
448	}
449
450	/// Push blocks to the peer (simplified: with or without a TX) starting from
451	/// given hash without announcing the block.
452	pub fn push_blocks_at_without_announcing(
453		&mut self,
454		at: BlockId<Block>,
455		count: usize,
456		with_tx: bool,
457	) -> Vec<H256> {
458		self.generate_tx_blocks_at(at, count, with_tx, false, true, false)
459	}
460
461	/// Push blocks/headers to the peer (simplified: with or without a TX) starting from
462	/// given hash.
463	fn generate_tx_blocks_at(
464		&mut self,
465		at: BlockId<Block>,
466		count: usize,
467		with_tx: bool,
468		headers_only: bool,
469		inform_sync_about_new_best_block: bool,
470		announce_block: bool,
471	) -> Vec<H256> {
472		let mut nonce = 0;
473		if with_tx {
474			self.generate_blocks_at(
475				at,
476				count,
477				BlockOrigin::File,
478				|mut builder| {
479					let transfer = Transfer {
480						from: Sr25519Keyring::Alice.into(),
481						to: Sr25519Keyring::Alice.into(),
482						amount: 1,
483						nonce,
484					};
485					builder.push(transfer.into_unchecked_extrinsic()).unwrap();
486					nonce += 1;
487					builder.build().unwrap().block
488				},
489				headers_only,
490				inform_sync_about_new_best_block,
491				announce_block,
492				ForkChoiceStrategy::LongestChain,
493			)
494		} else {
495			self.generate_blocks_at(
496				at,
497				count,
498				BlockOrigin::File,
499				|builder| builder.build().unwrap().block,
500				headers_only,
501				inform_sync_about_new_best_block,
502				announce_block,
503				ForkChoiceStrategy::LongestChain,
504			)
505		}
506	}
507
508	/// Get a reference to the client.
509	pub fn client(&self) -> &PeersClient {
510		&self.client
511	}
512
513	/// Get a reference to the network service.
514	pub fn network_service(&self) -> &Arc<NetworkService<Block, <Block as BlockT>::Hash>> {
515		self.network.service()
516	}
517
518	/// Get `SyncingService`.
519	pub fn sync_service(&self) -> &Arc<SyncingService<Block>> {
520		&self.sync_service
521	}
522
523	/// Take notification handle for enabled protocol.
524	pub fn take_notification_service(
525		&mut self,
526		protocol: &ProtocolName,
527	) -> Option<Box<dyn NotificationService>> {
528		self.notification_services.remove(protocol)
529	}
530
531	/// Get a reference to the network worker.
532	pub fn network(&self) -> &NetworkWorker<Block, <Block as BlockT>::Hash> {
533		&self.network
534	}
535
536	/// Test helper to compare the blockchain state of multiple (networked)
537	/// clients.
538	pub fn blockchain_canon_equals(&self, other: &Self) -> bool {
539		if let (Some(mine), Some(others)) = (self.backend.clone(), other.backend.clone()) {
540			mine.blockchain().info().best_hash == others.blockchain().info().best_hash
541		} else {
542			false
543		}
544	}
545
546	/// Count the total number of imported blocks.
547	pub fn blocks_count(&self) -> u64 {
548		self.backend
549			.as_ref()
550			.map(|backend| backend.blockchain().info().best_number)
551			.unwrap_or(0)
552	}
553
554	/// Return a collection of block hashes that failed verification
555	pub fn failed_verifications(&self) -> HashMap<<Block as BlockT>::Hash, String> {
556		self.verifier.failed_verifications.lock().clone()
557	}
558
559	pub fn has_block(&self, hash: H256) -> bool {
560		self.backend
561			.as_ref()
562			.map(|backend| backend.blockchain().header(hash).unwrap().is_some())
563			.unwrap_or(false)
564	}
565
566	pub fn has_body(&self, hash: H256) -> bool {
567		self.backend
568			.as_ref()
569			.map(|backend| backend.blockchain().body(hash).unwrap().is_some())
570			.unwrap_or(false)
571	}
572}
573
/// Shorthand for the bound set `BlockImport<Block, Error = ConsensusError> +
/// Send + Sync + Clone`. The trait declares no items of its own.
pub trait BlockImportAdapterFull:
	BlockImport<Block, Error = ConsensusError> + Send + Sync + Clone
{
}
578
// Blanket implementation: every type that satisfies the supertrait bounds
// automatically implements `BlockImportAdapterFull`.
impl<T> BlockImportAdapterFull for T where
	T: BlockImport<Block, Error = ConsensusError> + Send + Sync + Clone
{
}
583
/// Wrapper around a block import of type `I`.
///
/// Its `BlockImport` implementation forwards `check_block` and `import_block`
/// to the wrapped value unchanged.
///
/// NOTE(review): earlier documentation described this type as implementing
/// `BlockImport` for any `Transaction` (setting the transaction to `None`) and
/// as being required because the `TestNetFactory` trait does not distinguish
/// between full and light nodes. No such conversion is visible in this file;
/// confirm whether that rationale still applies.
#[derive(Clone)]
pub struct BlockImportAdapter<I> {
	/// The wrapped block import.
	inner: I,
}
593
594impl<I> BlockImportAdapter<I> {
595	/// Create a new instance of `Self::Full`.
596	pub fn new(inner: I) -> Self {
597		Self { inner }
598	}
599}
600
/// `BlockImport` for the adapter: both operations are forwarded verbatim to
/// the wrapped block import.
#[async_trait::async_trait]
impl<I> BlockImport<Block> for BlockImportAdapter<I>
where
	I: BlockImport<Block, Error = ConsensusError> + Send + Sync,
{
	type Error = ConsensusError;

	/// Forwards the check to the wrapped block import.
	async fn check_block(
		&self,
		block: BlockCheckParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		self.inner.check_block(block).await
	}

	/// Forwards the import to the wrapped block import.
	async fn import_block(
		&self,
		block: BlockImportParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		self.inner.import_block(block).await
	}
}
622
/// Implements `Verifier` and keeps track of failed verifications.
///
/// Both fields are reference-counted, so clones share the same inner verifier
/// and the same failure map.
struct VerifierAdapter<B: BlockT> {
	/// The wrapped verifier, behind an async mutex.
	verifier: Arc<futures::lock::Mutex<Box<dyn Verifier<B>>>>,
	/// Error message of each failed verification, keyed by block hash.
	failed_verifications: Arc<Mutex<HashMap<B::Hash, String>>>,
}
628
629#[async_trait::async_trait]
630impl<B: BlockT> Verifier<B> for VerifierAdapter<B> {
631	async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String> {
632		let hash = block.header.hash();
633		self.verifier.lock().await.verify(block).await.inspect_err(|e| {
634			self.failed_verifications.lock().insert(hash, e.clone());
635		})
636	}
637}
638
639impl<B: BlockT> Clone for VerifierAdapter<B> {
640	fn clone(&self) -> Self {
641		Self {
642			verifier: self.verifier.clone(),
643			failed_verifications: self.failed_verifications.clone(),
644		}
645	}
646}
647
648impl<B: BlockT> VerifierAdapter<B> {
649	fn new(verifier: impl Verifier<B> + 'static) -> Self {
650		VerifierAdapter {
651			verifier: Arc::new(futures::lock::Mutex::new(Box::new(verifier))),
652			failed_verifications: Default::default(),
653		}
654	}
655}
656
/// Warp sync provider for tests, backed by a header backend. Its "proof" is
/// simply the encoded best header of that backend.
struct TestWarpSyncProvider<B: BlockT>(Arc<dyn HeaderBackend<B>>);
658
659impl<B: BlockT> WarpSyncProvider<B> for TestWarpSyncProvider<B> {
660	fn generate(
661		&self,
662		_start: B::Hash,
663	) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>> {
664		let info = self.0.info();
665		let best_header = self.0.header(info.best_hash).unwrap().unwrap();
666		Ok(EncodedProof(best_header.encode()))
667	}
668	fn verify(
669		&self,
670		proof: &EncodedProof,
671		_set_id: SetId,
672		_authorities: AuthorityList,
673	) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>> {
674		let EncodedProof(encoded) = proof;
675		let header = B::Header::decode(&mut encoded.as_slice()).unwrap();
676		Ok(VerificationResult::Complete(0, Default::default(), header))
677	}
678	fn current_authorities(&self) -> AuthorityList {
679		Default::default()
680	}
681}
682
/// Configuration for a full peer.
///
/// Every field has a usable default, so tests typically override only the
/// fields they care about.
#[derive(Default)]
pub struct FullPeerConfig {
	/// Pruning window size.
	///
	/// NOTE: only finalized blocks are subject for removal!
	pub blocks_pruning: Option<u32>,
	/// Block announce validator.
	///
	/// If `None`, `DefaultBlockAnnounceValidator` is used.
	pub block_announce_validator: Option<Box<dyn BlockAnnounceValidator<Block> + Send + Sync>>,
	/// List of notification protocols that the network must support.
	pub notifications_protocols: Vec<ProtocolName>,
	/// List of request-response protocols that the network must support.
	pub request_response_protocols: Vec<RequestResponseConfig>,
	/// The indices of the peers the peer should be connected to.
	///
	/// If `None`, it will be connected to all other peers.
	pub connect_to_peers: Option<Vec<usize>>,
	/// Whether the full peer should have the authority role.
	pub is_authority: bool,
	/// Syncing mode.
	pub sync_mode: SyncMode,
	/// Extra genesis storage.
	pub extra_storage: Option<sp_core::storage::Storage>,
	/// Enable transaction indexing.
	pub storage_chain: bool,
	/// Optional target block header to sync to.
	///
	/// When set, warp sync is configured with this header as its target
	/// instead of using the test warp sync provider.
	pub target_header: Option<<Block as BlockT>::Header>,
	/// Force genesis even in case of warp & light state sync.
	pub force_genesis: bool,
}
713
714#[async_trait::async_trait]
715pub trait TestNetFactory: Default + Sized + Send {
	/// Verifier type produced by `make_verifier` for each peer.
	type Verifier: 'static + Verifier<Block>;
	/// Block import type wrapped by the `BlockImportAdapter` of each peer.
	type BlockImport: BlockImport<Block, Error = ConsensusError> + Clone + Send + Sync + 'static;
	/// Per-peer data returned by `make_block_import` and stored in `Peer::data`.
	type PeerData: Default + Send;

	/// This one needs to be implemented!
	///
	/// Creates the verifier for a new peer from its client and peer data.
	fn make_verifier(&self, client: PeersClient, peer_data: &Self::PeerData) -> Self::Verifier;

	/// Get reference to peer.
	fn peer(&mut self, i: usize) -> &mut Peer<Self::PeerData, Self::BlockImport>;
	/// Get a shared reference to the list of peers.
	fn peers(&self) -> &Vec<Peer<Self::PeerData, Self::BlockImport>>;
	/// Get a mutable reference to the list of peers.
	fn peers_mut(&mut self) -> &mut Vec<Peer<Self::PeerData, Self::BlockImport>>;
	/// Run `closure` with mutable access to the list of peers; used when adding
	/// a new peer to the network.
	fn mut_peers<F: FnOnce(&mut Vec<Peer<Self::PeerData, Self::BlockImport>>)>(
		&mut self,
		closure: F,
	);

	/// Get custom block import handle for fresh client, along with peer data.
	///
	/// The optional justification import is handed to the peer's import queue.
	fn make_block_import(
		&self,
		client: PeersClient,
	) -> (
		BlockImportAdapter<Self::BlockImport>,
		Option<BoxJustificationImport<Block>>,
		Self::PeerData,
	);
741
742	/// Create new test network with this many peers.
743	fn new(n: usize) -> Self {
744		trace!(target: "test_network", "Creating test network");
745		let mut net = Self::default();
746
747		for i in 0..n {
748			trace!(target: "test_network", "Adding peer {}", i);
749			net.add_full_peer();
750		}
751		net
752	}
753
754	fn add_full_peer(&mut self) {
755		self.add_full_peer_with_config(Default::default())
756	}
757
	/// Add a full peer.
	///
	/// Builds a fresh test client and backend according to `config`, wires up
	/// the import queue, request handlers, syncing engine and network worker,
	/// spawns their background tasks, and finally appends the resulting `Peer`
	/// to the network. Existing peers are told the new peer's address.
	///
	/// # Panics
	///
	/// Panics if the genesis block cannot be found or if constructing the
	/// syncing strategy, syncing engine or network worker fails. Tasks are
	/// spawned with `tokio::spawn`, so a Tokio runtime is expected to be
	/// active — NOTE(review): confirm against callers.
	fn add_full_peer_with_config(&mut self, config: FullPeerConfig) {
		// Pick the backend flavour from the pruning / transaction-storage settings.
		let mut test_client_builder = match (config.blocks_pruning, config.storage_chain) {
			(Some(blocks_pruning), true) => TestClientBuilder::with_tx_storage(blocks_pruning),
			(None, true) => TestClientBuilder::with_tx_storage(u32::MAX),
			(Some(blocks_pruning), false) => TestClientBuilder::with_pruning_window(blocks_pruning),
			(None, false) => TestClientBuilder::with_default_backend(),
		};
		if let Some(storage) = config.extra_storage {
			let genesis_extra_storage = test_client_builder.genesis_init_mut().extra_storage();
			*genesis_extra_storage = storage;
		}

		// Light-state and warp sync peers are built without genesis unless forced.
		if !config.force_genesis &&
			matches!(config.sync_mode, SyncMode::LightState { .. } | SyncMode::Warp)
		{
			test_client_builder = test_client_builder.set_no_genesis();
		}
		let backend = test_client_builder.backend();
		let (c, longest_chain) = test_client_builder.build_with_longest_chain();
		let client = Arc::new(c);

		let (block_import, justification_import, data) = self
			.make_block_import(PeersClient { client: client.clone(), backend: backend.clone() });

		// The verifier is wrapped so that failed verifications are recorded.
		let verifier = self
			.make_verifier(PeersClient { client: client.clone(), backend: backend.clone() }, &data);
		let verifier = VerifierAdapter::new(verifier);

		let import_queue = Box::new(BasicQueue::new(
			verifier.clone(),
			Box::new(block_import.clone()),
			justification_import,
			&sp_core::testing::TaskExecutor::new(),
			None,
		));

		// Peers talk over the in-memory transport, each on a random memory address.
		let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];

		let mut network_config =
			NetworkConfiguration::new("test-node", "test-client", Default::default(), None);
		network_config.sync_mode = config.sync_mode;
		network_config.transport = TransportConfig::MemoryOnly;
		network_config.listen_addresses = vec![listen_addr.clone()];
		network_config.allow_non_globals_in_dht = true;

		// One notification protocol config per requested protocol; the handles
		// are kept (keyed by protocol name) to be stored on the peer.
		let (notif_configs, notif_handles): (Vec<_>, Vec<_>) = config
			.notifications_protocols
			.into_iter()
			.map(|p| {
				let (config, handle) = NonDefaultSetConfig::new(
					p.clone(),
					Vec::new(),
					1024 * 1024,
					None,
					Default::default(),
				);

				(config, (p, handle))
			})
			.unzip();

		// When an explicit peer list is given, those peers become reserved nodes
		// and non-reserved peers are denied.
		if let Some(connect_to) = config.connect_to_peers {
			let addrs = connect_to
				.iter()
				.map(|v| {
					let peer_id = self.peer(*v).network_service().local_peer_id();
					let multiaddr = self.peer(*v).listen_addr.clone();
					MultiaddrWithPeerId { peer_id, multiaddr }
				})
				.collect();
			network_config.default_peers_set.reserved_nodes = addrs;
			network_config.default_peers_set.non_reserved_mode = NonReservedPeerMode::Deny;
		}
		let mut full_net_config = FullNetworkConfiguration::new(&network_config, None);

		let protocol_id = ProtocolId::from("test-protocol-name");

		let fork_id = Some(String::from("test-fork-id"));

		// Block request handler; its server side runs as a background task.
		let chain_sync_network_provider = NetworkServiceProvider::new();
		let chain_sync_network_handle = chain_sync_network_provider.handle();
		let mut block_relay_params = BlockRequestHandler::new::<NetworkWorker<_, _>>(
			chain_sync_network_handle.clone(),
			&protocol_id,
			None,
			client.clone(),
			50,
		);
		self.spawn_task(Box::pin(async move {
			block_relay_params.server.run().await;
		}));

		// State request handler.
		let state_request_protocol_config = {
			let (handler, protocol_config) = StateRequestHandler::new::<NetworkWorker<_, _>>(
				&protocol_id,
				None,
				client.clone(),
				50,
			);
			self.spawn_task(handler.run().boxed());
			protocol_config
		};

		// Light client request handler.
		let light_client_request_protocol_config =
			{
				let (handler, protocol_config) = LightClientRequestHandler::new::<
					NetworkWorker<_, _>,
				>(&protocol_id, None, client.clone());
				self.spawn_task(handler.run().boxed());
				protocol_config
			};

		let warp_sync = Arc::new(TestWarpSyncProvider(client.clone()));

		// Warp sync either targets the configured header or uses the test provider.
		let warp_sync_config = match config.target_header {
			Some(target_header) => WarpSyncConfig::WithTarget(target_header),
			_ => WarpSyncConfig::WithProvider(warp_sync.clone()),
		};

		// Warp request handler, keyed on the genesis hash.
		let warp_protocol_config = {
			let (handler, protocol_config) =
				warp_request_handler::RequestHandler::new::<_, NetworkWorker<_, _>>(
					protocol_id.clone(),
					client
						.block_hash(0u32.into())
						.ok()
						.flatten()
						.expect("Genesis block exists; qed"),
					None,
					warp_sync.clone(),
				);
			self.spawn_task(handler.run().boxed());
			protocol_config
		};

		// Peer store seeded with the configured boot nodes; runs in the background.
		let peer_store = PeerStore::new(
			network_config
				.boot_nodes
				.iter()
				.map(|bootnode| bootnode.peer_id.into())
				.collect(),
			None,
		);
		let peer_store_handle = Arc::new(peer_store.handle());
		self.spawn_task(peer_store.run().boxed());

		let block_announce_validator = config
			.block_announce_validator
			.unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator));
		let metrics = <NetworkWorker<_, _> as sc_network::NetworkBackend<
			Block,
			<Block as BlockT>::Hash,
		>>::register_notification_metrics(None);

		let syncing_config = PolkadotSyncingStrategyConfig {
			mode: network_config.sync_mode,
			max_parallel_downloads: network_config.max_parallel_downloads,
			max_blocks_per_request: network_config.max_blocks_per_request,
			metrics_registry: None,
			state_request_protocol_name: state_request_protocol_config.name.clone(),
			block_downloader: block_relay_params.downloader,
			min_peers_to_start_warp_sync: None,
		};
		// Initialize syncing strategy.
		let syncing_strategy = Box::new(
			PolkadotSyncingStrategy::new(
				syncing_config,
				client.clone(),
				Some(warp_sync_config),
				Some(warp_protocol_config.name.clone()),
			)
			.unwrap(),
		);

		let (engine, sync_service, block_announce_config) =
			sc_network_sync::engine::SyncingEngine::new(
				Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }),
				client.clone(),
				None,
				metrics,
				&full_net_config,
				protocol_id.clone(),
				fork_id.as_deref(),
				block_announce_validator,
				syncing_strategy,
				chain_sync_network_handle,
				import_queue.service(),
				peer_store_handle.clone(),
			)
			.unwrap();
		let sync_service = Arc::new(sync_service.clone());

		// Register the caller-supplied protocols first, then the built-in ones.
		for config in config.request_response_protocols {
			full_net_config.add_request_response_protocol(config);
		}
		for config in [
			block_relay_params.request_response_config,
			state_request_protocol_config,
			light_client_request_protocol_config,
			warp_protocol_config,
		] {
			full_net_config.add_request_response_protocol(config);
		}

		for config in notif_configs {
			full_net_config.add_notification_protocol(config);
		}

		let genesis_hash =
			client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
		let network = NetworkWorker::new(sc_network::config::Params {
			role: if config.is_authority { Role::Authority } else { Role::Full },
			executor: Box::new(|f| {
				tokio::spawn(f);
			}),
			network_config: full_net_config,
			genesis_hash,
			protocol_id,
			fork_id,
			metrics_registry: None,
			block_announce_config,
			bitswap_config: None,
			notification_metrics: NotificationMetrics::new(None),
		})
		.unwrap();

		trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id());

		// Background tasks: network service provider, import queue and syncing engine.
		let service = network.service().clone();
		tokio::spawn(async move {
			chain_sync_network_provider.run(service).await;
		});

		tokio::spawn({
			let sync_service = sync_service.clone();

			async move {
				import_queue.run(sync_service.as_ref()).await;
			}
		});

		tokio::spawn(async move {
			engine.run().await;
		});

		self.mut_peers(move |peers| {
			// Tell every existing peer where the new peer can be reached.
			for peer in peers.iter_mut() {
				peer.network.add_known_address(
					network.service().local_peer_id().into(),
					listen_addr.clone().into(),
				);
			}

			let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse());
			let finality_notification_stream =
				Box::pin(client.finality_notification_stream().fuse());

			peers.push(Peer {
				data,
				client: PeersClient { client: client.clone(), backend: backend.clone() },
				select_chain: Some(longest_chain),
				backend: Some(backend),
				imported_blocks_stream,
				finality_notification_stream,
				notification_services: HashMap::from_iter(notif_handles.into_iter()),
				block_import,
				verifier,
				network,
				sync_service,
				listen_addr,
			});
		});
	}
1032
1033	/// Used to spawn background tasks, e.g. the block request protocol handler.
1034	fn spawn_task(&self, f: BoxFuture<'static, ()>) {
1035		tokio::spawn(f);
1036	}
1037
1038	async fn is_in_sync(&mut self) -> bool {
1039		let mut highest = None;
1040		let peers = self.peers_mut();
1041
1042		for peer in peers {
1043			if peer.sync_service.is_major_syncing() ||
1044				peer.sync_service.status().await.unwrap().queued_blocks != 0
1045			{
1046				return false
1047			}
1048			if peer.sync_service.num_sync_requests().await.unwrap() != 0 {
1049				return false
1050			}
1051			match (highest, peer.client.info().best_hash) {
1052				(None, b) => highest = Some(b),
1053				(Some(ref a), ref b) if a == b => {},
1054				(Some(_), _) => return false,
1055			}
1056		}
1057
1058		true
1059	}
1060
1061	async fn is_idle(&mut self) -> bool {
1062		let peers = self.peers_mut();
1063		for peer in peers {
1064			if peer.sync_service.status().await.unwrap().queued_blocks != 0 {
1065				return false
1066			}
1067			if peer.sync_service.num_sync_requests().await.unwrap() != 0 {
1068				return false
1069			}
1070		}
1071
1072		true
1073	}
1074
1075	/// Blocks the current thread until we are sync'ed.
1076	/// Wait until we are sync'ed.
1077	///
1078	/// (If we've not synced within 10 mins then panic rather than hang.)
1079	async fn run_until_sync(&mut self) {
1080		timeout(Duration::from_secs(10 * 60), async {
1081			loop {
1082				futures::future::poll_fn::<(), _>(|cx| {
1083					self.poll(cx);
1084					Poll::Ready(())
1085				})
1086				.await;
1087
1088				if self.is_in_sync().await {
1089					break
1090				}
1091			}
1092		})
1093		.await
1094		.expect("sync didn't happen within 10 mins");
1095	}
1096
1097	/// Run the network until there are no pending packets.
1098	///
1099	/// Calls `poll_until_idle` repeatedly with the runtime passed as parameter.
1100	async fn run_until_idle(&mut self) {
1101		loop {
1102			futures::future::poll_fn::<(), _>(|cx| {
1103				self.poll(cx);
1104				Poll::Ready(())
1105			})
1106			.await;
1107
1108			if self.is_idle().await {
1109				break
1110			}
1111		}
1112	}
1113
1114	/// Run the network until all peers are connected to each other.
1115	async fn run_until_connected(&mut self) {
1116		let num_peers = self.peers().len();
1117		let sync_services =
1118			self.peers().iter().map(|info| info.sync_service.clone()).collect::<Vec<_>>();
1119
1120		'outer: loop {
1121			for sync_service in &sync_services {
1122				if sync_service.num_connected_peers() != num_peers - 1 {
1123					futures::future::poll_fn::<(), _>(|cx| {
1124						self.poll(cx);
1125						Poll::Ready(())
1126					})
1127					.await;
1128					continue 'outer
1129				}
1130			}
1131
1132			break
1133		}
1134	}
1135
	/// Polls the testnet. Processes all the pending actions.
	///
	/// For every peer, in order:
	/// 1. polls the peer's network worker (`next_action`) until it returns `Poll::Pending`;
	/// 2. drains `imported_blocks_stream`, announcing each imported block hash through the
	///    peer's sync service;
	/// 3. drains `finality_notification_stream`, forwarding each finalized hash and header to
	///    the peer's sync service.
	///
	/// `cx` is the task context handed to every future and stream polled here.
	fn poll(&mut self, cx: &mut FutureContext) {
		self.mut_peers(|peers| {
			for (i, peer) in peers.iter_mut().enumerate() {
				trace!(target: "sync", "-- Polling {}: {}", i, peer.id());
				// Keep driving the network worker until it has nothing more to do right now.
				loop {
					// The code below is not quite correct, because we are polling a different
					// instance of the future every time. But as long as
					// `NetworkWorker::next_action()` contains just streams polling not interleaved
					// with other `.await`s, dropping the future and recreating it works the same as
					// polling a single instance.
					let net_poll_future = peer.network.next_action();
					pin_mut!(net_poll_future);
					if let Poll::Pending = net_poll_future.poll(cx) {
						break
					}
				}
				trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id());

				// We poll `imported_blocks_stream`.
				// Every ready import notification is announced to the other peers; the loop
				// stops on the first `Pending` (or when the stream ends).
				while let Poll::Ready(Some(notification)) =
					peer.imported_blocks_stream.as_mut().poll_next(cx)
				{
					peer.sync_service.announce_block(notification.hash, None);
				}

				// We poll `finality_notification_stream`.
				// Every ready finality notification is passed on to the sync service.
				while let Poll::Ready(Some(notification)) =
					peer.finality_notification_stream.as_mut().poll_next(cx)
				{
					peer.sync_service.on_block_finalized(notification.hash, notification.header);
				}
			}
		});
	}
1171}
1172
/// Basic test network: a list of peers that carry no extra per-peer data (`()`) and use
/// `PeersClient` as their block import.
///
/// `Default` yields a network without any peers.
#[derive(Default)]
pub struct TestNet {
	/// Peers that are part of this test network.
	peers: Vec<Peer<(), PeersClient>>,
}
1177
1178impl TestNetFactory for TestNet {
1179	type Verifier = PassThroughVerifier;
1180	type PeerData = ();
1181	type BlockImport = PeersClient;
1182
1183	fn make_verifier(&self, _client: PeersClient, _peer_data: &()) -> Self::Verifier {
1184		PassThroughVerifier::new(false)
1185	}
1186
1187	fn make_block_import(
1188		&self,
1189		client: PeersClient,
1190	) -> (
1191		BlockImportAdapter<Self::BlockImport>,
1192		Option<BoxJustificationImport<Block>>,
1193		Self::PeerData,
1194	) {
1195		(client.as_block_import(), None, ())
1196	}
1197
1198	fn peer(&mut self, i: usize) -> &mut Peer<(), Self::BlockImport> {
1199		&mut self.peers[i]
1200	}
1201
1202	fn peers(&self) -> &Vec<Peer<(), Self::BlockImport>> {
1203		&self.peers
1204	}
1205
1206	fn peers_mut(&mut self) -> &mut Vec<Peer<(), Self::BlockImport>> {
1207		&mut self.peers
1208	}
1209
1210	fn mut_peers<F: FnOnce(&mut Vec<Peer<(), Self::BlockImport>>)>(&mut self, closure: F) {
1211		closure(&mut self.peers);
1212	}
1213}
1214
/// Justification import that finalizes the justified block through the wrapped
/// `PeersClient` whenever a justification is imported.
pub struct ForceFinalized(PeersClient);
1216
1217#[async_trait::async_trait]
1218impl JustificationImport<Block> for ForceFinalized {
1219	type Error = ConsensusError;
1220
1221	async fn on_start(&mut self) -> Vec<(H256, NumberFor<Block>)> {
1222		Vec::new()
1223	}
1224
1225	async fn import_justification(
1226		&mut self,
1227		hash: H256,
1228		_number: NumberFor<Block>,
1229		justification: Justification,
1230	) -> Result<(), Self::Error> {
1231		self.0
1232			.finalize_block(hash, Some(justification), true)
1233			.map_err(|_| ConsensusError::InvalidJustification)
1234	}
1235}
1236
/// Test network wrapping a `TestNet` whose block import additionally provides a
/// `ForceFinalized` justification import; everything else is delegated to the inner
/// `TestNet`.
#[derive(Default)]
pub struct JustificationTestNet(TestNet);
1239
1240impl TestNetFactory for JustificationTestNet {
1241	type Verifier = PassThroughVerifier;
1242	type PeerData = ();
1243	type BlockImport = PeersClient;
1244
1245	fn make_verifier(&self, client: PeersClient, peer_data: &()) -> Self::Verifier {
1246		self.0.make_verifier(client, peer_data)
1247	}
1248
1249	fn peer(&mut self, i: usize) -> &mut Peer<Self::PeerData, Self::BlockImport> {
1250		self.0.peer(i)
1251	}
1252
1253	fn peers(&self) -> &Vec<Peer<Self::PeerData, Self::BlockImport>> {
1254		self.0.peers()
1255	}
1256
1257	fn peers_mut(&mut self) -> &mut Vec<Peer<Self::PeerData, Self::BlockImport>> {
1258		self.0.peers_mut()
1259	}
1260
1261	fn mut_peers<F: FnOnce(&mut Vec<Peer<Self::PeerData, Self::BlockImport>>)>(
1262		&mut self,
1263		closure: F,
1264	) {
1265		self.0.mut_peers(closure)
1266	}
1267
1268	fn make_block_import(
1269		&self,
1270		client: PeersClient,
1271	) -> (
1272		BlockImportAdapter<Self::BlockImport>,
1273		Option<BoxJustificationImport<Block>>,
1274		Self::PeerData,
1275	) {
1276		(client.as_block_import(), Some(Box::new(ForceFinalized(client))), Default::default())
1277	}
1278}