1#![allow(missing_docs)]
19
20#[cfg(test)]
21mod block_import;
22#[cfg(test)]
23mod conformance;
24#[cfg(test)]
25mod fuzz;
26#[cfg(test)]
27mod service;
28#[cfg(test)]
29mod sync;
30
31use std::{
32 collections::HashMap,
33 pin::Pin,
34 sync::Arc,
35 task::{Context as FutureContext, Poll},
36 time::Duration,
37};
38
39use futures::{future::BoxFuture, pin_mut, prelude::*};
40use libp2p::PeerId;
41use log::trace;
42use parking_lot::Mutex;
43use sc_block_builder::{BlockBuilder, BlockBuilderBuilder};
44use sc_client_api::{
45 backend::{AuxStore, Backend, Finalizer},
46 BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
47 FinalityNotifications, ImportNotifications,
48};
49use sc_consensus::{
50 BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport,
51 ForkChoiceStrategy, ImportQueue, ImportResult, JustificationImport, JustificationSyncLink,
52 LongestChain, Verifier,
53};
54use sc_network::{
55 config::{
56 FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig,
57 NonReservedPeerMode, ProtocolId, Role, SyncMode, TransportConfig,
58 },
59 peer_store::PeerStore,
60 request_responses::ProtocolConfig as RequestResponseConfig,
61 types::ProtocolName,
62 NetworkBlock, NetworkService, NetworkStateInfo, NetworkSyncForkRequest, NetworkWorker,
63 NotificationMetrics, NotificationService,
64};
65use sc_network_common::role::Roles;
66use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
67use sc_network_sync::{
68 block_request_handler::BlockRequestHandler,
69 service::{network::NetworkServiceProvider, syncing_service::SyncingService},
70 state_request_handler::StateRequestHandler,
71 strategy::{
72 polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
73 warp::{
74 AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncConfig,
75 WarpSyncProvider,
76 },
77 },
78 warp_request_handler,
79};
80use sc_network_types::{build_multiaddr, multiaddr::Multiaddr};
81use sc_service::client::Client;
82use sp_blockchain::{
83 Backend as BlockchainBackend, HeaderBackend, Info as BlockchainInfo, Result as ClientResult,
84};
85use sp_consensus::{
86 block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator},
87 BlockOrigin, Error as ConsensusError, SyncOracle,
88};
89use sp_core::H256;
90use sp_runtime::{
91 codec::{Decode, Encode},
92 generic::BlockId,
93 traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero},
94 Justification, Justifications,
95};
96use substrate_test_runtime_client::Sr25519Keyring;
97pub use substrate_test_runtime_client::{
98 runtime::{Block, ExtrinsicBuilder, Hash, Header, Transfer},
99 TestClient, TestClientBuilder, TestClientBuilderExt,
100};
101use tokio::time::timeout;
102
/// A verifier that accepts every block it is handed and passes it on for import,
/// stamping it with a preconfigured finality flag.
#[derive(Clone)]
pub struct PassThroughVerifier {
    // Value written into `BlockImportParams::finalized` for every verified block.
    finalized: bool,
}
109
110impl PassThroughVerifier {
111 pub fn new(finalized: bool) -> Self {
115 Self { finalized }
116 }
117}
118
119#[async_trait::async_trait]
121impl<B: BlockT> Verifier<B> for PassThroughVerifier {
122 async fn verify(
123 &self,
124 mut block: BlockImportParams<B>,
125 ) -> Result<BlockImportParams<B>, String> {
126 if block.fork_choice.is_none() {
127 block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
128 };
129 block.finalized = self.finalized;
130 Ok(block)
131 }
132}
133
/// Concrete full-client type used by test peers: a `Client` parameterised with the
/// test runtime's backend, executor dispatch, block type and runtime API.
pub type PeersFullClient = Client<
    substrate_test_runtime_client::Backend,
    substrate_test_runtime_client::ExecutorDispatch,
    Block,
    substrate_test_runtime_client::runtime::RuntimeApi,
>;
140
/// Cloneable handle bundling a peer's full client with a backend handle.
#[derive(Clone)]
pub struct PeersClient {
    // Shared full client; most methods on this type delegate to it.
    client: Arc<PeersFullClient>,
    // Shared backend; consulted directly by `has_state_at`.
    backend: Arc<substrate_test_runtime_client::Backend>,
}
146
147impl PeersClient {
148 pub fn as_client(&self) -> Arc<PeersFullClient> {
149 self.client.clone()
150 }
151
152 pub fn as_backend(&self) -> Arc<substrate_test_runtime_client::Backend> {
153 self.backend.clone()
154 }
155
156 pub fn as_block_import(&self) -> BlockImportAdapter<Self> {
157 BlockImportAdapter::new(self.clone())
158 }
159
160 pub fn get_aux(&self, key: &[u8]) -> ClientResult<Option<Vec<u8>>> {
161 self.client.get_aux(key)
162 }
163
164 pub fn info(&self) -> BlockchainInfo<Block> {
165 self.client.info()
166 }
167
168 pub fn header(
169 &self,
170 hash: <Block as BlockT>::Hash,
171 ) -> ClientResult<Option<<Block as BlockT>::Header>> {
172 self.client.header(hash)
173 }
174
175 pub fn has_state_at(&self, block: &BlockId<Block>) -> bool {
176 let (number, hash) = match *block {
177 BlockId::Hash(h) => match self.as_client().number(h) {
178 Ok(Some(n)) => (n, h),
179 _ => return false,
180 },
181 BlockId::Number(n) => match self.as_client().hash(n) {
182 Ok(Some(h)) => (n, h),
183 _ => return false,
184 },
185 };
186 self.backend.have_state_at(hash, number)
187 }
188
189 pub fn justifications(
190 &self,
191 hash: <Block as BlockT>::Hash,
192 ) -> ClientResult<Option<Justifications>> {
193 self.client.justifications(hash)
194 }
195
196 pub fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
197 self.client.finality_notification_stream()
198 }
199
200 pub fn import_notification_stream(&self) -> ImportNotifications<Block> {
201 self.client.import_notification_stream()
202 }
203
204 pub fn finalize_block(
205 &self,
206 hash: <Block as BlockT>::Hash,
207 justification: Option<Justification>,
208 notify: bool,
209 ) -> ClientResult<()> {
210 self.client.finalize_block(hash, justification, notify)
211 }
212}
213
/// `BlockImport` for [`PeersClient`]: both operations are forwarded to the wrapped
/// full client without modification.
#[async_trait::async_trait]
impl BlockImport<Block> for PeersClient {
    type Error = ConsensusError;

    /// Forwards the pre-import check to the full client.
    async fn check_block(
        &self,
        block: BlockCheckParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        self.client.check_block(block).await
    }

    /// Forwards the block import to the full client.
    async fn import_block(
        &self,
        block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        self.client.import_block(block).await
    }
}
232
/// A single node of the test network: its client, import pipeline, network worker
/// and sync service, plus custom per-peer `data`.
pub struct Peer<D, BlockImport> {
    /// Custom data attached to the peer by the test network factory.
    pub data: D,
    // Client handle used for chain queries and block building.
    client: PeersClient,
    // Copy of the verifier, invoked directly for locally generated blocks
    // (see `generate_blocks_at`).
    verifier: VerifierAdapter<Block>,
    // Copy of the block import, invoked directly for locally generated blocks
    // (see `generate_blocks_at`).
    block_import: BlockImportAdapter<BlockImport>,
    // Longest-chain selector built together with the client, if any.
    select_chain: Option<LongestChain<substrate_test_runtime_client::Backend, Block>>,
    // Backend handle used by the chain inspection helpers.
    backend: Option<Arc<substrate_test_runtime_client::Backend>>,
    // Network worker of this peer; driven from `TestNetFactory::poll`.
    network: NetworkWorker<Block, <Block as BlockT>::Hash>,
    // Handle to this peer's syncing service.
    sync_service: Arc<SyncingService<Block>>,
    // Stream of block import notifications from the client.
    imported_blocks_stream: Pin<Box<dyn Stream<Item = BlockImportNotification<Block>> + Send>>,
    // Stream of finality notifications from the client.
    finality_notification_stream: Pin<Box<dyn Stream<Item = FinalityNotification<Block>> + Send>>,
    // Address this peer's network listens on.
    listen_addr: Multiaddr,
    // Notification service handles, keyed by protocol name, until taken by a test.
    notification_services: HashMap<ProtocolName, Box<dyn NotificationService>>,
}
251
252impl<D, B> Peer<D, B>
253where
254 B: BlockImport<Block, Error = ConsensusError> + Send + Sync,
255{
    /// Returns the local peer id of this peer's network service.
    pub fn id(&self) -> PeerId {
        self.network.service().local_peer_id().into()
    }
260
    /// Returns whether the sync service reports that it is major syncing.
    pub fn is_major_syncing(&self) -> bool {
        self.sync_service.is_major_syncing()
    }
265
    /// Returns a clone of this peer's longest-chain selector, if it has one.
    pub fn select_chain(
        &self,
    ) -> Option<LongestChain<substrate_test_runtime_client::Backend, Block>> {
        self.select_chain.clone()
    }
272
    /// Returns the number of connected peers reported by the sync service.
    ///
    /// Declared `async` although the body itself does not await anything.
    pub async fn num_peers(&self) -> usize {
        self.sync_service.num_connected_peers()
    }
277
    /// Returns the number of downloaded blocks reported by the sync service.
    ///
    /// # Panics
    ///
    /// Panics if the sync service returns an error for the query.
    pub async fn num_downloaded_blocks(&self) -> usize {
        self.sync_service.num_downloaded_blocks().await.unwrap()
    }
282
    /// Returns whether the sync service reports this peer as offline.
    pub fn is_offline(&self) -> bool {
        self.sync_service.is_offline()
    }
287
    /// Asks the sync service to request a justification for the block identified by
    /// `hash` and `number`.
    pub fn request_justification(&self, hash: &<Block as BlockT>::Hash, number: NumberFor<Block>) {
        self.sync_service.request_justification(hash, number);
    }
292
    /// Asks the sync service to announce the block `hash`, with optional associated `data`.
    pub fn announce_block(&self, hash: <Block as BlockT>::Hash, data: Option<Vec<u8>>) {
        self.sync_service.announce_block(hash, data);
    }
297
298 pub fn set_sync_fork_request(
300 &self,
301 peers: Vec<PeerId>,
302 hash: <Block as BlockT>::Hash,
303 number: NumberFor<Block>,
304 ) {
305 self.sync_service.set_sync_fork_request(
306 peers.into_iter().map(From::from).collect(),
307 hash,
308 number,
309 );
310 }
311
312 pub fn generate_blocks<F>(
314 &mut self,
315 count: usize,
316 origin: BlockOrigin,
317 edit_block: F,
318 ) -> Vec<H256>
319 where
320 F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block,
321 {
322 let best_hash = self.client.info().best_hash;
323 self.generate_blocks_at(
324 BlockId::Hash(best_hash),
325 count,
326 origin,
327 edit_block,
328 false,
329 true,
330 true,
331 ForkChoiceStrategy::LongestChain,
332 )
333 }
334
335 pub fn generate_blocks_with_fork_choice<F>(
337 &mut self,
338 count: usize,
339 origin: BlockOrigin,
340 edit_block: F,
341 fork_choice: ForkChoiceStrategy,
342 ) -> Vec<H256>
343 where
344 F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block,
345 {
346 let best_hash = self.client.info().best_hash;
347 self.generate_blocks_at(
348 BlockId::Hash(best_hash),
349 count,
350 origin,
351 edit_block,
352 false,
353 true,
354 true,
355 fork_choice,
356 )
357 }
358
359 pub fn generate_blocks_at<F>(
362 &mut self,
363 at: BlockId<Block>,
364 count: usize,
365 origin: BlockOrigin,
366 mut edit_block: F,
367 headers_only: bool,
368 inform_sync_about_new_best_block: bool,
369 announce_block: bool,
370 fork_choice: ForkChoiceStrategy,
371 ) -> Vec<H256>
372 where
373 F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block,
374 {
375 let mut hashes = Vec::with_capacity(count);
376 let full_client = self.client.as_client();
377 let mut at = full_client.block_hash_from_id(&at).unwrap().unwrap();
378 for _ in 0..count {
379 let builder = BlockBuilderBuilder::new(&*full_client)
380 .on_parent_block(at)
381 .fetch_parent_block_number(&*full_client)
382 .unwrap()
383 .build()
384 .unwrap();
385 let block = edit_block(builder);
386 let hash = block.header.hash();
387 trace!(
388 target: "test_network",
389 "Generating {}, (#{}, parent={})",
390 hash,
391 block.header.number,
392 block.header.parent_hash,
393 );
394 let header = block.header.clone();
395 let mut import_block = BlockImportParams::new(origin, header.clone());
396 import_block.body = if headers_only { None } else { Some(block.extrinsics) };
397 import_block.fork_choice = Some(fork_choice);
398 let import_block =
399 futures::executor::block_on(self.verifier.verify(import_block)).unwrap();
400
401 futures::executor::block_on(self.block_import.import_block(import_block))
402 .expect("block_import failed");
403 hashes.push(hash);
404 at = hash;
405 }
406
407 if announce_block {
408 self.sync_service.announce_block(at, None);
409 }
410
411 if inform_sync_about_new_best_block {
412 self.sync_service.new_best_block_imported(
413 at,
414 *full_client.header(at).ok().flatten().unwrap().number(),
415 );
416 }
417 hashes
418 }
419
420 pub fn push_blocks(&mut self, count: usize, with_tx: bool) -> Vec<H256> {
422 let best_hash = self.client.info().best_hash;
423 self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx)
424 }
425
426 pub fn push_headers(&mut self, count: usize) -> Vec<H256> {
428 let best_hash = self.client.info().best_hash;
429 self.generate_tx_blocks_at(BlockId::Hash(best_hash), count, false, true, true, true)
430 }
431
432 pub fn push_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool) -> Vec<H256> {
435 self.generate_tx_blocks_at(at, count, with_tx, false, true, true)
436 }
437
438 pub fn push_blocks_at_without_informing_sync(
441 &mut self,
442 at: BlockId<Block>,
443 count: usize,
444 with_tx: bool,
445 announce_block: bool,
446 ) -> Vec<H256> {
447 self.generate_tx_blocks_at(at, count, with_tx, false, false, announce_block)
448 }
449
450 pub fn push_blocks_at_without_announcing(
453 &mut self,
454 at: BlockId<Block>,
455 count: usize,
456 with_tx: bool,
457 ) -> Vec<H256> {
458 self.generate_tx_blocks_at(at, count, with_tx, false, true, false)
459 }
460
461 fn generate_tx_blocks_at(
464 &mut self,
465 at: BlockId<Block>,
466 count: usize,
467 with_tx: bool,
468 headers_only: bool,
469 inform_sync_about_new_best_block: bool,
470 announce_block: bool,
471 ) -> Vec<H256> {
472 let mut nonce = 0;
473 if with_tx {
474 self.generate_blocks_at(
475 at,
476 count,
477 BlockOrigin::File,
478 |mut builder| {
479 let transfer = Transfer {
480 from: Sr25519Keyring::Alice.into(),
481 to: Sr25519Keyring::Alice.into(),
482 amount: 1,
483 nonce,
484 };
485 builder.push(transfer.into_unchecked_extrinsic()).unwrap();
486 nonce += 1;
487 builder.build().unwrap().block
488 },
489 headers_only,
490 inform_sync_about_new_best_block,
491 announce_block,
492 ForkChoiceStrategy::LongestChain,
493 )
494 } else {
495 self.generate_blocks_at(
496 at,
497 count,
498 BlockOrigin::File,
499 |builder| builder.build().unwrap().block,
500 headers_only,
501 inform_sync_about_new_best_block,
502 announce_block,
503 ForkChoiceStrategy::LongestChain,
504 )
505 }
506 }
507
    /// Returns a reference to this peer's client handle.
    pub fn client(&self) -> &PeersClient {
        &self.client
    }
512
    /// Returns a reference to the network service of this peer's network worker.
    pub fn network_service(&self) -> &Arc<NetworkService<Block, <Block as BlockT>::Hash>> {
        self.network.service()
    }
517
    /// Returns a reference to this peer's syncing service handle.
    pub fn sync_service(&self) -> &Arc<SyncingService<Block>> {
        &self.sync_service
    }
522
    /// Removes and returns the notification service registered for `protocol`.
    ///
    /// Returns `None` if no service is stored under that protocol name, which is
    /// also the case once it has been taken.
    pub fn take_notification_service(
        &mut self,
        protocol: &ProtocolName,
    ) -> Option<Box<dyn NotificationService>> {
        self.notification_services.remove(protocol)
    }
530
    /// Returns a reference to this peer's network worker.
    pub fn network(&self) -> &NetworkWorker<Block, <Block as BlockT>::Hash> {
        &self.network
    }
535
536 pub fn blockchain_canon_equals(&self, other: &Self) -> bool {
539 if let (Some(mine), Some(others)) = (self.backend.clone(), other.backend.clone()) {
540 mine.blockchain().info().best_hash == others.blockchain().info().best_hash
541 } else {
542 false
543 }
544 }
545
546 pub fn blocks_count(&self) -> u64 {
548 self.backend
549 .as_ref()
550 .map(|backend| backend.blockchain().info().best_number)
551 .unwrap_or(0)
552 }
553
    /// Returns a snapshot (clone) of the verification failures recorded by this
    /// peer's verifier, keyed by block hash with the error message as value.
    pub fn failed_verifications(&self) -> HashMap<<Block as BlockT>::Hash, String> {
        self.verifier.failed_verifications.lock().clone()
    }
558
559 pub fn has_block(&self, hash: H256) -> bool {
560 self.backend
561 .as_ref()
562 .map(|backend| backend.blockchain().header(hash).unwrap().is_some())
563 .unwrap_or(false)
564 }
565
566 pub fn has_body(&self, hash: H256) -> bool {
567 self.backend
568 .as_ref()
569 .map(|backend| backend.blockchain().body(hash).unwrap().is_some())
570 .unwrap_or(false)
571 }
572}
573
/// Shorthand for the bounds `BlockImport<Block, Error = ConsensusError> + Send + Sync + Clone`.
///
/// The trait has no items of its own.
pub trait BlockImportAdapterFull:
    BlockImport<Block, Error = ConsensusError> + Send + Sync + Clone
{
}
578
/// Blanket implementation: every type satisfying the bounds is a `BlockImportAdapterFull`.
impl<T> BlockImportAdapterFull for T where
    T: BlockImport<Block, Error = ConsensusError> + Send + Sync + Clone
{
}
583
/// Thin wrapper around a block import implementation.
///
/// Its `BlockImport` implementation forwards every call to the wrapped value.
#[derive(Clone)]
pub struct BlockImportAdapter<I> {
    // The wrapped block import.
    inner: I,
}
593
594impl<I> BlockImportAdapter<I> {
595 pub fn new(inner: I) -> Self {
597 Self { inner }
598 }
599}
600
601#[async_trait::async_trait]
602impl<I> BlockImport<Block> for BlockImportAdapter<I>
603where
604 I: BlockImport<Block, Error = ConsensusError> + Send + Sync,
605{
606 type Error = ConsensusError;
607
608 async fn check_block(
609 &self,
610 block: BlockCheckParams<Block>,
611 ) -> Result<ImportResult, Self::Error> {
612 self.inner.check_block(block).await
613 }
614
615 async fn import_block(
616 &self,
617 block: BlockImportParams<Block>,
618 ) -> Result<ImportResult, Self::Error> {
619 self.inner.import_block(block).await
620 }
621}
622
/// Wraps a boxed `Verifier` and keeps track of failed verifications.
struct VerifierAdapter<B: BlockT> {
    // The wrapped verifier, shared between clones behind an async mutex.
    verifier: Arc<futures::lock::Mutex<Box<dyn Verifier<B>>>>,
    // Error message of each failed verification, keyed by block hash.
    failed_verifications: Arc<Mutex<HashMap<B::Hash, String>>>,
}
628
629#[async_trait::async_trait]
630impl<B: BlockT> Verifier<B> for VerifierAdapter<B> {
631 async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String> {
632 let hash = block.header.hash();
633 self.verifier.lock().await.verify(block).await.inspect_err(|e| {
634 self.failed_verifications.lock().insert(hash, e.clone());
635 })
636 }
637}
638
639impl<B: BlockT> Clone for VerifierAdapter<B> {
640 fn clone(&self) -> Self {
641 Self {
642 verifier: self.verifier.clone(),
643 failed_verifications: self.failed_verifications.clone(),
644 }
645 }
646}
647
648impl<B: BlockT> VerifierAdapter<B> {
649 fn new(verifier: impl Verifier<B> + 'static) -> Self {
650 VerifierAdapter {
651 verifier: Arc::new(futures::lock::Mutex::new(Box::new(verifier))),
652 failed_verifications: Default::default(),
653 }
654 }
655}
656
/// Warp sync provider for tests, backed by a header backend. Its "proof" is simply
/// the SCALE-encoded best header.
struct TestWarpSyncProvider<B: BlockT>(Arc<dyn HeaderBackend<B>>);
658
659impl<B: BlockT> WarpSyncProvider<B> for TestWarpSyncProvider<B> {
660 fn generate(
661 &self,
662 _start: B::Hash,
663 ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>> {
664 let info = self.0.info();
665 let best_header = self.0.header(info.best_hash).unwrap().unwrap();
666 Ok(EncodedProof(best_header.encode()))
667 }
668 fn verify(
669 &self,
670 proof: &EncodedProof,
671 _set_id: SetId,
672 _authorities: AuthorityList,
673 ) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>> {
674 let EncodedProof(encoded) = proof;
675 let header = B::Header::decode(&mut encoded.as_slice()).unwrap();
676 Ok(VerificationResult::Complete(0, Default::default(), header))
677 }
678 fn current_authorities(&self) -> AuthorityList {
679 Default::default()
680 }
681}
682
/// Configuration for a full peer, consumed by `TestNetFactory::add_full_peer_with_config`.
#[derive(Default)]
pub struct FullPeerConfig {
    /// Value handed to the test client builder as pruning window (or transaction
    /// storage parameter when `storage_chain` is set); `None` selects the builder default.
    pub blocks_pruning: Option<u32>,
    /// Block announce validator; `DefaultBlockAnnounceValidator` is used when `None`.
    pub block_announce_validator: Option<Box<dyn BlockAnnounceValidator<Block> + Send + Sync>>,
    /// Notification protocols to register with the network.
    pub notifications_protocols: Vec<ProtocolName>,
    /// Additional request-response protocols to register with the network.
    pub request_response_protocols: Vec<RequestResponseConfig>,
    /// Indices of the peers to configure as reserved nodes. When set, non-reserved
    /// peers are denied.
    pub connect_to_peers: Option<Vec<usize>>,
    /// Whether the peer uses the authority role instead of the full role.
    pub is_authority: bool,
    /// Syncing mode of the peer.
    pub sync_mode: SyncMode,
    /// Extra genesis storage for the test client.
    pub extra_storage: Option<sp_core::storage::Storage>,
    /// Build the client with transaction storage (`TestClientBuilder::with_tx_storage`).
    pub storage_chain: bool,
    /// Optional target header; when set, warp sync is configured with this target
    /// instead of a provider.
    pub target_header: Option<<Block as BlockT>::Header>,
    /// Keep genesis even for the light-state and warp sync modes.
    pub force_genesis: bool,
}
713
714#[async_trait::async_trait]
715pub trait TestNetFactory: Default + Sized + Send {
    /// Verifier type created for each peer and handed to its import queue.
    type Verifier: 'static + Verifier<Block>;
    /// Block import type of each peer, wrapped in a [`BlockImportAdapter`].
    type BlockImport: BlockImport<Block, Error = ConsensusError> + Clone + Send + Sync + 'static;
    /// Custom data stored alongside each peer.
    type PeerData: Default + Send;

    /// Creates the verifier for a new peer from its client and peer data.
    fn make_verifier(&self, client: PeersClient, peer_data: &Self::PeerData) -> Self::Verifier;

    /// Returns a mutable reference to the peer at index `i`.
    fn peer(&mut self, i: usize) -> &mut Peer<Self::PeerData, Self::BlockImport>;
    /// Returns the list of peers.
    fn peers(&self) -> &Vec<Peer<Self::PeerData, Self::BlockImport>>;
    /// Returns the list of peers mutably.
    fn peers_mut(&mut self) -> &mut Vec<Peer<Self::PeerData, Self::BlockImport>>;
    /// Runs `closure` with mutable access to the list of peers.
    fn mut_peers<F: FnOnce(&mut Vec<Peer<Self::PeerData, Self::BlockImport>>)>(
        &mut self,
        closure: F,
    );

    /// Creates the block import for a fresh client, together with an optional
    /// justification import and the peer's custom data.
    fn make_block_import(
        &self,
        client: PeersClient,
    ) -> (
        BlockImportAdapter<Self::BlockImport>,
        Option<BoxJustificationImport<Block>>,
        Self::PeerData,
    );
741
742 fn new(n: usize) -> Self {
744 trace!(target: "test_network", "Creating test network");
745 let mut net = Self::default();
746
747 for i in 0..n {
748 trace!(target: "test_network", "Adding peer {}", i);
749 net.add_full_peer();
750 }
751 net
752 }
753
754 fn add_full_peer(&mut self) {
755 self.add_full_peer_with_config(Default::default())
756 }
757
    /// Adds a full peer configured by `config`.
    ///
    /// Builds the test client, import queue, request handlers, syncing engine and
    /// network worker for the new peer, spawns their background tasks, makes the new
    /// peer's address known to all existing peers and finally pushes the peer onto
    /// the peer list.
    ///
    /// # Panics
    ///
    /// Panics if the genesis block cannot be looked up or if constructing the
    /// syncing strategy, the syncing engine or the network worker fails. Tasks are
    /// spawned with `tokio::spawn`.
    fn add_full_peer_with_config(&mut self, config: FullPeerConfig) {
        // Pick the client builder flavour from the pruning / transaction-storage settings.
        let mut test_client_builder = match (config.blocks_pruning, config.storage_chain) {
            (Some(blocks_pruning), true) => TestClientBuilder::with_tx_storage(blocks_pruning),
            (None, true) => TestClientBuilder::with_tx_storage(u32::MAX),
            (Some(blocks_pruning), false) => TestClientBuilder::with_pruning_window(blocks_pruning),
            (None, false) => TestClientBuilder::with_default_backend(),
        };
        if let Some(storage) = config.extra_storage {
            let genesis_extra_storage = test_client_builder.genesis_init_mut().extra_storage();
            *genesis_extra_storage = storage;
        }

        // Light-state and warp sync peers are built with `set_no_genesis()` unless
        // the configuration forces genesis.
        if !config.force_genesis &&
            matches!(config.sync_mode, SyncMode::LightState { .. } | SyncMode::Warp)
        {
            test_client_builder = test_client_builder.set_no_genesis();
        }
        let backend = test_client_builder.backend();
        let (c, longest_chain) = test_client_builder.build_with_longest_chain();
        let client = Arc::new(c);

        let (block_import, justification_import, data) = self
            .make_block_import(PeersClient { client: client.clone(), backend: backend.clone() });

        let verifier = self
            .make_verifier(PeersClient { client: client.clone(), backend: backend.clone() }, &data);
        // Wrap the verifier so failed verifications are recorded for the peer.
        let verifier = VerifierAdapter::new(verifier);

        let import_queue = Box::new(BasicQueue::new(
            verifier.clone(),
            Box::new(block_import.clone()),
            justification_import,
            &sp_core::testing::TaskExecutor::new(),
            None,
        ));

        // Listen on a random in-memory address.
        let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];

        let mut network_config =
            NetworkConfiguration::new("test-node", "test-client", Default::default(), None);
        network_config.sync_mode = config.sync_mode;
        network_config.transport = TransportConfig::MemoryOnly;
        network_config.listen_addresses = vec![listen_addr.clone()];
        network_config.allow_non_globals_in_dht = true;

        // One notification set config per requested protocol; the handles are kept
        // with their protocol name so tests can take them from the peer later.
        let (notif_configs, notif_handles): (Vec<_>, Vec<_>) = config
            .notifications_protocols
            .into_iter()
            .map(|p| {
                let (config, handle) = NonDefaultSetConfig::new(
                    p.clone(),
                    Vec::new(),
                    1024 * 1024,
                    None,
                    Default::default(),
                );

                (config, (p, handle))
            })
            .unzip();

        // When explicit peers are requested, make them reserved nodes and deny
        // everyone else.
        if let Some(connect_to) = config.connect_to_peers {
            let addrs = connect_to
                .iter()
                .map(|v| {
                    let peer_id = self.peer(*v).network_service().local_peer_id();
                    let multiaddr = self.peer(*v).listen_addr.clone();
                    MultiaddrWithPeerId { peer_id, multiaddr }
                })
                .collect();
            network_config.default_peers_set.reserved_nodes = addrs;
            network_config.default_peers_set.non_reserved_mode = NonReservedPeerMode::Deny;
        }
        let mut full_net_config = FullNetworkConfiguration::new(&network_config, None);

        let protocol_id = ProtocolId::from("test-protocol-name");

        let fork_id = Some(String::from("test-fork-id"));

        // Block request handler; its server side runs as a background task.
        let chain_sync_network_provider = NetworkServiceProvider::new();
        let chain_sync_network_handle = chain_sync_network_provider.handle();
        let mut block_relay_params = BlockRequestHandler::new::<NetworkWorker<_, _>>(
            chain_sync_network_handle.clone(),
            &protocol_id,
            None,
            client.clone(),
            50,
        );
        self.spawn_task(Box::pin(async move {
            block_relay_params.server.run().await;
        }));

        // State request handler.
        let state_request_protocol_config = {
            let (handler, protocol_config) = StateRequestHandler::new::<NetworkWorker<_, _>>(
                &protocol_id,
                None,
                client.clone(),
                50,
            );
            self.spawn_task(handler.run().boxed());
            protocol_config
        };

        // Light client request handler.
        let light_client_request_protocol_config =
            {
                let (handler, protocol_config) = LightClientRequestHandler::new::<
                    NetworkWorker<_, _>,
                >(&protocol_id, None, client.clone());
                self.spawn_task(handler.run().boxed());
                protocol_config
            };

        let warp_sync = Arc::new(TestWarpSyncProvider(client.clone()));

        // Warp sync targets the configured header if there is one, otherwise it
        // uses the test provider.
        let warp_sync_config = match config.target_header {
            Some(target_header) => WarpSyncConfig::WithTarget(target_header),
            _ => WarpSyncConfig::WithProvider(warp_sync.clone()),
        };

        // Warp sync request handler.
        let warp_protocol_config = {
            let (handler, protocol_config) =
                warp_request_handler::RequestHandler::new::<_, NetworkWorker<_, _>>(
                    protocol_id.clone(),
                    client
                        .block_hash(0u32.into())
                        .ok()
                        .flatten()
                        .expect("Genesis block exists; qed"),
                    None,
                    warp_sync.clone(),
                );
            self.spawn_task(handler.run().boxed());
            protocol_config
        };

        // Peer store seeded with the configured boot nodes.
        let peer_store = PeerStore::new(
            network_config
                .boot_nodes
                .iter()
                .map(|bootnode| bootnode.peer_id.into())
                .collect(),
            None,
        );
        let peer_store_handle = Arc::new(peer_store.handle());
        self.spawn_task(peer_store.run().boxed());

        let block_announce_validator = config
            .block_announce_validator
            .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator));
        let metrics = <NetworkWorker<_, _> as sc_network::NetworkBackend<
            Block,
            <Block as BlockT>::Hash,
        >>::register_notification_metrics(None);

        // Syncing strategy and engine.
        let syncing_config = PolkadotSyncingStrategyConfig {
            mode: network_config.sync_mode,
            max_parallel_downloads: network_config.max_parallel_downloads,
            max_blocks_per_request: network_config.max_blocks_per_request,
            metrics_registry: None,
            state_request_protocol_name: state_request_protocol_config.name.clone(),
            block_downloader: block_relay_params.downloader,
            min_peers_to_start_warp_sync: None,
        };
        let syncing_strategy = Box::new(
            PolkadotSyncingStrategy::new(
                syncing_config,
                client.clone(),
                Some(warp_sync_config),
                Some(warp_protocol_config.name.clone()),
            )
            .unwrap(),
        );

        let (engine, sync_service, block_announce_config) =
            sc_network_sync::engine::SyncingEngine::new(
                Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }),
                client.clone(),
                None,
                metrics,
                &full_net_config,
                protocol_id.clone(),
                fork_id.as_deref(),
                block_announce_validator,
                syncing_strategy,
                chain_sync_network_handle,
                import_queue.service(),
                peer_store_handle.clone(),
            )
            .unwrap();
        let sync_service = Arc::new(sync_service.clone());

        // Register user supplied protocols first, then the built-in request-response
        // protocols and the notification protocols.
        for config in config.request_response_protocols {
            full_net_config.add_request_response_protocol(config);
        }
        for config in [
            block_relay_params.request_response_config,
            state_request_protocol_config,
            light_client_request_protocol_config,
            warp_protocol_config,
        ] {
            full_net_config.add_request_response_protocol(config);
        }

        for config in notif_configs {
            full_net_config.add_notification_protocol(config);
        }

        let genesis_hash =
            client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
        let network = NetworkWorker::new(sc_network::config::Params {
            role: if config.is_authority { Role::Authority } else { Role::Full },
            executor: Box::new(|f| {
                tokio::spawn(f);
            }),
            network_config: full_net_config,
            genesis_hash,
            protocol_id,
            fork_id,
            metrics_registry: None,
            block_announce_config,
            bitswap_config: None,
            notification_metrics: NotificationMetrics::new(None),
        })
        .unwrap();

        trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id());

        // Background tasks: network service provider, import queue and syncing engine.
        let service = network.service().clone();
        tokio::spawn(async move {
            chain_sync_network_provider.run(service).await;
        });

        tokio::spawn({
            let sync_service = sync_service.clone();

            async move {
                import_queue.run(sync_service.as_ref()).await;
            }
        });

        tokio::spawn(async move {
            engine.run().await;
        });

        self.mut_peers(move |peers| {
            // Tell every existing peer where the new peer can be reached.
            for peer in peers.iter_mut() {
                peer.network.add_known_address(
                    network.service().local_peer_id().into(),
                    listen_addr.clone().into(),
                );
            }

            let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse());
            let finality_notification_stream =
                Box::pin(client.finality_notification_stream().fuse());

            peers.push(Peer {
                data,
                client: PeersClient { client: client.clone(), backend: backend.clone() },
                select_chain: Some(longest_chain),
                backend: Some(backend),
                imported_blocks_stream,
                finality_notification_stream,
                notification_services: HashMap::from_iter(notif_handles.into_iter()),
                block_import,
                verifier,
                network,
                sync_service,
                listen_addr,
            });
        });
    }
1032
    /// Spawns the future `f` as a background task via `tokio::spawn`.
    fn spawn_task(&self, f: BoxFuture<'static, ()>) {
        tokio::spawn(f);
    }
1037
1038 async fn is_in_sync(&mut self) -> bool {
1039 let mut highest = None;
1040 let peers = self.peers_mut();
1041
1042 for peer in peers {
1043 if peer.sync_service.is_major_syncing() ||
1044 peer.sync_service.status().await.unwrap().queued_blocks != 0
1045 {
1046 return false
1047 }
1048 if peer.sync_service.num_sync_requests().await.unwrap() != 0 {
1049 return false
1050 }
1051 match (highest, peer.client.info().best_hash) {
1052 (None, b) => highest = Some(b),
1053 (Some(ref a), ref b) if a == b => {},
1054 (Some(_), _) => return false,
1055 }
1056 }
1057
1058 true
1059 }
1060
1061 async fn is_idle(&mut self) -> bool {
1062 let peers = self.peers_mut();
1063 for peer in peers {
1064 if peer.sync_service.status().await.unwrap().queued_blocks != 0 {
1065 return false
1066 }
1067 if peer.sync_service.num_sync_requests().await.unwrap() != 0 {
1068 return false
1069 }
1070 }
1071
1072 true
1073 }
1074
1075 async fn run_until_sync(&mut self) {
1080 timeout(Duration::from_secs(10 * 60), async {
1081 loop {
1082 futures::future::poll_fn::<(), _>(|cx| {
1083 self.poll(cx);
1084 Poll::Ready(())
1085 })
1086 .await;
1087
1088 if self.is_in_sync().await {
1089 break
1090 }
1091 }
1092 })
1093 .await
1094 .expect("sync didn't happen within 10 mins");
1095 }
1096
1097 async fn run_until_idle(&mut self) {
1101 loop {
1102 futures::future::poll_fn::<(), _>(|cx| {
1103 self.poll(cx);
1104 Poll::Ready(())
1105 })
1106 .await;
1107
1108 if self.is_idle().await {
1109 break
1110 }
1111 }
1112 }
1113
1114 async fn run_until_connected(&mut self) {
1116 let num_peers = self.peers().len();
1117 let sync_services =
1118 self.peers().iter().map(|info| info.sync_service.clone()).collect::<Vec<_>>();
1119
1120 'outer: loop {
1121 for sync_service in &sync_services {
1122 if sync_service.num_connected_peers() != num_peers - 1 {
1123 futures::future::poll_fn::<(), _>(|cx| {
1124 self.poll(cx);
1125 Poll::Ready(())
1126 })
1127 .await;
1128 continue 'outer
1129 }
1130 }
1131
1132 break
1133 }
1134 }
1135
    /// Polls every peer of the test network once.
    ///
    /// For each peer this drives the network worker until it returns `Pending`,
    /// announces every newly imported block through the sync service and forwards
    /// every finality notification to it.
    fn poll(&mut self, cx: &mut FutureContext) {
        self.mut_peers(|peers| {
            for (i, peer) in peers.iter_mut().enumerate() {
                trace!(target: "sync", "-- Polling {}: {}", i, peer.id());
                // Keep creating and polling `next_action()` futures until one is pending.
                loop {
                    let net_poll_future = peer.network.next_action();
                    pin_mut!(net_poll_future);
                    if let Poll::Pending = net_poll_future.poll(cx) {
                        break
                    }
                }
                trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id());

                // Announce every block the client reports as imported.
                while let Poll::Ready(Some(notification)) =
                    peer.imported_blocks_stream.as_mut().poll_next(cx)
                {
                    peer.sync_service.announce_block(notification.hash, None);
                }

                // Forward finality notifications to the sync service.
                while let Poll::Ready(Some(notification)) =
                    peer.finality_notification_stream.as_mut().poll_next(cx)
                {
                    peer.sync_service.on_block_finalized(notification.hash, notification.header);
                }
            }
        });
    }
1171}
1172
/// Basic test network whose peers import blocks straight into their client and
/// carry no custom peer data.
#[derive(Default)]
pub struct TestNet {
    // Peers of the network, in the order they were added.
    peers: Vec<Peer<(), PeersClient>>,
}
1177
1178impl TestNetFactory for TestNet {
1179 type Verifier = PassThroughVerifier;
1180 type PeerData = ();
1181 type BlockImport = PeersClient;
1182
1183 fn make_verifier(&self, _client: PeersClient, _peer_data: &()) -> Self::Verifier {
1184 PassThroughVerifier::new(false)
1185 }
1186
1187 fn make_block_import(
1188 &self,
1189 client: PeersClient,
1190 ) -> (
1191 BlockImportAdapter<Self::BlockImport>,
1192 Option<BoxJustificationImport<Block>>,
1193 Self::PeerData,
1194 ) {
1195 (client.as_block_import(), None, ())
1196 }
1197
1198 fn peer(&mut self, i: usize) -> &mut Peer<(), Self::BlockImport> {
1199 &mut self.peers[i]
1200 }
1201
1202 fn peers(&self) -> &Vec<Peer<(), Self::BlockImport>> {
1203 &self.peers
1204 }
1205
1206 fn peers_mut(&mut self) -> &mut Vec<Peer<(), Self::BlockImport>> {
1207 &mut self.peers
1208 }
1209
1210 fn mut_peers<F: FnOnce(&mut Vec<Peer<(), Self::BlockImport>>)>(&mut self, closure: F) {
1211 closure(&mut self.peers);
1212 }
1213}
1214
/// Justification import that finalizes the block through the wrapped client for
/// every justification it is given.
pub struct ForceFinalized(PeersClient);
1216
1217#[async_trait::async_trait]
1218impl JustificationImport<Block> for ForceFinalized {
1219 type Error = ConsensusError;
1220
1221 async fn on_start(&mut self) -> Vec<(H256, NumberFor<Block>)> {
1222 Vec::new()
1223 }
1224
1225 async fn import_justification(
1226 &mut self,
1227 hash: H256,
1228 _number: NumberFor<Block>,
1229 justification: Justification,
1230 ) -> Result<(), Self::Error> {
1231 self.0
1232 .finalize_block(hash, Some(justification), true)
1233 .map_err(|_| ConsensusError::InvalidJustification)
1234 }
1235}
1236
/// Test network that wraps a [`TestNet`] and additionally gives every peer a
/// [`ForceFinalized`] justification import.
#[derive(Default)]
pub struct JustificationTestNet(TestNet);
1239
1240impl TestNetFactory for JustificationTestNet {
1241 type Verifier = PassThroughVerifier;
1242 type PeerData = ();
1243 type BlockImport = PeersClient;
1244
1245 fn make_verifier(&self, client: PeersClient, peer_data: &()) -> Self::Verifier {
1246 self.0.make_verifier(client, peer_data)
1247 }
1248
1249 fn peer(&mut self, i: usize) -> &mut Peer<Self::PeerData, Self::BlockImport> {
1250 self.0.peer(i)
1251 }
1252
1253 fn peers(&self) -> &Vec<Peer<Self::PeerData, Self::BlockImport>> {
1254 self.0.peers()
1255 }
1256
1257 fn peers_mut(&mut self) -> &mut Vec<Peer<Self::PeerData, Self::BlockImport>> {
1258 self.0.peers_mut()
1259 }
1260
1261 fn mut_peers<F: FnOnce(&mut Vec<Peer<Self::PeerData, Self::BlockImport>>)>(
1262 &mut self,
1263 closure: F,
1264 ) {
1265 self.0.mut_peers(closure)
1266 }
1267
1268 fn make_block_import(
1269 &self,
1270 client: PeersClient,
1271 ) -> (
1272 BlockImportAdapter<Self::BlockImport>,
1273 Option<BoxJustificationImport<Block>>,
1274 Self::PeerData,
1275 ) {
1276 (client.as_block_import(), Some(Box::new(ForceFinalized(client))), Default::default())
1277 }
1278}