1use sc_consensus::IncomingBlock;
22use sp_consensus::BlockOrigin;
23
24use crate::{
25 block_relay_protocol::{BlockDownloader, BlockResponseError},
26 service::network::NetworkServiceHandle,
27 strategy::{
28 chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
29 SyncingAction,
30 },
31 types::{BadPeer, SyncState, SyncStatus},
32 LOG_TARGET,
33};
34use codec::{Decode, Encode};
35use futures::{channel::oneshot, FutureExt};
36use log::{debug, error, trace, warn};
37use sc_network::{IfDisconnected, ProtocolName};
38use sc_network_common::sync::message::{
39 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
40};
41use sc_network_types::PeerId;
42use sp_blockchain::HeaderBackend;
43use sp_runtime::{
44 traits::{Block as BlockT, Header, NumberFor, Zero},
45 Justifications, SaturatedConversion,
46};
47use std::{any::Any, collections::HashMap, fmt, sync::Arc};
48
/// Default number of connected peers required before warp sync leaves the waiting phase.
///
/// Only used when `WarpSync::new` is not given an explicit minimum.
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
51
/// Raw bytes of a warp sync proof as received from (or sent to) a peer.
///
/// The bytes are opaque at this level; a [`Verifier`] is responsible for interpreting them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EncodedProof(pub Vec<u8>);
54
55#[derive(Encode, Decode, Debug, Clone)]
57pub struct WarpProofRequest<B: BlockT> {
58 pub begin: B::Hash,
60}
61
62pub trait Verifier<Block: BlockT>: Send + Sync {
64 fn verify(
66 &mut self,
67 proof: &EncodedProof,
68 ) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
69 fn next_proof_context(&self) -> Block::Hash;
71 fn status(&self) -> Option<String>;
73}
74
75pub enum VerificationResult<Block: BlockT> {
77 Partial(Vec<(Block::Header, Justifications)>),
79 Complete(Block::Header, Vec<(Block::Header, Justifications)>),
81}
82
83pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
85 fn generate(
88 &self,
89 start: Block::Hash,
90 ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
91 fn create_verifier(&self) -> Box<dyn Verifier<Block>>;
93}
94
95mod rep {
96 use sc_network::ReputationChange as Rep;
97
98 pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");
100
101 pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");
103
104 pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
106
107 pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
109
110 pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
112
113 pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
115}
116
117#[derive(Clone, Eq, PartialEq, Debug)]
119pub enum WarpSyncPhase<Block: BlockT> {
120 AwaitingPeers { required_peers: usize },
122 DownloadingWarpProofs,
124 DownloadingTargetBlock,
126 DownloadingState,
128 ImportingState,
130 DownloadingBlocks(NumberFor<Block>),
132 Complete,
134}
135
136impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
137 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
138 match self {
139 Self::AwaitingPeers { required_peers } =>
140 write!(f, "Waiting for {required_peers} peers to be connected"),
141 Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
142 Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
143 Self::DownloadingState => write!(f, "Downloading state"),
144 Self::ImportingState => write!(f, "Importing state"),
145 Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
146 Self::Complete => write!(f, "Warp sync is complete"),
147 }
148 }
149}
150
151#[derive(Clone, Eq, PartialEq, Debug)]
153pub struct WarpSyncProgress<Block: BlockT> {
154 pub phase: WarpSyncPhase<Block>,
156 pub total_bytes: u64,
158 pub status: Option<String>,
160}
161
162pub enum WarpSyncConfig<Block: BlockT> {
164 WithProvider(Arc<dyn WarpSyncProvider<Block>>),
166 WithTarget(<Block as BlockT>::Header),
170}
171
172enum Phase<B: BlockT> {
174 WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
176 WarpProof { verifier: Box<dyn Verifier<B>> },
178 TargetBlock(B::Header),
180 Complete,
182}
183
/// What a connected peer is currently being used for.
enum PeerState {
    /// No request is in flight to this peer.
    Available,
    /// A warp proof request is in flight to this peer.
    DownloadingProofs,
    /// A target block request is in flight to this peer.
    DownloadingTargetBlock,
}

impl PeerState {
    /// Returns `true` only for [`PeerState::Available`].
    fn is_available(&self) -> bool {
        match self {
            PeerState::Available => true,
            PeerState::DownloadingProofs | PeerState::DownloadingTargetBlock => false,
        }
    }
}
195
196struct Peer<B: BlockT> {
197 best_number: NumberFor<B>,
198 state: PeerState,
199}
200
201pub struct WarpSyncResult<B: BlockT> {
202 pub target_header: B::Header,
203 pub target_body: Option<Vec<B::Extrinsic>>,
204 pub target_justifications: Option<Justifications>,
205}
206
207pub struct WarpSync<B: BlockT> {
209 phase: Phase<B>,
210 total_proof_bytes: u64,
211 total_state_bytes: u64,
212 peers: HashMap<PeerId, Peer<B>>,
213 disconnected_peers: DisconnectedPeers,
214 protocol_name: Option<ProtocolName>,
215 block_downloader: Arc<dyn BlockDownloader<B>>,
216 actions: Vec<SyncingAction<B>>,
217 result: Option<WarpSyncResult<B>>,
218 min_peers_to_start_warp_sync: usize,
220}
221
222impl<B> WarpSync<B>
223where
224 B: BlockT,
225{
226 pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
228
229 pub fn new<Client>(
233 client: Arc<Client>,
234 warp_sync_config: WarpSyncConfig<B>,
235 protocol_name: Option<ProtocolName>,
236 block_downloader: Arc<dyn BlockDownloader<B>>,
237 min_peers_to_start_warp_sync: Option<usize>,
238 ) -> Self
239 where
240 Client: HeaderBackend<B> + 'static,
241 {
242 let min_peers_to_start_warp_sync =
243 min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
244 if client.info().finalized_state.is_some() {
245 error!(
246 target: LOG_TARGET,
247 "Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
248 );
249 return Self {
250 phase: Phase::Complete,
251 total_proof_bytes: 0,
252 total_state_bytes: 0,
253 peers: HashMap::new(),
254 disconnected_peers: DisconnectedPeers::new(),
255 protocol_name,
256 block_downloader,
257 actions: vec![SyncingAction::Finished],
258 result: None,
259 min_peers_to_start_warp_sync,
260 }
261 }
262
263 let phase = match warp_sync_config {
264 WarpSyncConfig::WithProvider(warp_sync_provider) =>
265 Phase::WaitingForPeers { warp_sync_provider },
266 WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
267 };
268
269 Self {
270 phase,
271 total_proof_bytes: 0,
272 total_state_bytes: 0,
273 peers: HashMap::new(),
274 disconnected_peers: DisconnectedPeers::new(),
275 protocol_name,
276 block_downloader,
277 actions: Vec::new(),
278 result: None,
279 min_peers_to_start_warp_sync,
280 }
281 }
282
283 pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
285 self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
286
287 self.try_to_start_warp_sync();
288 }
289
290 pub fn remove_peer(&mut self, peer_id: &PeerId) {
292 if let Some(state) = self.peers.remove(peer_id) {
293 if !state.state.is_available() {
294 if let Some(bad_peer) =
295 self.disconnected_peers.on_disconnect_during_request(*peer_id)
296 {
297 self.actions.push(SyncingAction::DropPeer(bad_peer));
298 }
299 }
300 }
301 }
302
303 #[must_use]
307 pub fn on_validated_block_announce(
308 &mut self,
309 is_best: bool,
310 peer_id: PeerId,
311 announce: &BlockAnnounce<B::Header>,
312 ) -> Option<(B::Hash, NumberFor<B>)> {
313 is_best.then(|| {
314 let best_number = *announce.header.number();
315 let best_hash = announce.header.hash();
316 if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
317 peer.best_number = best_number;
318 }
319 (best_hash, best_number)
321 })
322 }
323
324 fn try_to_start_warp_sync(&mut self) {
326 let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
327
328 if self.peers.len() < self.min_peers_to_start_warp_sync {
329 return
330 }
331
332 let verifier = warp_sync_provider.create_verifier();
333 self.phase = Phase::WarpProof { verifier };
334 trace!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
335 }
336
337 pub fn on_generic_response(
338 &mut self,
339 peer_id: &PeerId,
340 protocol_name: ProtocolName,
341 response: Box<dyn Any + Send>,
342 ) {
343 if &protocol_name == self.block_downloader.protocol_name() {
344 let Ok(response) = response
345 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
346 else {
347 warn!(target: LOG_TARGET, "Failed to downcast block response");
348 debug_assert!(false);
349 return;
350 };
351
352 let (request, response) = *response;
353 let blocks = match response {
354 Ok(blocks) => blocks,
355 Err(BlockResponseError::DecodeFailed(e)) => {
356 debug!(
357 target: LOG_TARGET,
358 "Failed to decode block response from peer {:?}: {:?}.",
359 peer_id,
360 e
361 );
362 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
363 return;
364 },
365 Err(BlockResponseError::ExtractionFailed(e)) => {
366 debug!(
367 target: LOG_TARGET,
368 "Failed to extract blocks from peer response {:?}: {:?}.",
369 peer_id,
370 e
371 );
372 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
373 return;
374 },
375 };
376
377 self.on_block_response(*peer_id, request, blocks);
378 } else {
379 let Ok(response) = response.downcast::<Vec<u8>>() else {
380 warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
381 debug_assert!(false);
382 return;
383 };
384
385 self.on_warp_proof_response(peer_id, EncodedProof(*response));
386 }
387 }
388
389 pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
391 if let Some(peer) = self.peers.get_mut(peer_id) {
392 peer.state = PeerState::Available;
393 }
394
395 let Phase::WarpProof { verifier } = &mut self.phase else {
396 debug!(target: LOG_TARGET, "Unexpected warp proof response");
397 self.actions
398 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
399 return
400 };
401
402 let proof_to_incoming_block =
403 |(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
404 IncomingBlock {
405 hash: header.hash(),
406 header: Some(header),
407 body: None,
408 indexed_body: None,
409 justifications: Some(justifications),
410 origin: Some(*peer_id),
411 allow_missing_state: true,
414 skip_execution: true,
415 import_existing: false,
417 state: None,
418 }
419 };
420
421 match verifier.verify(&response) {
422 Err(e) => {
423 debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
424 self.actions
425 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
426 },
427 Ok(VerificationResult::Partial(proofs)) => {
428 debug!(target: LOG_TARGET, "Verified partial proof");
429 self.total_proof_bytes += response.0.len() as u64;
430 self.actions.push(SyncingAction::ImportBlocks {
431 origin: BlockOrigin::NetworkInitialSync,
432 blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
433 });
434 },
435 Ok(VerificationResult::Complete(header, proofs)) => {
436 debug!(
437 target: LOG_TARGET,
438 "Verified complete proof. Continuing with target block download: {} ({}).",
439 header.hash(),
440 header.number(),
441 );
442 self.total_proof_bytes += response.0.len() as u64;
443 self.phase = Phase::TargetBlock(header);
444 self.actions.push(SyncingAction::ImportBlocks {
445 origin: BlockOrigin::NetworkInitialSync,
446 blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
447 });
448 },
449 }
450 }
451
452 pub fn on_block_response(
454 &mut self,
455 peer_id: PeerId,
456 request: BlockRequest<B>,
457 blocks: Vec<BlockData<B>>,
458 ) {
459 if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
460 self.actions.push(SyncingAction::DropPeer(bad_peer));
461 }
462 }
463
464 fn on_block_response_inner(
465 &mut self,
466 peer_id: PeerId,
467 request: BlockRequest<B>,
468 mut blocks: Vec<BlockData<B>>,
469 ) -> Result<(), BadPeer> {
470 if let Some(peer) = self.peers.get_mut(&peer_id) {
471 peer.state = PeerState::Available;
472 }
473
474 let Phase::TargetBlock(header) = &mut self.phase else {
475 debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
476 return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE))
477 };
478
479 if blocks.is_empty() {
480 debug!(
481 target: LOG_TARGET,
482 "Downloading target block failed: empty block response from {peer_id}",
483 );
484 return Err(BadPeer(peer_id, rep::NO_BLOCK))
485 }
486
487 if blocks.len() > 1 {
488 debug!(
489 target: LOG_TARGET,
490 "Too many blocks ({}) in warp target block response from {peer_id}",
491 blocks.len(),
492 );
493 return Err(BadPeer(peer_id, rep::NOT_REQUESTED))
494 }
495
496 validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
497
498 let block = blocks.pop().expect("`blocks` len checked above; qed");
499
500 let Some(block_header) = &block.header else {
501 debug!(
502 target: LOG_TARGET,
503 "Downloading target block failed: missing header in response from {peer_id}.",
504 );
505 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
506 };
507
508 if block_header != header {
509 debug!(
510 target: LOG_TARGET,
511 "Downloading target block failed: different header in response from {peer_id}.",
512 );
513 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
514 }
515
516 if block.body.is_none() {
517 debug!(
518 target: LOG_TARGET,
519 "Downloading target block failed: missing body in response from {peer_id}.",
520 );
521 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
522 }
523
524 self.result = Some(WarpSyncResult {
525 target_header: header.clone(),
526 target_body: block.body,
527 target_justifications: block.justifications,
528 });
529 self.phase = Phase::Complete;
530 self.actions.push(SyncingAction::Finished);
531 Ok(())
532 }
533
534 fn schedule_next_peer(
536 &mut self,
537 new_state: PeerState,
538 min_best_number: Option<NumberFor<B>>,
539 ) -> Option<PeerId> {
540 let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
541 if targets.is_empty() {
542 return None
543 }
544 targets.sort();
545 let median = targets[targets.len() / 2];
546 let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
547 for (peer_id, peer) in self.peers.iter_mut() {
550 if peer.state.is_available() &&
551 peer.best_number >= threshold &&
552 self.disconnected_peers.is_peer_available(peer_id)
553 {
554 peer.state = new_state;
555 return Some(*peer_id)
556 }
557 }
558 None
559 }
560
561 fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
563 let Phase::WarpProof { verifier } = &self.phase else { return None };
564
565 let begin = verifier.next_proof_context();
567
568 if self
569 .peers
570 .values()
571 .any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
572 {
573 return None
575 }
576
577 let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
578 trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
579
580 let request = WarpProofRequest { begin };
581
582 let Some(protocol_name) = self.protocol_name.clone() else {
583 warn!(
584 target: LOG_TARGET,
585 "Trying to send warp sync request when no protocol is configured {request:?}",
586 );
587 return None;
588 };
589
590 Some((peer_id, protocol_name, request))
591 }
592
593 fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
595 let Phase::TargetBlock(target_header) = &self.phase else { return None };
596
597 if self
598 .peers
599 .values()
600 .any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
601 {
602 return None
604 }
605
606 let target_hash = target_header.hash();
608 let target_number = *target_header.number();
609
610 let peer_id =
611 self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
612
613 trace!(
614 target: LOG_TARGET,
615 "New target block request to {peer_id}, target: {} ({}).",
616 target_hash,
617 target_number,
618 );
619
620 Some((
621 peer_id,
622 BlockRequest::<B> {
623 id: 0,
624 fields: BlockAttributes::HEADER |
625 BlockAttributes::BODY |
626 BlockAttributes::JUSTIFICATION,
627 from: FromBlock::Hash(target_hash),
628 direction: Direction::Ascending,
629 max: Some(1),
630 },
631 ))
632 }
633
634 pub fn progress(&self) -> WarpSyncProgress<B> {
636 match &self.phase {
637 Phase::WaitingForPeers { .. } => WarpSyncProgress {
638 phase: WarpSyncPhase::AwaitingPeers {
639 required_peers: self.min_peers_to_start_warp_sync,
640 },
641 total_bytes: self.total_proof_bytes,
642 status: None,
643 },
644 Phase::WarpProof { verifier } => WarpSyncProgress {
645 phase: WarpSyncPhase::DownloadingWarpProofs,
646 total_bytes: self.total_proof_bytes,
647 status: verifier.status(),
648 },
649 Phase::TargetBlock(_) => WarpSyncProgress {
650 phase: WarpSyncPhase::DownloadingTargetBlock,
651 total_bytes: self.total_proof_bytes,
652 status: None,
653 },
654 Phase::Complete => WarpSyncProgress {
655 phase: WarpSyncPhase::Complete,
656 total_bytes: self.total_proof_bytes + self.total_state_bytes,
657 status: None,
658 },
659 }
660 }
661
662 pub fn num_peers(&self) -> usize {
664 self.peers.len()
665 }
666
667 pub fn status(&self) -> SyncStatus<B> {
669 SyncStatus {
670 state: match &self.phase {
671 Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
672 Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
673 Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
674 Phase::Complete => SyncState::Idle,
675 },
676 best_seen_block: match &self.phase {
677 Phase::WaitingForPeers { .. } => None,
678 Phase::WarpProof { .. } => None,
679 Phase::TargetBlock(header) => Some(*header.number()),
680 Phase::Complete => None,
681 },
682 num_peers: self.peers.len().saturated_into(),
683 queued_blocks: 0,
684 state_sync: None,
685 warp_sync: Some(self.progress()),
686 }
687 }
688
689 #[must_use]
691 pub fn actions(
692 &mut self,
693 network_service: &NetworkServiceHandle,
694 ) -> impl Iterator<Item = SyncingAction<B>> {
695 let warp_proof_request =
696 self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
697 trace!(
698 target: LOG_TARGET,
699 "Created `WarpProofRequest` to {}, request: {:?}.",
700 peer_id,
701 request,
702 );
703
704 let (tx, rx) = oneshot::channel();
705
706 network_service.start_request(
707 peer_id,
708 protocol_name,
709 request.encode(),
710 tx,
711 IfDisconnected::ImmediateError,
712 );
713
714 SyncingAction::StartRequest {
715 peer_id,
716 key: Self::STRATEGY_KEY,
717 request: async move {
718 Ok(rx.await?.and_then(|(response, protocol_name)| {
719 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
720 }))
721 }
722 .boxed(),
723 remove_obsolete: false,
724 }
725 });
726 self.actions.extend(warp_proof_request);
727
728 let target_block_request =
729 self.target_block_request().into_iter().map(|(peer_id, request)| {
730 let downloader = self.block_downloader.clone();
731
732 SyncingAction::StartRequest {
733 peer_id,
734 key: Self::STRATEGY_KEY,
735 request: async move {
736 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
737 |(response, protocol_name)| {
738 let decoded_response =
739 downloader.block_response_into_blocks(&request, response);
740 let result =
741 Box::new((request, decoded_response)) as Box<dyn Any + Send>;
742 Ok((result, protocol_name))
743 },
744 ))
745 }
746 .boxed(),
747 remove_obsolete: true,
750 }
751 });
752 self.actions.extend(target_block_request);
753
754 std::mem::take(&mut self.actions).into_iter()
755 }
756
757 #[must_use]
759 pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
760 self.result.take()
761 }
762}
763
764#[cfg(test)]
765mod test {
766 use super::*;
767 use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
768 use sc_block_builder::BlockBuilderBuilder;
769 use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
770 use sp_core::H256;
771 use sp_runtime::{
772 traits::{Block as BlockT, Header as HeaderT, NumberFor},
773 ConsensusEngineId,
774 };
775 use std::{io::ErrorKind, sync::Arc};
776 use substrate_test_runtime_client::{
777 runtime::{Block, Hash},
778 BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
779 };
780
781 pub const TEST_ENGINE_ID: ConsensusEngineId = *b"TEST";
782
783 mockall::mock! {
784 pub Client<B: BlockT> {}
785
786 impl<B: BlockT> HeaderBackend<B> for Client<B> {
787 fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
788 fn info(&self) -> Info<B>;
789 fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
790 fn number(
791 &self,
792 hash: B::Hash,
793 ) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
794 fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
795 }
796 }
797
798 mockall::mock! {
799 pub WarpSyncProvider<B: BlockT> {}
800
801 impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
802 fn generate(
803 &self,
804 start: B::Hash,
805 ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
806 fn create_verifier(&self) -> Box<dyn super::Verifier<B>>;
807 }
808 }
809
810 mockall::mock! {
811 pub Verifier<B: BlockT> {}
812
813 impl<B: BlockT> super::Verifier<B> for Verifier<B> {
814 fn verify(
815 &mut self,
816 proof: &EncodedProof,
817 ) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
818 fn next_proof_context(&self) -> B::Hash;
819 fn status(&self) -> Option<String>;
820 }
821 }
822
823 fn mock_client_with_state() -> MockClient<Block> {
824 let mut client = MockClient::<Block>::new();
825 let genesis_hash = Hash::random();
826 client.expect_info().return_once(move || Info {
827 best_hash: genesis_hash,
828 best_number: 0,
829 genesis_hash,
830 finalized_hash: genesis_hash,
831 finalized_number: 0,
832 finalized_state: Some((genesis_hash, 0)),
834 number_leaves: 0,
835 block_gap: None,
836 });
837
838 client
839 }
840
841 fn mock_client_without_state() -> MockClient<Block> {
842 let mut client = MockClient::<Block>::new();
843 let genesis_hash = Hash::random();
844 client.expect_info().returning(move || Info {
845 best_hash: genesis_hash,
846 best_number: 0,
847 genesis_hash,
848 finalized_hash: genesis_hash,
849 finalized_number: 0,
850 finalized_state: None,
851 number_leaves: 0,
852 block_gap: None,
853 });
854
855 client
856 }
857
#[test]
fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
    let provider = MockWarpSyncProvider::<Block>::new();
    let config = WarpSyncConfig::WithProvider(Arc::new(provider));
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_with_state()),
        config,
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Warp sync finishes immediately: `Finished` is the one and only action...
    let actions: Vec<_> = warp_sync.actions(&network_handle).collect();
    assert!(matches!(actions.as_slice(), [SyncingAction::Finished]));

    // ... and there is no result to take.
    assert!(warp_sync.take_result().is_none());
}
882
#[test]
fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
    let target_header = <Block as BlockT>::Header::new(
        1,
        Default::default(),
        Default::default(),
        Default::default(),
        Default::default(),
    );
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_with_state()),
        WarpSyncConfig::WithTarget(target_header),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Warp sync finishes immediately: `Finished` is the one and only action...
    let actions: Vec<_> = warp_sync.actions(&network_handle).collect();
    assert!(matches!(actions.as_slice(), [SyncingAction::Finished]));

    // ... and there is no result to take.
    assert!(warp_sync.take_result().is_none());
}
912
#[test]
fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
    let provider = MockWarpSyncProvider::<Block>::new();
    let config = WarpSyncConfig::WithProvider(Arc::new(provider));
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        config,
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // With no peers connected there is nothing to do yet, in particular no `Finished`.
    assert!(warp_sync.actions(&network_handle).next().is_none());
}
932
#[test]
fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
    let target_header = <Block as BlockT>::Header::new(
        1,
        Default::default(),
        Default::default(),
        Default::default(),
        Default::default(),
    );
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithTarget(target_header),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // With no peers connected there is nothing to do yet, in particular no `Finished`.
    assert!(warp_sync.actions(&network_handle).next().is_none());
}
957
#[test]
fn warp_sync_is_started_only_when_there_is_enough_peers() {
    let mut verifier = MockVerifier::<Block>::new();
    verifier.expect_next_proof_context().returning(Hash::random);
    verifier
        .expect_verify()
        .returning(|_| unreachable!("verify should not be called in this test"));
    let mut provider = MockWarpSyncProvider::<Block>::new();
    provider.expect_create_verifier().return_once(move || Box::new(verifier));
    let config = WarpSyncConfig::WithProvider(Arc::new(provider));
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        config,
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Up to one peer short of the threshold the strategy keeps waiting.
    for _ in 1..MIN_PEERS_TO_START_WARP_SYNC {
        warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
        assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }));
    }

    // The peer that reaches the threshold starts the proof download.
    warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
}
987
#[test]
fn no_peer_is_scheduled_if_no_peers_connected() {
    let provider = MockWarpSyncProvider::<Block>::new();
    let config = WarpSyncConfig::WithProvider(Arc::new(provider));
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        config,
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // No peer was ever added, so there is nobody to schedule.
    let scheduled = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
    assert!(scheduled.is_none());
}
1003
#[test]
fn enough_peers_are_used_in_tests() {
    // The tests in this module add ten peers and expect that to be enough to start.
    const PEERS_ADDED_BY_TESTS: usize = 10;

    assert!(
        PEERS_ADDED_BY_TESTS >= MIN_PEERS_TO_START_WARP_SYNC,
        "Tests must be updated to use that many initial peers.",
    );
}
1012
#[test]
fn at_least_median_synced_peer_is_scheduled() {
    // Peer selection depends on hash map iteration order, so repeat with fresh random peers.
    for _ in 0..100 {
        let mut verifier = MockVerifier::<Block>::new();
        verifier.expect_next_proof_context().returning(Hash::random);
        verifier
            .expect_verify()
            .returning(|_| unreachable!("verify should not be called in this test"));
        let mut provider = MockWarpSyncProvider::<Block>::new();
        provider.expect_create_verifier().return_once(move || Box::new(verifier));
        let config = WarpSyncConfig::WithProvider(Arc::new(provider));
        let mut warp_sync = WarpSync::new(
            Arc::new(mock_client_without_state()),
            config,
            None,
            Arc::new(MockBlockDownloader::new()),
            None,
        );

        for best_number in 1..=10 {
            warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
        }

        // With best numbers 1..=10 the median (upper middle element) is 6.
        let scheduled = warp_sync
            .schedule_next_peer(PeerState::DownloadingProofs, None)
            .unwrap();
        let scheduled_best = warp_sync.peers.get(&scheduled).unwrap().best_number;
        assert!(scheduled_best >= 6);
    }
}
1041
#[test]
fn min_best_number_peer_is_scheduled() {
    // Peer selection depends on hash map iteration order, so repeat with fresh random peers.
    for _ in 0..10 {
        let mut verifier = MockVerifier::<Block>::new();
        verifier.expect_next_proof_context().returning(Hash::random);
        verifier
            .expect_verify()
            .returning(|_| unreachable!("verify should not be called in this test"));
        let mut provider = MockWarpSyncProvider::<Block>::new();
        provider.expect_create_verifier().return_once(move || Box::new(verifier));
        let config = WarpSyncConfig::WithProvider(Arc::new(provider));
        let mut warp_sync = WarpSync::new(
            Arc::new(mock_client_without_state()),
            config,
            None,
            Arc::new(MockBlockDownloader::new()),
            None,
        );

        for best_number in 1..=10 {
            warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
        }

        // Only the single peer at block 10 satisfies the requested minimum.
        let scheduled = warp_sync
            .schedule_next_peer(PeerState::DownloadingProofs, Some(10))
            .unwrap();
        let scheduled_best = warp_sync.peers.get(&scheduled).unwrap().best_number;
        assert!(scheduled_best == 10);
    }
}
1070
#[test]
fn backedoff_number_peer_is_not_scheduled() {
    let mut verifier = MockVerifier::<Block>::new();
    verifier.expect_next_proof_context().returning(Hash::random);
    verifier
        .expect_verify()
        .returning(|_| unreachable!("verify should not be called in this test"));
    let mut provider = MockWarpSyncProvider::<Block>::new();
    provider.expect_create_verifier().return_once(move || Box::new(verifier));
    let config = WarpSyncConfig::WithProvider(Arc::new(provider));
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        config,
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    for best_number in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
    }

    let ninth_peer = warp_sync
        .peers
        .iter()
        .find_map(|(id, peer)| (peer.best_number == 9).then_some(*id))
        .unwrap();
    let tenth_peer = warp_sync
        .peers
        .iter()
        .find_map(|(id, peer)| (peer.best_number == 10).then_some(*id))
        .unwrap();

    // Disconnecting without a request in flight does not back the peer off.
    warp_sync.remove_peer(&tenth_peer);
    assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));

    // Disconnecting while a request is in flight does.
    warp_sync.add_peer(tenth_peer, H256::random(), 10);
    let scheduled = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
    assert_eq!(tenth_peer, scheduled.unwrap());
    warp_sync.remove_peer(&tenth_peer);
    assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));

    // After reconnecting, the backed-off peer must not be picked even though it is the
    // only one satisfying the minimum.
    warp_sync.add_peer(tenth_peer, H256::random(), 10);
    let scheduled: Option<PeerId> =
        warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
    assert!(scheduled.is_none());

    // With a lower minimum the next best peer is used instead.
    let scheduled: Option<PeerId> =
        warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
    assert_eq!(ninth_peer, scheduled.unwrap());
}
1122
#[test]
fn no_warp_proof_request_in_another_phase() {
    let mut verifier = MockVerifier::<Block>::new();
    verifier.expect_next_proof_context().returning(Hash::random);
    verifier
        .expect_verify()
        .returning(|_| unreachable!("verify should not be called in this test"));
    let mut provider = MockWarpSyncProvider::<Block>::new();
    provider.expect_create_verifier().return_once(move || Box::new(verifier));
    let config = WarpSyncConfig::WithProvider(Arc::new(provider));
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        config,
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Connect enough peers to get past the waiting phase.
    for best_number in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
    }

    // Force the strategy into a phase in which proofs are not requested.
    let target_header = <Block as BlockT>::Header::new(
        1,
        Default::default(),
        Default::default(),
        Default::default(),
        Default::default(),
    );
    warp_sync.phase = Phase::TargetBlock(target_header);

    assert!(warp_sync.warp_proof_request().is_none());
}
1159
#[test]
fn warp_proof_request_starts_at_last_hash() {
    let known_last_hash = Hash::random();

    let mut verifier = MockVerifier::<Block>::new();
    verifier.expect_next_proof_context().returning(move || known_last_hash);
    verifier
        .expect_verify()
        .returning(|_| unreachable!("verify should not be called in this test"));
    let mut provider = MockWarpSyncProvider::<Block>::new();
    provider.expect_create_verifier().return_once(move || Box::new(verifier));
    let config = WarpSyncConfig::WithProvider(Arc::new(provider));
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        config,
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Connect enough peers to start the proof download.
    for best_number in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
    }
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));

    // The request must begin at the hash reported by the verifier.
    let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
    assert_eq!(request.begin, known_last_hash);
}
1189
/// A second call to `warp_proof_request()` returns `None` after the first call returned a
/// request.
#[test]
fn no_parallel_warp_proof_requests() {
    let backend = mock_client_without_state();

    // The verifier only hands out random proof contexts; reaching `verify` is a test failure.
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(Hash::random);
    mock_verifier
        .expect_verify()
        .returning(|_| unreachable!("verify should not be called in this test"));

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy = WarpSync::new(
        Arc::new(backend),
        sync_config,
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

    // The first call produces a request ...
    let first_attempt = strategy.warp_proof_request();
    assert!(first_attempt.is_some());
    // ... and the immediately following one does not.
    let second_attempt = strategy.warp_proof_request();
    assert!(second_attempt.is_none());
}
1220
/// A warp proof that fails verification queues a single `DropPeer` action for the peer that
/// was asked, and the strategy stays in the `WarpProof` phase.
#[test]
fn bad_warp_proof_response_drops_peer() {
    let backend = mock_client_without_state();

    // Verifier whose single `verify` call fails.
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(Hash::random);
    mock_verifier.expect_verify().return_once(|_proof| {
        Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy = WarpSync::new(
        Arc::new(backend),
        sync_config,
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected: the start of a request to some peer.
    let started: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(started.len(), 1);
    let asked_peer = match started[0] {
        SyncingAction::StartRequest { peer_id, .. } => peer_id,
        _ => panic!("Invalid action"),
    };

    // Deliver an (empty) proof from that peer; the mocked verifier rejects it.
    strategy.on_warp_proof_response(&asked_peer, EncodedProof(Vec::new()));

    // Inspect the actions queued by the response handler directly.
    let queued = std::mem::take(&mut strategy.actions);
    assert_eq!(queued.len(), 1);
    assert!(matches!(
        queued[0],
        SyncingAction::DropPeer(BadPeer(dropped, _rep)) if dropped == asked_peer
    ));
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));
}
1268
/// A `VerificationResult::Partial` proof queues an `ImportBlocks` action for the verified
/// header but leaves the strategy in the `WarpProof` phase.
#[test]
fn partial_warp_proof_doesnt_advance_phase() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's current best block; only its header is used.
    let built = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;
    let target_header = built.header().clone();
    let justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);

    // Verifier that reports the genesis hash as context and returns a partial proof
    // containing the header and justifications prepared above.
    let mut mock_verifier = MockVerifier::<Block>::new();
    let genesis = client.info().genesis_hash;
    mock_verifier.expect_next_proof_context().returning(move || genesis);
    let proof_header = target_header.clone();
    let proof_justifications = justifications.clone();
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Partial(vec![(
            proof_header.clone(),
            proof_justifications.clone(),
        )]))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy = WarpSync::new(
        client,
        sync_config,
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected: the start of a request to some peer.
    let started: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(started.len(), 1);
    let asked_peer = match started[0] {
        SyncingAction::StartRequest { peer_id, .. } => peer_id,
        _ => panic!("Invalid action"),
    };

    strategy.on_warp_proof_response(&asked_peer, EncodedProof(Vec::new()));

    // The response handler must have queued a single block import.
    assert_eq!(strategy.actions.len(), 1);
    let (origin, mut blocks) = match strategy.actions.pop().unwrap() {
        SyncingAction::ImportBlocks { origin, blocks } => (origin, blocks),
        _ => panic!("Expected `ImportBlocks` action."),
    };
    assert_eq!(origin, BlockOrigin::NetworkInitialSync);
    assert_eq!(blocks.len(), 1);

    let imported = blocks.pop().unwrap();
    let expected: IncomingBlock<Block> = IncomingBlock {
        hash: target_header.hash(),
        header: Some(target_header),
        body: None,
        indexed_body: None,
        justifications: Some(justifications),
        origin: Some(asked_peer),
        allow_missing_state: true,
        skip_execution: true,
        import_existing: false,
        state: None,
    };
    assert_eq!(imported, expected);

    // A partial proof does not move the strategy to the next phase.
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));
}
1348
/// A `VerificationResult::Complete` proof queues an `ImportBlocks` action for the verified
/// header and switches the strategy to `Phase::TargetBlock` with that header.
#[test]
fn complete_warp_proof_advances_phase() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's current best block to act as the warp target.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;
    let target_header = target_block.header().clone();
    let justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);

    // Verifier that reports the genesis hash as context and returns a complete proof whose
    // target header is the header prepared above.
    let mut mock_verifier = MockVerifier::<Block>::new();
    let genesis = client.info().genesis_hash;
    mock_verifier.expect_next_proof_context().returning(move || genesis);
    let proof_header = target_header.clone();
    let proof_justifications = justifications.clone();
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(
            proof_header.clone(),
            vec![(proof_header.clone(), proof_justifications.clone())],
        ))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut warp_sync = WarpSync::new(
        client,
        sync_config,
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected: the start of a request to some peer.
    let started: Vec<_> = warp_sync.actions(&network_handle).collect();
    assert_eq!(started.len(), 1);
    let asked_peer = match started[0] {
        SyncingAction::StartRequest { peer_id, .. } => peer_id,
        _ => panic!("Invalid action."),
    };

    warp_sync.on_warp_proof_response(&asked_peer, EncodedProof(Vec::new()));

    // The response handler must have queued a single block import.
    assert_eq!(warp_sync.actions.len(), 1);
    let (origin, mut blocks) = match warp_sync.actions.pop().unwrap() {
        SyncingAction::ImportBlocks { origin, blocks } => (origin, blocks),
        _ => panic!("Expected `ImportBlocks` action."),
    };
    assert_eq!(origin, BlockOrigin::NetworkInitialSync);
    assert_eq!(blocks.len(), 1);

    let imported = blocks.pop().unwrap();
    let expected: IncomingBlock<Block> = IncomingBlock {
        hash: target_header.hash(),
        header: Some(target_header),
        body: None,
        indexed_body: None,
        justifications: Some(justifications),
        origin: Some(asked_peer),
        allow_missing_state: true,
        skip_execution: true,
        import_existing: false,
        state: None,
    };
    assert_eq!(imported, expected);

    // A complete proof moves the strategy on to downloading the target block.
    assert!(
        matches!(warp_sync.phase, Phase::TargetBlock(header) if header == *target_block.header())
    );
}
1430
/// While the strategy is in the `WarpProof` phase, `target_block_request()` yields nothing.
#[test]
fn no_target_block_requests_in_another_phase() {
    let backend = mock_client_without_state();

    // The verifier only hands out random proof contexts; reaching `verify` is a test failure.
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(Hash::random);
    mock_verifier
        .expect_verify()
        .returning(|_| unreachable!("verify should not be called in this test"));

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy = WarpSync::new(
        Arc::new(backend),
        sync_config,
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Still in the warp proof phase, so no target block request may be scheduled.
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));
    assert!(strategy.target_block_request().is_none());
}
1460
/// In the `TargetBlock` phase the block request starts at the target hash, asks for header,
/// body and justification, and is limited to one block.
#[test]
fn target_block_request_is_correct() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's current best block to act as the warp target.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Verifier that reports the genesis hash as context and would return a complete proof
    // for the target header.
    let mut mock_verifier = MockVerifier::<Block>::new();
    let genesis = client.info().genesis_hash;
    mock_verifier.expect_next_proof_context().returning(move || genesis);
    let proof_header = target_block.header().clone();
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proof_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy =
        WarpSync::new(client, sync_config, None, Arc::new(MockBlockDownloader::new()), None);

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target block phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (_, block_request) = strategy.target_block_request().unwrap();
    let wanted_fields =
        BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
    assert_eq!(block_request.from, FromBlock::Hash(target_block.header().hash()));
    assert_eq!(block_request.fields, wanted_fields);
    assert_eq!(block_request.max, Some(1));
}
1503
/// With `WarpSyncConfig::WithTarget` the strategy is in the `TargetBlock` phase after peers
/// are added and requests the configured target block.
#[test]
fn externally_set_target_block_is_requested() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's current best block to act as the warp target.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // No provider here: the target header is handed over through the configuration.
    let sync_config = WarpSyncConfig::WithTarget(target_block.header().clone());
    let mut strategy =
        WarpSync::new(client, sync_config, None, Arc::new(MockBlockDownloader::new()), None);

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    assert!(matches!(strategy.phase, Phase::TargetBlock(_)));

    let (_, block_request) = strategy.target_block_request().unwrap();
    let wanted_fields =
        BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
    assert_eq!(block_request.from, FromBlock::Hash(target_block.header().hash()));
    assert_eq!(block_request.fields, wanted_fields);
    assert_eq!(block_request.max, Some(1));
}
1535
/// A second call to `target_block_request()` returns `None` after the first call returned a
/// request.
#[test]
fn no_parallel_target_block_requests() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's current best block to act as the warp target.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Verifier that reports the genesis hash as context and would return a complete proof
    // for the target header.
    let mut mock_verifier = MockVerifier::<Block>::new();
    let genesis = client.info().genesis_hash;
    mock_verifier.expect_next_proof_context().returning(move || genesis);
    let proof_header = target_block.header().clone();
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proof_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy =
        WarpSync::new(client, sync_config, None, Arc::new(MockBlockDownloader::new()), None);

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target block phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    // The first call produces a request ...
    let first_attempt = strategy.target_block_request();
    assert!(first_attempt.is_some());
    // ... and the immediately following one does not.
    let second_attempt = strategy.target_block_request();
    assert!(second_attempt.is_none());
}
1575
/// An empty response to the target block request is answered with `Err(BadPeer)` naming the
/// peer the request was sent to.
#[test]
fn target_block_response_with_no_blocks_drops_peer() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's current best block to act as the warp target.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Verifier that reports the genesis hash as context and would return a complete proof
    // for the target header.
    let mut mock_verifier = MockVerifier::<Block>::new();
    let genesis = client.info().genesis_hash;
    mock_verifier.expect_next_proof_context().returning(move || genesis);
    let proof_header = target_block.header().clone();
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proof_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy =
        WarpSync::new(client, sync_config, None, Arc::new(MockBlockDownloader::new()), None);

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target block phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (asked_peer, block_request) = strategy.target_block_request().unwrap();

    // Answer with no blocks at all.
    let empty_response = Vec::new();
    let outcome = strategy.on_block_response_inner(asked_peer, block_request, empty_response);
    assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == asked_peer));
}
1620
/// A response carrying the target block plus one additional block is answered with
/// `Err(BadPeer)` naming the peer the request was sent to.
#[test]
fn target_block_response_with_extra_blocks_drops_peer() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's current best block to act as the warp target.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // A second block on the same parent, made different by an extra storage change.
    let mut extra_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap();
    extra_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let extra_block = extra_builder.build().unwrap().block;

    // Verifier that reports the genesis hash as context and would return a complete proof
    // for the target header.
    let mut mock_verifier = MockVerifier::<Block>::new();
    let genesis = client.info().genesis_hash;
    mock_verifier.expect_next_proof_context().returning(move || genesis);
    let proof_header = target_block.header().clone();
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proof_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy =
        WarpSync::new(client, sync_config, None, Arc::new(MockBlockDownloader::new()), None);

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target block phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (asked_peer, block_request) = strategy.target_block_request().unwrap();

    // Two blocks in the response: the requested target followed by the extra block.
    let target_data = BlockData::<Block> {
        hash: target_block.header().hash(),
        header: Some(target_block.header().clone()),
        body: Some(target_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    };
    let extra_data = BlockData::<Block> {
        hash: extra_block.header().hash(),
        header: Some(extra_block.header().clone()),
        body: Some(extra_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    };
    let oversized_response = vec![target_data, extra_data];

    let outcome =
        strategy.on_block_response_inner(asked_peer, block_request, oversized_response);
    assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == asked_peer));
}
1697
/// A response carrying a single block other than the requested target is answered with
/// `Err(BadPeer)` naming the peer the request was sent to.
#[test]
fn target_block_response_with_wrong_block_drops_peer() {
    sp_tracing::try_init_simple();

    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's current best block to act as the warp target.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // A second block on the same parent, made different by an extra storage change.
    let mut wrong_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap();
    wrong_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let wrong_block = wrong_builder.build().unwrap().block;

    // Verifier that reports the genesis hash as context and would return a complete proof
    // for the target header.
    let mut mock_verifier = MockVerifier::<Block>::new();
    let genesis = client.info().genesis_hash;
    mock_verifier.expect_next_proof_context().returning(move || genesis);
    let proof_header = target_block.header().clone();
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proof_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy =
        WarpSync::new(client, sync_config, None, Arc::new(MockBlockDownloader::new()), None);

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target block phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (asked_peer, block_request) = strategy.target_block_request().unwrap();

    // Answer with the wrong block only.
    let wrong_data = BlockData::<Block> {
        hash: wrong_block.header().hash(),
        header: Some(wrong_block.header().clone()),
        body: Some(wrong_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    };
    let mismatched_response = vec![wrong_data];

    let outcome =
        strategy.on_block_response_inner(asked_peer, block_request, mismatched_response);
    assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == asked_peer));
}
1764
/// A response carrying exactly the target block is accepted, the next action is `Finished`,
/// and `take_result()` exposes the target header, body and justifications.
#[test]
fn correct_target_block_response_sets_strategy_result() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build the target block on the client's best block, with one storage change pushed.
    let mut target_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap();
    target_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let target_block = target_builder.build().unwrap().block;

    // Verifier that reports the genesis hash as context and would return a complete proof
    // for the target header.
    let mut mock_verifier = MockVerifier::<Block>::new();
    let genesis = client.info().genesis_hash;
    mock_verifier.expect_next_proof_context().returning(move || genesis);
    let proof_header = target_block.header().clone();
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proof_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_create_verifier()
        .return_once(move || Box::new(mock_verifier));

    let sync_config = WarpSyncConfig::WithProvider(Arc::new(mock_provider));
    let mut strategy =
        WarpSync::new(client, sync_config, None, Arc::new(MockBlockDownloader::new()), None);

    // Register ten random peers with best numbers 1 through 10.
    for peer_best in 1..11 {
        strategy.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target block phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (asked_peer, block_request) = strategy.target_block_request().unwrap();

    // Answer with exactly the target block, including body and justifications.
    let expected_body = Some(target_block.extrinsics().to_vec());
    let expected_justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
    let target_data = BlockData::<Block> {
        hash: target_block.header().hash(),
        header: Some(target_block.header().clone()),
        body: expected_body.clone(),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: expected_justifications.clone(),
    };
    let response = vec![target_data];

    assert!(strategy.on_block_response_inner(asked_peer, block_request, response).is_ok());

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // The only remaining action is the completion signal.
    let pending: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(pending.len(), 1);
    assert!(matches!(pending[0], SyncingAction::Finished));

    // The strategy result carries everything that was delivered for the target block.
    let outcome = strategy.take_result().unwrap();
    assert_eq!(outcome.target_header, *target_block.header());
    assert_eq!(outcome.target_body, expected_body);
    assert_eq!(outcome.target_justifications, expected_justifications);
}
1832}