1use crate::{
22 schema::v1::{StateRequest, StateResponse},
23 service::network::NetworkServiceHandle,
24 strategy::{
25 disconnected_peers::DisconnectedPeers,
26 state_sync::{ImportResult, StateSync, StateSyncProvider},
27 StrategyKey, SyncingAction,
28 },
29 types::{BadPeer, SyncState, SyncStatus},
30 LOG_TARGET,
31};
32use futures::{channel::oneshot, FutureExt};
33use log::{debug, error, trace};
34use prost::Message;
35use sc_client_api::ProofProvider;
36use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
37use sc_network::{IfDisconnected, ProtocolName};
38use sc_network_common::sync::message::BlockAnnounce;
39use sc_network_types::PeerId;
40use sp_consensus::BlockOrigin;
41use sp_runtime::{
42 traits::{Block as BlockT, Header, NumberFor},
43 Justifications, SaturatedConversion,
44};
45use std::{any::Any, collections::HashMap, sync::Arc};
46
47mod rep {
48 use sc_network::ReputationChange as Rep;
49
50 pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
52
53 pub const BAD_STATE: Rep = Rep::new(-(1 << 29), "Bad state");
55}
56
57enum PeerState {
58 Available,
59 DownloadingState,
60}
61
62impl PeerState {
63 fn is_available(&self) -> bool {
64 matches!(self, PeerState::Available)
65 }
66}
67
68struct Peer<B: BlockT> {
69 best_number: NumberFor<B>,
70 state: PeerState,
71}
72
73pub struct StateStrategy<B: BlockT> {
75 state_sync: Box<dyn StateSyncProvider<B>>,
76 peers: HashMap<PeerId, Peer<B>>,
77 disconnected_peers: DisconnectedPeers,
78 actions: Vec<SyncingAction<B>>,
79 protocol_name: ProtocolName,
80 succeeded: bool,
81}
82
83impl<B: BlockT> StateStrategy<B> {
84 pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("State");
86
87 pub fn new<Client>(
89 client: Arc<Client>,
90 target_header: B::Header,
91 target_body: Option<Vec<B::Extrinsic>>,
92 target_justifications: Option<Justifications>,
93 skip_proof: bool,
94 initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
95 protocol_name: ProtocolName,
96 ) -> Self
97 where
98 Client: ProofProvider<B> + Send + Sync + 'static,
99 {
100 let peers = initial_peers
101 .map(|(peer_id, best_number)| {
102 (peer_id, Peer { best_number, state: PeerState::Available })
103 })
104 .collect();
105 Self {
106 state_sync: Box::new(StateSync::new(
107 client,
108 target_header,
109 target_body,
110 target_justifications,
111 skip_proof,
112 )),
113 peers,
114 disconnected_peers: DisconnectedPeers::new(),
115 actions: Vec::new(),
116 protocol_name,
117 succeeded: false,
118 }
119 }
120
121 pub fn new_with_provider(
126 state_sync_provider: Box<dyn StateSyncProvider<B>>,
127 initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
128 protocol_name: ProtocolName,
129 ) -> Self {
130 Self {
131 state_sync: state_sync_provider,
132 peers: initial_peers
133 .map(|(peer_id, best_number)| {
134 (peer_id, Peer { best_number, state: PeerState::Available })
135 })
136 .collect(),
137 disconnected_peers: DisconnectedPeers::new(),
138 actions: Vec::new(),
139 protocol_name,
140 succeeded: false,
141 }
142 }
143
144 pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
146 self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
147 }
148
149 pub fn remove_peer(&mut self, peer_id: &PeerId) {
151 if let Some(state) = self.peers.remove(peer_id) {
152 if !state.state.is_available() {
153 if let Some(bad_peer) =
154 self.disconnected_peers.on_disconnect_during_request(*peer_id)
155 {
156 self.actions.push(SyncingAction::DropPeer(bad_peer));
157 }
158 }
159 }
160 }
161
162 #[must_use]
166 pub fn on_validated_block_announce(
167 &mut self,
168 is_best: bool,
169 peer_id: PeerId,
170 announce: &BlockAnnounce<B::Header>,
171 ) -> Option<(B::Hash, NumberFor<B>)> {
172 is_best.then(|| {
173 let best_number = *announce.header.number();
174 let best_hash = announce.header.hash();
175 if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
176 peer.best_number = best_number;
177 }
178 (best_hash, best_number)
180 })
181 }
182
183 pub fn on_state_response(&mut self, peer_id: &PeerId, response: Vec<u8>) {
185 if let Err(bad_peer) = self.on_state_response_inner(peer_id, &response) {
186 self.actions.push(SyncingAction::DropPeer(bad_peer));
187 }
188 }
189
190 fn on_state_response_inner(
191 &mut self,
192 peer_id: &PeerId,
193 response: &[u8],
194 ) -> Result<(), BadPeer> {
195 if let Some(peer) = self.peers.get_mut(&peer_id) {
196 peer.state = PeerState::Available;
197 }
198
199 let response = match StateResponse::decode(response) {
200 Ok(response) => response,
201 Err(error) => {
202 debug!(
203 target: LOG_TARGET,
204 "Failed to decode state response from peer {peer_id:?}: {error:?}.",
205 );
206
207 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
208 },
209 };
210
211 debug!(
212 target: LOG_TARGET,
213 "Importing state data from {} with {} keys, {} proof nodes.",
214 peer_id,
215 response.entries.len(),
216 response.proof.len(),
217 );
218
219 match self.state_sync.import(response) {
220 ImportResult::Import(hash, header, state, body, justifications) => {
221 let origin = BlockOrigin::NetworkInitialSync;
222 let block = IncomingBlock {
223 hash,
224 header: Some(header),
225 body,
226 indexed_body: None,
227 justifications,
228 origin: None,
229 allow_missing_state: true,
230 import_existing: true,
231 skip_execution: true,
232 state: Some(state),
233 };
234 debug!(target: LOG_TARGET, "State download is complete. Import is queued");
235 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
236 Ok(())
237 },
238 ImportResult::Continue => Ok(()),
239 ImportResult::BadResponse => {
240 debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
241 Err(BadPeer(*peer_id, rep::BAD_STATE))
242 },
243 }
244 }
245
246 pub fn on_blocks_processed(
250 &mut self,
251 imported: usize,
252 count: usize,
253 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
254 ) {
255 trace!(target: LOG_TARGET, "State sync: imported {imported} of {count}.");
256
257 let results = results
258 .into_iter()
259 .filter_map(|(result, hash)| {
260 if hash == self.state_sync.target_hash() {
261 Some(result)
262 } else {
263 debug!(
264 target: LOG_TARGET,
265 "Unexpected block processed: {hash} with result {result:?}.",
266 );
267 None
268 }
269 })
270 .collect::<Vec<_>>();
271
272 if !results.is_empty() {
273 results.iter().filter_map(|result| result.as_ref().err()).for_each(|e| {
275 error!(
276 target: LOG_TARGET,
277 "Failed to import target block with state: {e:?}."
278 );
279 });
280 self.succeeded |= results.into_iter().any(|result| result.is_ok());
281 self.actions.push(SyncingAction::Finished);
282 }
283 }
284
285 fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
287 if self.state_sync.is_complete() {
288 return None
289 }
290
291 if self
292 .peers
293 .values()
294 .any(|peer| matches!(peer.state, PeerState::DownloadingState))
295 {
296 return None
298 }
299
300 let peer_id =
301 self.schedule_next_peer(PeerState::DownloadingState, self.state_sync.target_number())?;
302 let request = self.state_sync.next_request();
303 trace!(
304 target: LOG_TARGET,
305 "New state request to {peer_id}: {request:?}.",
306 );
307 Some((peer_id, request))
308 }
309
310 fn schedule_next_peer(
311 &mut self,
312 new_state: PeerState,
313 min_best_number: NumberFor<B>,
314 ) -> Option<PeerId> {
315 let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
316 if targets.is_empty() {
317 return None
318 }
319 targets.sort();
320 let median = targets[targets.len() / 2];
321 let threshold = std::cmp::max(median, min_best_number);
322 for (peer_id, peer) in self.peers.iter_mut() {
325 if peer.state.is_available() &&
326 peer.best_number >= threshold &&
327 self.disconnected_peers.is_peer_available(peer_id)
328 {
329 peer.state = new_state;
330 return Some(*peer_id)
331 }
332 }
333 None
334 }
335
336 pub fn status(&self) -> SyncStatus<B> {
338 SyncStatus {
339 state: if self.state_sync.is_complete() {
340 SyncState::Idle
341 } else {
342 SyncState::Downloading { target: self.state_sync.target_number() }
343 },
344 best_seen_block: Some(self.state_sync.target_number()),
345 num_peers: self.peers.len().saturated_into(),
346 queued_blocks: 0,
347 state_sync: Some(self.state_sync.progress()),
348 warp_sync: None,
349 }
350 }
351
352 #[must_use]
354 pub fn actions(
355 &mut self,
356 network_service: &NetworkServiceHandle,
357 ) -> impl Iterator<Item = SyncingAction<B>> {
358 let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
359 let (tx, rx) = oneshot::channel();
360
361 network_service.start_request(
362 peer_id,
363 self.protocol_name.clone(),
364 request.encode_to_vec(),
365 tx,
366 IfDisconnected::ImmediateError,
367 );
368
369 SyncingAction::StartRequest {
370 peer_id,
371 key: Self::STRATEGY_KEY,
372 request: async move {
373 Ok(rx.await?.and_then(|(response, protocol_name)| {
374 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
375 }))
376 }
377 .boxed(),
378 remove_obsolete: false,
379 }
380 });
381 self.actions.extend(state_request);
382
383 std::mem::take(&mut self.actions).into_iter()
384 }
385
386 #[must_use]
388 pub fn is_succeeded(&self) -> bool {
389 self.succeeded
390 }
391}
392
393#[cfg(test)]
394mod test {
395 use super::*;
396 use crate::{
397 schema::v1::{StateRequest, StateResponse},
398 service::network::NetworkServiceProvider,
399 strategy::state_sync::{ImportResult, StateSyncProgress, StateSyncProvider},
400 };
401 use codec::Decode;
402 use sc_block_builder::BlockBuilderBuilder;
403 use sc_client_api::KeyValueStates;
404 use sc_consensus::{ImportedAux, ImportedState};
405 use sp_core::H256;
406 use sp_runtime::traits::Zero;
407 use substrate_test_runtime_client::{
408 runtime::{Block, Hash},
409 BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
410 };
411
412 mockall::mock! {
413 pub StateSync<B: BlockT> {}
414
415 impl<B: BlockT> StateSyncProvider<B> for StateSync<B> {
416 fn import(&mut self, response: StateResponse) -> ImportResult<B>;
417 fn next_request(&self) -> StateRequest;
418 fn is_complete(&self) -> bool;
419 fn target_number(&self) -> NumberFor<B>;
420 fn target_hash(&self) -> B::Hash;
421 fn progress(&self) -> StateSyncProgress;
422 }
423 }
424
425 #[test]
426 fn no_peer_is_scheduled_if_no_peers_connected() {
427 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
428 let target_block = BlockBuilderBuilder::new(&*client)
429 .on_parent_block(client.chain_info().best_hash)
430 .with_parent_block_number(client.chain_info().best_number)
431 .build()
432 .unwrap()
433 .build()
434 .unwrap()
435 .block;
436 let target_header = target_block.header().clone();
437
438 let mut state_strategy = StateStrategy::new(
439 client,
440 target_header,
441 None,
442 None,
443 false,
444 std::iter::empty(),
445 ProtocolName::Static(""),
446 );
447
448 assert!(state_strategy
449 .schedule_next_peer(PeerState::DownloadingState, Zero::zero())
450 .is_none());
451 }
452
453 #[test]
454 fn at_least_median_synced_peer_is_scheduled() {
455 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
456 let target_block = BlockBuilderBuilder::new(&*client)
457 .on_parent_block(client.chain_info().best_hash)
458 .with_parent_block_number(client.chain_info().best_number)
459 .build()
460 .unwrap()
461 .build()
462 .unwrap()
463 .block;
464
465 for _ in 0..100 {
466 let peers = (1..=10)
467 .map(|best_number| (PeerId::random(), best_number))
468 .collect::<HashMap<_, _>>();
469 let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
470
471 let mut state_strategy = StateStrategy::new(
472 client.clone(),
473 target_block.header().clone(),
474 None,
475 None,
476 false,
477 initial_peers,
478 ProtocolName::Static(""),
479 );
480
481 let peer_id =
482 state_strategy.schedule_next_peer(PeerState::DownloadingState, Zero::zero());
483 assert!(*peers.get(&peer_id.unwrap()).unwrap() >= 6);
484 }
485 }
486
487 #[test]
488 fn min_best_number_peer_is_scheduled() {
489 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
490 let target_block = BlockBuilderBuilder::new(&*client)
491 .on_parent_block(client.chain_info().best_hash)
492 .with_parent_block_number(client.chain_info().best_number)
493 .build()
494 .unwrap()
495 .build()
496 .unwrap()
497 .block;
498
499 for _ in 0..10 {
500 let peers = (1..=10)
501 .map(|best_number| (PeerId::random(), best_number))
502 .collect::<HashMap<_, _>>();
503 let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
504
505 let mut state_strategy = StateStrategy::new(
506 client.clone(),
507 target_block.header().clone(),
508 None,
509 None,
510 false,
511 initial_peers,
512 ProtocolName::Static(""),
513 );
514
515 let peer_id = state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
516 assert!(*peers.get(&peer_id.unwrap()).unwrap() == 10);
517 }
518 }
519
520 #[test]
521 fn backedoff_number_peer_is_not_scheduled() {
522 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
523 let target_block = BlockBuilderBuilder::new(&*client)
524 .on_parent_block(client.chain_info().best_hash)
525 .with_parent_block_number(client.chain_info().best_number)
526 .build()
527 .unwrap()
528 .build()
529 .unwrap()
530 .block;
531
532 let peers = (1..=10)
533 .map(|best_number| (PeerId::random(), best_number))
534 .collect::<Vec<(_, _)>>();
535 let ninth_peer = peers[8].0;
536 let tenth_peer = peers[9].0;
537 let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
538
539 let mut state_strategy = StateStrategy::new(
540 client.clone(),
541 target_block.header().clone(),
542 None,
543 None,
544 false,
545 initial_peers,
546 ProtocolName::Static(""),
547 );
548
549 state_strategy.remove_peer(&tenth_peer);
551 assert!(state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
552
553 state_strategy.add_peer(tenth_peer, H256::random(), 10);
555 let peer_id: Option<PeerId> =
556 state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
557 assert_eq!(tenth_peer, peer_id.unwrap());
558 state_strategy.remove_peer(&tenth_peer);
559
560 assert!(!state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
562
563 state_strategy.add_peer(tenth_peer, H256::random(), 10);
565 let peer_id: Option<PeerId> =
566 state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
567 assert!(peer_id.is_none());
568
569 let peer_id: Option<PeerId> =
571 state_strategy.schedule_next_peer(PeerState::DownloadingState, 9);
572 assert_eq!(ninth_peer, peer_id.unwrap());
573 }
574
575 #[test]
576 fn state_request_contains_correct_hash() {
577 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
578 let target_block = BlockBuilderBuilder::new(&*client)
579 .on_parent_block(client.chain_info().best_hash)
580 .with_parent_block_number(client.chain_info().best_number)
581 .build()
582 .unwrap()
583 .build()
584 .unwrap()
585 .block;
586
587 let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
588
589 let mut state_strategy = StateStrategy::new(
590 client.clone(),
591 target_block.header().clone(),
592 None,
593 None,
594 false,
595 initial_peers,
596 ProtocolName::Static(""),
597 );
598
599 let (_peer_id, request) = state_strategy.state_request().unwrap();
600 let hash = Hash::decode(&mut &*request.block).unwrap();
601
602 assert_eq!(hash, target_block.header().hash());
603 }
604
605 #[test]
606 fn no_parallel_state_requests() {
607 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
608 let target_block = BlockBuilderBuilder::new(&*client)
609 .on_parent_block(client.chain_info().best_hash)
610 .with_parent_block_number(client.chain_info().best_number)
611 .build()
612 .unwrap()
613 .build()
614 .unwrap()
615 .block;
616
617 let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
618
619 let mut state_strategy = StateStrategy::new(
620 client.clone(),
621 target_block.header().clone(),
622 None,
623 None,
624 false,
625 initial_peers,
626 ProtocolName::Static(""),
627 );
628
629 assert!(state_strategy.state_request().is_some());
631
632 assert!(state_strategy.state_request().is_none());
634 }
635
636 #[test]
637 fn received_state_response_makes_peer_available_again() {
638 let mut state_sync_provider = MockStateSync::<Block>::new();
639 state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
640 let peer_id = PeerId::random();
641 let initial_peers = std::iter::once((peer_id, 10));
642 let mut state_strategy = StateStrategy::new_with_provider(
643 Box::new(state_sync_provider),
644 initial_peers,
645 ProtocolName::Static(""),
646 );
647 state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
649
650 let dummy_response = StateResponse::default().encode_to_vec();
651 state_strategy.on_state_response(&peer_id, dummy_response);
652
653 assert!(state_strategy.peers.get(&peer_id).unwrap().state.is_available());
654 }
655
656 #[test]
657 fn bad_state_response_drops_peer() {
658 let mut state_sync_provider = MockStateSync::<Block>::new();
659 state_sync_provider.expect_import().return_once(|_| ImportResult::BadResponse);
661 let peer_id = PeerId::random();
662 let initial_peers = std::iter::once((peer_id, 10));
663 let mut state_strategy = StateStrategy::new_with_provider(
664 Box::new(state_sync_provider),
665 initial_peers,
666 ProtocolName::Static(""),
667 );
668 state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
670 let dummy_response = StateResponse::default().encode_to_vec();
671 assert!(matches!(
673 state_strategy.on_state_response_inner(&peer_id, &dummy_response),
674 Err(BadPeer(id, _rep)) if id == peer_id,
675 ));
676 }
677
678 #[test]
679 fn partial_state_response_doesnt_generate_actions() {
680 let mut state_sync_provider = MockStateSync::<Block>::new();
681 state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
683 let peer_id = PeerId::random();
684 let initial_peers = std::iter::once((peer_id, 10));
685 let mut state_strategy = StateStrategy::new_with_provider(
686 Box::new(state_sync_provider),
687 initial_peers,
688 ProtocolName::Static(""),
689 );
690 state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
692
693 let dummy_response = StateResponse::default().encode_to_vec();
694 state_strategy.on_state_response(&peer_id, dummy_response);
695
696 assert_eq!(state_strategy.actions.len(), 0)
698 }
699
700 #[test]
701 fn complete_state_response_leads_to_block_import() {
702 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
704 let mut block_builder = BlockBuilderBuilder::new(&*client)
705 .on_parent_block(client.chain_info().best_hash)
706 .with_parent_block_number(client.chain_info().best_number)
707 .build()
708 .unwrap();
709 block_builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
710 let block = block_builder.build().unwrap().block;
711 let header = block.header().clone();
712 let hash = header.hash();
713 let body = Some(block.extrinsics().iter().cloned().collect::<Vec<_>>());
714 let state = ImportedState { block: hash, state: KeyValueStates(Vec::new()) };
715 let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
716
717 let mut state_sync_provider = MockStateSync::<Block>::new();
719 let import = ImportResult::Import(
720 hash,
721 header.clone(),
722 state.clone(),
723 body.clone(),
724 justifications.clone(),
725 );
726 state_sync_provider.expect_import().return_once(move |_| import);
727
728 let expected_origin = BlockOrigin::NetworkInitialSync;
730 let expected_block = IncomingBlock {
731 hash,
732 header: Some(header),
733 body,
734 indexed_body: None,
735 justifications,
736 origin: None,
737 allow_missing_state: true,
738 import_existing: true,
739 skip_execution: true,
740 state: Some(state),
741 };
742 let expected_blocks = vec![expected_block];
743
744 let peer_id = PeerId::random();
746 let initial_peers = std::iter::once((peer_id, 10));
747 let mut state_strategy = StateStrategy::new_with_provider(
748 Box::new(state_sync_provider),
749 initial_peers,
750 ProtocolName::Static(""),
751 );
752 state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
754
755 let dummy_response = StateResponse::default().encode_to_vec();
757 state_strategy.on_state_response(&peer_id, dummy_response);
758
759 assert_eq!(state_strategy.actions.len(), 1);
760 assert!(matches!(
761 &state_strategy.actions[0],
762 SyncingAction::ImportBlocks { origin, blocks }
763 if *origin == expected_origin && *blocks == expected_blocks,
764 ));
765 }
766
767 #[test]
768 fn importing_unknown_block_doesnt_finish_strategy() {
769 let target_hash = Hash::random();
770 let unknown_hash = Hash::random();
771 let mut state_sync_provider = MockStateSync::<Block>::new();
772 state_sync_provider.expect_target_hash().return_const(target_hash);
773
774 let mut state_strategy = StateStrategy::new_with_provider(
775 Box::new(state_sync_provider),
776 std::iter::empty(),
777 ProtocolName::Static(""),
778 );
779
780 state_strategy.on_blocks_processed(
782 1,
783 1,
784 vec![(
785 Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
786 unknown_hash,
787 )],
788 );
789
790 assert_eq!(state_strategy.actions.len(), 0);
792 }
793
794 #[test]
795 fn successfully_importing_target_block_finishes_strategy() {
796 let target_hash = Hash::random();
797 let mut state_sync_provider = MockStateSync::<Block>::new();
798 state_sync_provider.expect_target_hash().return_const(target_hash);
799
800 let mut state_strategy = StateStrategy::new_with_provider(
801 Box::new(state_sync_provider),
802 std::iter::empty(),
803 ProtocolName::Static(""),
804 );
805
806 state_strategy.on_blocks_processed(
808 1,
809 1,
810 vec![(
811 Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
812 target_hash,
813 )],
814 );
815
816 assert_eq!(state_strategy.actions.len(), 1);
818 assert!(matches!(&state_strategy.actions[0], SyncingAction::Finished));
819 }
820
821 #[test]
822 fn failure_to_import_target_block_finishes_strategy() {
823 let target_hash = Hash::random();
824 let mut state_sync_provider = MockStateSync::<Block>::new();
825 state_sync_provider.expect_target_hash().return_const(target_hash);
826
827 let mut state_strategy = StateStrategy::new_with_provider(
828 Box::new(state_sync_provider),
829 std::iter::empty(),
830 ProtocolName::Static(""),
831 );
832
833 state_strategy.on_blocks_processed(
835 1,
836 1,
837 vec![(
838 Err(BlockImportError::VerificationFailed(None, String::from("test-error"))),
839 target_hash,
840 )],
841 );
842
843 assert_eq!(state_strategy.actions.len(), 1);
845 assert!(matches!(&state_strategy.actions[0], SyncingAction::Finished));
846 }
847
848 #[test]
849 fn finished_strategy_doesnt_generate_more_actions() {
850 let target_hash = Hash::random();
851 let mut state_sync_provider = MockStateSync::<Block>::new();
852 state_sync_provider.expect_target_hash().return_const(target_hash);
853 state_sync_provider.expect_is_complete().return_const(true);
854
855 let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
857
858 let mut state_strategy = StateStrategy::new_with_provider(
859 Box::new(state_sync_provider),
860 initial_peers,
861 ProtocolName::Static(""),
862 );
863
864 state_strategy.on_blocks_processed(
865 1,
866 1,
867 vec![(
868 Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
869 target_hash,
870 )],
871 );
872
873 let network_provider = NetworkServiceProvider::new();
874 let network_handle = network_provider.handle();
875
876 let actions = state_strategy.actions(&network_handle).collect::<Vec<_>>();
878 assert_eq!(actions.len(), 1);
879 assert!(matches!(&actions[0], SyncingAction::Finished));
880
881 assert_eq!(state_strategy.actions(&network_handle).count(), 0);
883 }
884}