1use sc_consensus::IncomingBlock;
22use sp_consensus::BlockOrigin;
23pub use sp_consensus_grandpa::{AuthorityList, SetId};
24
25use crate::{
26 block_relay_protocol::{BlockDownloader, BlockResponseError},
27 service::network::NetworkServiceHandle,
28 strategy::{
29 chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
30 SyncingAction,
31 },
32 types::{BadPeer, SyncState, SyncStatus},
33 LOG_TARGET,
34};
35use codec::{Decode, Encode};
36use futures::{channel::oneshot, FutureExt};
37use log::{debug, error, trace, warn};
38use sc_network::{IfDisconnected, ProtocolName};
39use sc_network_common::sync::message::{
40 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
41};
42use sc_network_types::PeerId;
43use sp_blockchain::HeaderBackend;
44use sp_runtime::{
45 traits::{Block as BlockT, Header, NumberFor, Zero},
46 Justifications, SaturatedConversion,
47};
48use std::{any::Any, collections::HashMap, fmt, sync::Arc};
49
/// Default number of connected peers required before warp sync leaves the waiting phase.
///
/// Used by `WarpSync::new` when the caller does not supply its own minimum.
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
52
/// Raw bytes of a warp sync proof, as returned by [`WarpSyncProvider::generate`] or as received
/// from a peer in a warp proof response.
pub struct EncodedProof(pub Vec<u8>);
55
/// Request sent to a peer asking for a warp sync proof.
#[derive(Encode, Decode, Debug, Clone)]
pub struct WarpProofRequest<B: BlockT> {
    /// Hash the requested proof begins at. The strategy fills this with the last hash it has
    /// verified so far (initially the genesis hash).
    pub begin: B::Hash,
}
62
/// Outcome of a successful [`WarpSyncProvider::verify`] call.
pub enum VerificationResult<Block: BlockT> {
    /// The proof was valid but more proofs are needed. Carries the new set id, the new authority
    /// list, the hash the next proof request should begin at, and the verified
    /// `(header, justifications)` pairs to import.
    Partial(SetId, AuthorityList, Block::Hash, Vec<(Block::Header, Justifications)>),
    /// The proof was valid and is the final one. Carries the set id, the authority list, the
    /// header of the target block to download next, and the verified `(header, justifications)`
    /// pairs to import.
    Complete(SetId, AuthorityList, Block::Header, Vec<(Block::Header, Justifications)>),
}
70
/// Source and verifier of warp sync proofs used by the warp sync strategy.
pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
    /// Generate an encoded proof starting at the block with hash `start`.
    ///
    /// Returns the encoded proof, or a boxed error if generation fails.
    fn generate(
        &self,
        start: Block::Hash,
    ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
    /// Verify `proof` given the currently known `set_id` and `authorities`.
    ///
    /// On success returns a [`VerificationResult`] carrying the updated verification state and
    /// the verified headers; on failure returns a boxed error.
    fn verify(
        &self,
        proof: &EncodedProof,
        set_id: SetId,
        authorities: AuthorityList,
    ) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
    /// Authority list the strategy starts verification with (paired with set id `0` and the
    /// genesis hash when warp sync starts).
    fn current_authorities(&self) -> AuthorityList;
}
90
/// Reputation changes reported together with peers dropped by the warp sync strategy.
mod rep {
    use sc_network::ReputationChange as Rep;

    /// A response arrived while the strategy was not in the phase that expects it.
    pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");

    /// The warp proof sent by the peer failed verification.
    pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");

    /// The target block response contained no blocks.
    pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

    /// The target block response contained more than the single requested block.
    pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

    /// The target block response had a missing header, a different header, or no body.
    pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

    /// The block response could not be decoded or blocks could not be extracted from it.
    /// Note the much smaller penalty compared to the other constants.
    pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
112
/// Externally reported phase of warp sync, rendered to human-readable text by its `Display`
/// impl.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<Block: BlockT> {
    /// Waiting until `required_peers` peers are connected.
    AwaitingPeers { required_peers: usize },
    /// Downloading and verifying warp (finality) proofs.
    DownloadingWarpProofs,
    /// Downloading the target block.
    DownloadingTargetBlock,
    /// Downloading state. Not produced by the strategy in this file.
    DownloadingState,
    /// Importing state. Not produced by the strategy in this file.
    ImportingState,
    /// Downloading block history up to the given block number. Not produced by the strategy in
    /// this file.
    DownloadingBlocks(NumberFor<Block>),
    /// Warp sync is complete.
    Complete,
}
131
132impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
133 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
134 match self {
135 Self::AwaitingPeers { required_peers } =>
136 write!(f, "Waiting for {required_peers} peers to be connected"),
137 Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
138 Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
139 Self::DownloadingState => write!(f, "Downloading state"),
140 Self::ImportingState => write!(f, "Importing state"),
141 Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
142 Self::Complete => write!(f, "Warp sync is complete"),
143 }
144 }
145}
146
/// Snapshot of warp sync progress reported to the outside.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<Block: BlockT> {
    /// Phase warp sync is currently in.
    pub phase: WarpSyncPhase<Block>,
    /// Number of bytes accounted for so far: the proof bytes, plus the state bytes once the
    /// strategy is complete.
    pub total_bytes: u64,
}
155
/// How warp sync obtains the block it syncs to.
pub enum WarpSyncConfig<Block: BlockT> {
    /// Download and verify warp proofs with the given provider; the target block is taken from
    /// the final (complete) proof.
    WithProvider(Arc<dyn WarpSyncProvider<Block>>),
    /// Skip the proof phase and download the block with this header directly.
    WithTarget(<Block as BlockT>::Header),
}
165
/// Internal state machine of the warp sync strategy.
enum Phase<B: BlockT> {
    /// Waiting for enough peers to be connected before the first proof is requested.
    WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
    /// Downloading and verifying warp proofs.
    WarpProof {
        /// Set id passed to the next proof verification.
        set_id: SetId,
        /// Authority list passed to the next proof verification.
        authorities: AuthorityList,
        /// Hash the next proof request begins at.
        last_hash: B::Hash,
        /// Provider used to verify received proofs.
        warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
    },
    /// Downloading the target block identified by this header.
    TargetBlock(B::Header),
    /// Nothing left to do; either the target block was downloaded or warp sync was not
    /// applicable.
    Complete,
}
182
/// What a connected peer is currently being used for.
enum PeerState {
    /// No request is in flight; the peer can be scheduled.
    Available,
    /// A warp proof request was scheduled to this peer.
    DownloadingProofs,
    /// A target block request was scheduled to this peer.
    DownloadingTargetBlock,
}
188
189impl PeerState {
190 fn is_available(&self) -> bool {
191 matches!(self, PeerState::Available)
192 }
193}
194
/// Per-peer bookkeeping kept by the strategy.
struct Peer<B: BlockT> {
    /// Best block number reported for this peer (at connection time or via a validated best
    /// block announcement).
    best_number: NumberFor<B>,
    /// Whether the peer is idle or which kind of request it is serving.
    state: PeerState,
}
199
/// Data of the downloaded target block, available once warp sync finished successfully.
pub struct WarpSyncResult<B: BlockT> {
    /// Header of the target block.
    pub target_header: B::Header,
    /// Body of the target block as received from the peer. The strategy only stores a result
    /// when the body is present.
    pub target_body: Option<Vec<B::Extrinsic>>,
    /// Justifications received with the target block, if any.
    pub target_justifications: Option<Justifications>,
}
205
/// Warp sync strategy state machine: waits for peers, downloads and verifies warp proofs, then
/// downloads the target block.
pub struct WarpSync<B: BlockT, Client> {
    /// Current phase of the state machine.
    phase: Phase<B>,
    /// Client handle; used to read chain info (finalized state, genesis hash).
    client: Arc<Client>,
    /// Total size in bytes of all successfully verified proofs.
    total_proof_bytes: u64,
    /// State bytes counter; initialized to zero and only read when reporting progress of the
    /// complete phase.
    total_state_bytes: u64,
    /// Connected peers and their state.
    peers: HashMap<PeerId, Peer<B>>,
    /// Tracker consulted before scheduling a peer and informed when a peer disconnects while a
    /// request is assigned to it.
    disconnected_peers: DisconnectedPeers,
    /// Protocol used for warp proof requests, if one is configured.
    protocol_name: Option<ProtocolName>,
    /// Downloader used for the target block request.
    block_downloader: Arc<dyn BlockDownloader<B>>,
    /// Actions accumulated since the last call to `actions()`.
    actions: Vec<SyncingAction<B>>,
    /// Result stored once the target block was downloaded; handed out by `take_result()`.
    result: Option<WarpSyncResult<B>>,
    /// Number of connected peers required to leave the waiting phase.
    min_peers_to_start_warp_sync: usize,
}
221
222impl<B, Client> WarpSync<B, Client>
223where
224 B: BlockT,
225 Client: HeaderBackend<B> + 'static,
226{
    /// Key attached to every request started by this strategy.
    pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
229
230 pub fn new(
234 client: Arc<Client>,
235 warp_sync_config: WarpSyncConfig<B>,
236 protocol_name: Option<ProtocolName>,
237 block_downloader: Arc<dyn BlockDownloader<B>>,
238 min_peers_to_start_warp_sync: Option<usize>,
239 ) -> Self {
240 let min_peers_to_start_warp_sync =
241 min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
242 if client.info().finalized_state.is_some() {
243 error!(
244 target: LOG_TARGET,
245 "Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
246 );
247 return Self {
248 client,
249 phase: Phase::Complete,
250 total_proof_bytes: 0,
251 total_state_bytes: 0,
252 peers: HashMap::new(),
253 disconnected_peers: DisconnectedPeers::new(),
254 protocol_name,
255 block_downloader,
256 actions: vec![SyncingAction::Finished],
257 result: None,
258 min_peers_to_start_warp_sync,
259 }
260 }
261
262 let phase = match warp_sync_config {
263 WarpSyncConfig::WithProvider(warp_sync_provider) =>
264 Phase::WaitingForPeers { warp_sync_provider },
265 WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
266 };
267
268 Self {
269 client,
270 phase,
271 total_proof_bytes: 0,
272 total_state_bytes: 0,
273 peers: HashMap::new(),
274 disconnected_peers: DisconnectedPeers::new(),
275 protocol_name,
276 block_downloader,
277 actions: Vec::new(),
278 result: None,
279 min_peers_to_start_warp_sync,
280 }
281 }
282
283 pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
285 self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
286
287 self.try_to_start_warp_sync();
288 }
289
290 pub fn remove_peer(&mut self, peer_id: &PeerId) {
292 if let Some(state) = self.peers.remove(peer_id) {
293 if !state.state.is_available() {
294 if let Some(bad_peer) =
295 self.disconnected_peers.on_disconnect_during_request(*peer_id)
296 {
297 self.actions.push(SyncingAction::DropPeer(bad_peer));
298 }
299 }
300 }
301 }
302
303 #[must_use]
307 pub fn on_validated_block_announce(
308 &mut self,
309 is_best: bool,
310 peer_id: PeerId,
311 announce: &BlockAnnounce<B::Header>,
312 ) -> Option<(B::Hash, NumberFor<B>)> {
313 is_best.then(|| {
314 let best_number = *announce.header.number();
315 let best_hash = announce.header.hash();
316 if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
317 peer.best_number = best_number;
318 }
319 (best_hash, best_number)
321 })
322 }
323
324 fn try_to_start_warp_sync(&mut self) {
326 let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
327
328 if self.peers.len() < self.min_peers_to_start_warp_sync {
329 return
330 }
331
332 self.phase = Phase::WarpProof {
333 set_id: 0,
334 authorities: warp_sync_provider.current_authorities(),
335 last_hash: self.client.info().genesis_hash,
336 warp_sync_provider: Arc::clone(warp_sync_provider),
337 };
338 trace!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
339 }
340
341 pub fn on_generic_response(
342 &mut self,
343 peer_id: &PeerId,
344 protocol_name: ProtocolName,
345 response: Box<dyn Any + Send>,
346 ) {
347 if &protocol_name == self.block_downloader.protocol_name() {
348 let Ok(response) = response
349 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
350 else {
351 warn!(target: LOG_TARGET, "Failed to downcast block response");
352 debug_assert!(false);
353 return;
354 };
355
356 let (request, response) = *response;
357 let blocks = match response {
358 Ok(blocks) => blocks,
359 Err(BlockResponseError::DecodeFailed(e)) => {
360 debug!(
361 target: LOG_TARGET,
362 "Failed to decode block response from peer {:?}: {:?}.",
363 peer_id,
364 e
365 );
366 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
367 return;
368 },
369 Err(BlockResponseError::ExtractionFailed(e)) => {
370 debug!(
371 target: LOG_TARGET,
372 "Failed to extract blocks from peer response {:?}: {:?}.",
373 peer_id,
374 e
375 );
376 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
377 return;
378 },
379 };
380
381 self.on_block_response(*peer_id, request, blocks);
382 } else {
383 let Ok(response) = response.downcast::<Vec<u8>>() else {
384 warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
385 debug_assert!(false);
386 return;
387 };
388
389 self.on_warp_proof_response(peer_id, EncodedProof(*response));
390 }
391 }
392
393 pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
395 if let Some(peer) = self.peers.get_mut(peer_id) {
396 peer.state = PeerState::Available;
397 }
398
399 let Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } =
400 &mut self.phase
401 else {
402 debug!(target: LOG_TARGET, "Unexpected warp proof response");
403 self.actions
404 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
405 return
406 };
407
408 let proof_to_incoming_block =
409 |(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
410 IncomingBlock {
411 hash: header.hash(),
412 header: Some(header),
413 body: None,
414 indexed_body: None,
415 justifications: Some(justifications),
416 origin: Some(*peer_id),
417 allow_missing_state: true,
420 skip_execution: true,
421 import_existing: false,
423 state: None,
424 }
425 };
426
427 match warp_sync_provider.verify(&response, *set_id, authorities.clone()) {
428 Err(e) => {
429 debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
430 self.actions
431 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
432 },
433 Ok(VerificationResult::Partial(new_set_id, new_authorities, new_last_hash, proofs)) => {
434 log::debug!(target: LOG_TARGET, "Verified partial proof, set_id={:?}", new_set_id);
435 *set_id = new_set_id;
436 *authorities = new_authorities;
437 *last_hash = new_last_hash;
438 self.total_proof_bytes += response.0.len() as u64;
439 self.actions.push(SyncingAction::ImportBlocks {
440 origin: BlockOrigin::NetworkInitialSync,
441 blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
442 });
443 },
444 Ok(VerificationResult::Complete(new_set_id, _, header, proofs)) => {
445 log::debug!(
446 target: LOG_TARGET,
447 "Verified complete proof, set_id={:?}. Continuing with target block download: {} ({}).",
448 new_set_id,
449 header.hash(),
450 header.number(),
451 );
452 self.total_proof_bytes += response.0.len() as u64;
453 self.phase = Phase::TargetBlock(header);
454 self.actions.push(SyncingAction::ImportBlocks {
455 origin: BlockOrigin::NetworkInitialSync,
456 blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
457 });
458 },
459 }
460 }
461
462 pub fn on_block_response(
464 &mut self,
465 peer_id: PeerId,
466 request: BlockRequest<B>,
467 blocks: Vec<BlockData<B>>,
468 ) {
469 if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
470 self.actions.push(SyncingAction::DropPeer(bad_peer));
471 }
472 }
473
474 fn on_block_response_inner(
475 &mut self,
476 peer_id: PeerId,
477 request: BlockRequest<B>,
478 mut blocks: Vec<BlockData<B>>,
479 ) -> Result<(), BadPeer> {
480 if let Some(peer) = self.peers.get_mut(&peer_id) {
481 peer.state = PeerState::Available;
482 }
483
484 let Phase::TargetBlock(header) = &mut self.phase else {
485 debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
486 return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE))
487 };
488
489 if blocks.is_empty() {
490 debug!(
491 target: LOG_TARGET,
492 "Downloading target block failed: empty block response from {peer_id}",
493 );
494 return Err(BadPeer(peer_id, rep::NO_BLOCK))
495 }
496
497 if blocks.len() > 1 {
498 debug!(
499 target: LOG_TARGET,
500 "Too many blocks ({}) in warp target block response from {peer_id}",
501 blocks.len(),
502 );
503 return Err(BadPeer(peer_id, rep::NOT_REQUESTED))
504 }
505
506 validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
507
508 let block = blocks.pop().expect("`blocks` len checked above; qed");
509
510 let Some(block_header) = &block.header else {
511 debug!(
512 target: LOG_TARGET,
513 "Downloading target block failed: missing header in response from {peer_id}.",
514 );
515 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
516 };
517
518 if block_header != header {
519 debug!(
520 target: LOG_TARGET,
521 "Downloading target block failed: different header in response from {peer_id}.",
522 );
523 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
524 }
525
526 if block.body.is_none() {
527 debug!(
528 target: LOG_TARGET,
529 "Downloading target block failed: missing body in response from {peer_id}.",
530 );
531 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
532 }
533
534 self.result = Some(WarpSyncResult {
535 target_header: header.clone(),
536 target_body: block.body,
537 target_justifications: block.justifications,
538 });
539 self.phase = Phase::Complete;
540 self.actions.push(SyncingAction::Finished);
541 Ok(())
542 }
543
544 fn schedule_next_peer(
546 &mut self,
547 new_state: PeerState,
548 min_best_number: Option<NumberFor<B>>,
549 ) -> Option<PeerId> {
550 let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
551 if targets.is_empty() {
552 return None
553 }
554 targets.sort();
555 let median = targets[targets.len() / 2];
556 let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
557 for (peer_id, peer) in self.peers.iter_mut() {
560 if peer.state.is_available() &&
561 peer.best_number >= threshold &&
562 self.disconnected_peers.is_peer_available(peer_id)
563 {
564 peer.state = new_state;
565 return Some(*peer_id)
566 }
567 }
568 None
569 }
570
571 fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
573 let Phase::WarpProof { last_hash, .. } = &self.phase else { return None };
574
575 let begin = *last_hash;
577
578 if self
579 .peers
580 .values()
581 .any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
582 {
583 return None
585 }
586
587 let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
588 trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
589
590 let request = WarpProofRequest { begin };
591
592 let Some(protocol_name) = self.protocol_name.clone() else {
593 warn!(
594 target: LOG_TARGET,
595 "Trying to send warp sync request when no protocol is configured {request:?}",
596 );
597 return None;
598 };
599
600 Some((peer_id, protocol_name, request))
601 }
602
603 fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
605 let Phase::TargetBlock(target_header) = &self.phase else { return None };
606
607 if self
608 .peers
609 .values()
610 .any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
611 {
612 return None
614 }
615
616 let target_hash = target_header.hash();
618 let target_number = *target_header.number();
619
620 let peer_id =
621 self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
622
623 trace!(
624 target: LOG_TARGET,
625 "New target block request to {peer_id}, target: {} ({}).",
626 target_hash,
627 target_number,
628 );
629
630 Some((
631 peer_id,
632 BlockRequest::<B> {
633 id: 0,
634 fields: BlockAttributes::HEADER |
635 BlockAttributes::BODY |
636 BlockAttributes::JUSTIFICATION,
637 from: FromBlock::Hash(target_hash),
638 direction: Direction::Ascending,
639 max: Some(1),
640 },
641 ))
642 }
643
644 pub fn progress(&self) -> WarpSyncProgress<B> {
646 match &self.phase {
647 Phase::WaitingForPeers { .. } => WarpSyncProgress {
648 phase: WarpSyncPhase::AwaitingPeers {
649 required_peers: self.min_peers_to_start_warp_sync,
650 },
651 total_bytes: self.total_proof_bytes,
652 },
653 Phase::WarpProof { .. } => WarpSyncProgress {
654 phase: WarpSyncPhase::DownloadingWarpProofs,
655 total_bytes: self.total_proof_bytes,
656 },
657 Phase::TargetBlock(_) => WarpSyncProgress {
658 phase: WarpSyncPhase::DownloadingTargetBlock,
659 total_bytes: self.total_proof_bytes,
660 },
661 Phase::Complete => WarpSyncProgress {
662 phase: WarpSyncPhase::Complete,
663 total_bytes: self.total_proof_bytes + self.total_state_bytes,
664 },
665 }
666 }
667
    /// Number of peers currently known to the strategy.
    pub fn num_peers(&self) -> usize {
        self.peers.len()
    }
672
673 pub fn status(&self) -> SyncStatus<B> {
675 SyncStatus {
676 state: match &self.phase {
677 Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
678 Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
679 Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
680 Phase::Complete => SyncState::Idle,
681 },
682 best_seen_block: match &self.phase {
683 Phase::WaitingForPeers { .. } => None,
684 Phase::WarpProof { .. } => None,
685 Phase::TargetBlock(header) => Some(*header.number()),
686 Phase::Complete => None,
687 },
688 num_peers: self.peers.len().saturated_into(),
689 queued_blocks: 0,
690 state_sync: None,
691 warp_sync: Some(self.progress()),
692 }
693 }
694
695 #[must_use]
697 pub fn actions(
698 &mut self,
699 network_service: &NetworkServiceHandle,
700 ) -> impl Iterator<Item = SyncingAction<B>> {
701 let warp_proof_request =
702 self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
703 trace!(
704 target: LOG_TARGET,
705 "Created `WarpProofRequest` to {}, request: {:?}.",
706 peer_id,
707 request,
708 );
709
710 let (tx, rx) = oneshot::channel();
711
712 network_service.start_request(
713 peer_id,
714 protocol_name,
715 request.encode(),
716 tx,
717 IfDisconnected::ImmediateError,
718 );
719
720 SyncingAction::StartRequest {
721 peer_id,
722 key: Self::STRATEGY_KEY,
723 request: async move {
724 Ok(rx.await?.and_then(|(response, protocol_name)| {
725 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
726 }))
727 }
728 .boxed(),
729 remove_obsolete: false,
730 }
731 });
732 self.actions.extend(warp_proof_request);
733
734 let target_block_request =
735 self.target_block_request().into_iter().map(|(peer_id, request)| {
736 let downloader = self.block_downloader.clone();
737
738 SyncingAction::StartRequest {
739 peer_id,
740 key: Self::STRATEGY_KEY,
741 request: async move {
742 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
743 |(response, protocol_name)| {
744 let decoded_response =
745 downloader.block_response_into_blocks(&request, response);
746 let result =
747 Box::new((request, decoded_response)) as Box<dyn Any + Send>;
748 Ok((result, protocol_name))
749 },
750 ))
751 }
752 .boxed(),
753 remove_obsolete: true,
756 }
757 });
758 self.actions.extend(target_block_request);
759
760 std::mem::take(&mut self.actions).into_iter()
761 }
762
763 #[must_use]
765 pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
766 self.result.take()
767 }
768}
769
770#[cfg(test)]
771mod test {
772 use super::*;
773 use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
774 use sc_block_builder::BlockBuilderBuilder;
775 use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
776 use sp_consensus_grandpa::{AuthorityList, SetId, GRANDPA_ENGINE_ID};
777 use sp_core::H256;
778 use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
779 use std::{io::ErrorKind, sync::Arc};
780 use substrate_test_runtime_client::{
781 runtime::{Block, Hash},
782 BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
783 };
784
    // Generates `MockClient<B>`, a mock implementing `HeaderBackend<B>`, so tests can script
    // what the strategy reads from the client (in particular `info()`).
    mockall::mock! {
        pub Client<B: BlockT> {}

        impl<B: BlockT> HeaderBackend<B> for Client<B> {
            fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
            fn info(&self) -> Info<B>;
            fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
            fn number(
                &self,
                hash: B::Hash,
            ) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
            fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
        }
    }
799
    // Generates `MockWarpSyncProvider<B>`, a mock implementing the `WarpSyncProvider<B>` trait
    // of the parent module, so tests can script proof verification and the initial authorities.
    mockall::mock! {
        pub WarpSyncProvider<B: BlockT> {}

        impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
            fn generate(
                &self,
                start: B::Hash,
            ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
            fn verify(
                &self,
                proof: &EncodedProof,
                set_id: SetId,
                authorities: AuthorityList,
            ) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
            fn current_authorities(&self) -> AuthorityList;
        }
    }
817
818 fn mock_client_with_state() -> MockClient<Block> {
819 let mut client = MockClient::<Block>::new();
820 let genesis_hash = Hash::random();
821 client.expect_info().return_once(move || Info {
822 best_hash: genesis_hash,
823 best_number: 0,
824 genesis_hash,
825 finalized_hash: genesis_hash,
826 finalized_number: 0,
827 finalized_state: Some((genesis_hash, 0)),
829 number_leaves: 0,
830 block_gap: None,
831 });
832
833 client
834 }
835
836 fn mock_client_without_state() -> MockClient<Block> {
837 let mut client = MockClient::<Block>::new();
838 let genesis_hash = Hash::random();
839 client.expect_info().returning(move || Info {
840 best_hash: genesis_hash,
841 best_number: 0,
842 genesis_hash,
843 finalized_hash: genesis_hash,
844 finalized_number: 0,
845 finalized_state: None,
846 number_leaves: 0,
847 block_gap: None,
848 });
849
850 client
851 }
852
853 #[test]
854 fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
855 let client = mock_client_with_state();
856 let provider = MockWarpSyncProvider::<Block>::new();
857 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
858 let mut warp_sync = WarpSync::new(
859 Arc::new(client),
860 config,
861 None,
862 Arc::new(MockBlockDownloader::new()),
863 None,
864 );
865
866 let network_provider = NetworkServiceProvider::new();
867 let network_handle = network_provider.handle();
868
869 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
871 assert_eq!(actions.len(), 1);
872 assert!(matches!(actions[0], SyncingAction::Finished));
873
874 assert!(warp_sync.take_result().is_none());
876 }
877
878 #[test]
879 fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
880 let client = mock_client_with_state();
881 let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
882 1,
883 Default::default(),
884 Default::default(),
885 Default::default(),
886 Default::default(),
887 ));
888 let mut warp_sync = WarpSync::new(
889 Arc::new(client),
890 config,
891 None,
892 Arc::new(MockBlockDownloader::new()),
893 None,
894 );
895
896 let network_provider = NetworkServiceProvider::new();
897 let network_handle = network_provider.handle();
898
899 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
901 assert_eq!(actions.len(), 1);
902 assert!(matches!(actions[0], SyncingAction::Finished));
903
904 assert!(warp_sync.take_result().is_none());
906 }
907
908 #[test]
909 fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
910 let client = mock_client_without_state();
911 let provider = MockWarpSyncProvider::<Block>::new();
912 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
913 let mut warp_sync = WarpSync::new(
914 Arc::new(client),
915 config,
916 None,
917 Arc::new(MockBlockDownloader::new()),
918 None,
919 );
920
921 let network_provider = NetworkServiceProvider::new();
922 let network_handle = network_provider.handle();
923
924 assert_eq!(warp_sync.actions(&network_handle).count(), 0)
926 }
927
928 #[test]
929 fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
930 let client = mock_client_without_state();
931 let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
932 1,
933 Default::default(),
934 Default::default(),
935 Default::default(),
936 Default::default(),
937 ));
938 let mut warp_sync = WarpSync::new(
939 Arc::new(client),
940 config,
941 None,
942 Arc::new(MockBlockDownloader::new()),
943 None,
944 );
945
946 let network_provider = NetworkServiceProvider::new();
947 let network_handle = network_provider.handle();
948
949 assert_eq!(warp_sync.actions(&network_handle).count(), 0)
951 }
952
953 #[test]
954 fn warp_sync_is_started_only_when_there_is_enough_peers() {
955 let client = mock_client_without_state();
956 let mut provider = MockWarpSyncProvider::<Block>::new();
957 provider
958 .expect_current_authorities()
959 .once()
960 .return_const(AuthorityList::default());
961 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
962 let mut warp_sync = WarpSync::new(
963 Arc::new(client),
964 config,
965 None,
966 Arc::new(MockBlockDownloader::new()),
967 None,
968 );
969
970 for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
972 warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
973 assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
974 }
975
976 warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
978 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
979 }
980
981 #[test]
982 fn no_peer_is_scheduled_if_no_peers_connected() {
983 let client = mock_client_without_state();
984 let provider = MockWarpSyncProvider::<Block>::new();
985 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
986 let mut warp_sync = WarpSync::new(
987 Arc::new(client),
988 config,
989 None,
990 Arc::new(MockBlockDownloader::new()),
991 None,
992 );
993
994 assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
995 }
996
997 #[test]
998 fn enough_peers_are_used_in_tests() {
999 assert!(
1001 10 >= MIN_PEERS_TO_START_WARP_SYNC,
1002 "Tests must be updated to use that many initial peers.",
1003 );
1004 }
1005
1006 #[test]
1007 fn at_least_median_synced_peer_is_scheduled() {
1008 for _ in 0..100 {
1009 let client = mock_client_without_state();
1010 let mut provider = MockWarpSyncProvider::<Block>::new();
1011 provider
1012 .expect_current_authorities()
1013 .once()
1014 .return_const(AuthorityList::default());
1015 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1016 let mut warp_sync = WarpSync::new(
1017 Arc::new(client),
1018 config,
1019 None,
1020 Arc::new(MockBlockDownloader::new()),
1021 None,
1022 );
1023
1024 for best_number in 1..11 {
1025 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1026 }
1027
1028 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
1029 assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
1030 }
1031 }
1032
1033 #[test]
1034 fn min_best_number_peer_is_scheduled() {
1035 for _ in 0..10 {
1036 let client = mock_client_without_state();
1037 let mut provider = MockWarpSyncProvider::<Block>::new();
1038 provider
1039 .expect_current_authorities()
1040 .once()
1041 .return_const(AuthorityList::default());
1042 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1043 let mut warp_sync = WarpSync::new(
1044 Arc::new(client),
1045 config,
1046 None,
1047 Arc::new(MockBlockDownloader::new()),
1048 None,
1049 );
1050
1051 for best_number in 1..11 {
1052 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1053 }
1054
1055 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1056 assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
1057 }
1058 }
1059
1060 #[test]
1061 fn backedoff_number_peer_is_not_scheduled() {
1062 let client = mock_client_without_state();
1063 let mut provider = MockWarpSyncProvider::<Block>::new();
1064 provider
1065 .expect_current_authorities()
1066 .once()
1067 .return_const(AuthorityList::default());
1068 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1069 let mut warp_sync = WarpSync::new(
1070 Arc::new(client),
1071 config,
1072 None,
1073 Arc::new(MockBlockDownloader::new()),
1074 None,
1075 );
1076
1077 for best_number in 1..11 {
1078 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1079 }
1080
1081 let ninth_peer =
1082 *warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
1083 let tenth_peer =
1084 *warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
1085
1086 warp_sync.remove_peer(&tenth_peer);
1088 assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1089
1090 warp_sync.add_peer(tenth_peer, H256::random(), 10);
1091 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1092 assert_eq!(tenth_peer, peer_id.unwrap());
1093 warp_sync.remove_peer(&tenth_peer);
1094
1095 assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1097
1098 warp_sync.add_peer(tenth_peer, H256::random(), 10);
1100 let peer_id: Option<PeerId> =
1101 warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1102 assert!(peer_id.is_none());
1103
1104 let peer_id: Option<PeerId> =
1106 warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
1107 assert_eq!(ninth_peer, peer_id.unwrap());
1108 }
1109
1110 #[test]
1111 fn no_warp_proof_request_in_another_phase() {
1112 let client = mock_client_without_state();
1113 let mut provider = MockWarpSyncProvider::<Block>::new();
1114 provider
1115 .expect_current_authorities()
1116 .once()
1117 .return_const(AuthorityList::default());
1118 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1119 let mut warp_sync = WarpSync::new(
1120 Arc::new(client),
1121 config,
1122 Some(ProtocolName::Static("")),
1123 Arc::new(MockBlockDownloader::new()),
1124 None,
1125 );
1126
1127 for best_number in 1..11 {
1129 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1130 }
1131
1132 warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1134 1,
1135 Default::default(),
1136 Default::default(),
1137 Default::default(),
1138 Default::default(),
1139 ));
1140
1141 assert!(warp_sync.warp_proof_request().is_none());
1143 }
1144
/// The `begin` field of a warp proof request must equal the `last_hash` held by the
/// `WarpProof` phase.
#[test]
fn warp_proof_request_starts_at_last_hash() {
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    let mut strategy = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers and confirm the strategy is in the warp proof phase.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

    // Overwrite the hash tracked by the phase with a value known to the test.
    let known_last_hash = Hash::random();
    let Phase::WarpProof { last_hash, .. } = &mut strategy.phase else {
        panic!("Invalid phase.");
    };
    *last_hash = known_last_hash;

    // The request has to start from exactly that hash.
    let (_peer_id, _protocol_name, request) = strategy.warp_proof_request().unwrap();
    assert_eq!(request.begin, known_last_hash);
}
1181
/// A second `warp_proof_request()` call issued right after a successful one must yield `None`.
#[test]
fn no_parallel_warp_proof_requests() {
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    let mut strategy = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers and confirm the strategy is in the warp proof phase.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

    // The first call produces a request.
    let first = strategy.warp_proof_request();
    assert!(first.is_some());

    // A follow-up call, made before any response is handled, produces nothing.
    let second = strategy.warp_proof_request();
    assert!(second.is_none());
}
1210
/// When the provider rejects a warp proof, the strategy must queue a `DropPeer` action for the
/// peer that served it and stay in the `WarpProof` phase.
#[test]
fn bad_warp_proof_response_drops_peer() {
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());
    // Verification of the proof fails.
    mock_provider.expect_verify().return_once(|_proof, _set_id, _authorities| {
        Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
    });

    let mut strategy = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers and confirm the strategy is in the warp proof phase.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected: the request start. Remember which peer it targets.
    let started: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(started.len(), 1);
    let request_peer_id = match started[0] {
        SyncingAction::StartRequest { peer_id, .. } => peer_id,
        _ => panic!("Invalid action"),
    };

    // Feed an (empty) proof back from that peer; the mocked verification rejects it.
    strategy.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

    // The only queued action must drop the peer that answered.
    let queued = std::mem::take(&mut strategy.actions);
    assert_eq!(queued.len(), 1);
    assert!(matches!(
        queued[0],
        SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
    ));

    // The phase is unchanged.
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));
}
1259
/// A `VerificationResult::Partial` outcome must queue the proven header for import while the
/// strategy remains in the `WarpProof` phase.
#[test]
fn partial_warp_proof_doesnt_advance_phase() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build a block on top of the client's current best block to act as the proven header.
    let chain_info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;
    let target_header = target_block.header().clone();
    let justifications = Justifications::new(vec![(GRANDPA_ENGINE_ID, vec![1, 2, 3, 4, 5])]);

    // Verification reports a partial proof carrying the header and its justifications.
    let proven_header = target_header.clone();
    let proven_justifications = justifications.clone();
    mock_provider.expect_verify().return_once(move |_proof, set_id, authorities| {
        let proven_hash = proven_header.hash();
        Ok(VerificationResult::Partial(
            set_id,
            authorities,
            proven_hash,
            vec![(proven_header, proven_justifications)],
        ))
    });

    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers and confirm the strategy is in the warp proof phase.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected: the request start. Remember which peer it targets.
    let started: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(started.len(), 1);
    let request_peer_id = match started[0] {
        SyncingAction::StartRequest { peer_id, .. } => peer_id,
        _ => panic!("Invalid action"),
    };

    strategy.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

    // The response must have queued a single block import.
    assert_eq!(strategy.actions.len(), 1);
    let (origin, mut blocks) = match strategy.actions.pop().unwrap() {
        SyncingAction::ImportBlocks { origin, blocks } => (origin, blocks),
        _ => panic!("Expected `ImportBlocks` action."),
    };
    assert_eq!(origin, BlockOrigin::NetworkInitialSync);
    assert_eq!(blocks.len(), 1);

    let import_block = blocks.pop().unwrap();
    let expected_block: IncomingBlock<Block> = IncomingBlock {
        hash: target_header.hash(),
        header: Some(target_header),
        body: None,
        indexed_body: None,
        justifications: Some(justifications),
        origin: Some(request_peer_id),
        allow_missing_state: true,
        skip_execution: true,
        import_existing: false,
        state: None,
    };
    assert_eq!(import_block, expected_block);

    // A partial proof leaves the phase untouched.
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));
}
1343
/// A `VerificationResult::Complete` outcome must queue the proven header for import and move
/// the strategy to `Phase::TargetBlock` holding the target header.
#[test]
fn complete_warp_proof_advances_phase() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build a block on top of the client's current best block to act as the warp target.
    let chain_info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;
    let target_header = target_block.header().clone();
    let justifications = Justifications::new(vec![(GRANDPA_ENGINE_ID, vec![1, 2, 3, 4, 5])]);

    // Verification reports a complete proof ending at the target header.
    let proven_header = target_header.clone();
    let proven_justifications = justifications.clone();
    mock_provider.expect_verify().return_once(move |_proof, set_id, authorities| {
        let final_header = proven_header.clone();
        Ok(VerificationResult::Complete(
            set_id,
            authorities,
            final_header,
            vec![(proven_header, proven_justifications)],
        ))
    });

    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers and confirm the strategy is in the warp proof phase.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected: the request start. Remember which peer it targets.
    let started: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(started.len(), 1);
    let request_peer_id = match started[0] {
        SyncingAction::StartRequest { peer_id, .. } => peer_id,
        _ => panic!("Invalid action."),
    };

    strategy.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

    // The response must have queued a single block import.
    assert_eq!(strategy.actions.len(), 1);
    let (origin, mut blocks) = match strategy.actions.pop().unwrap() {
        SyncingAction::ImportBlocks { origin, blocks } => (origin, blocks),
        _ => panic!("Expected `ImportBlocks` action."),
    };
    assert_eq!(origin, BlockOrigin::NetworkInitialSync);
    assert_eq!(blocks.len(), 1);

    let import_block = blocks.pop().unwrap();
    let expected_block: IncomingBlock<Block> = IncomingBlock {
        hash: target_header.hash(),
        header: Some(target_header),
        body: None,
        indexed_body: None,
        justifications: Some(justifications),
        origin: Some(request_peer_id),
        allow_missing_state: true,
        skip_execution: true,
        import_existing: false,
        state: None,
    };
    assert_eq!(import_block, expected_block);

    // A complete proof switches the strategy to downloading the target block.
    assert!(
        matches!(strategy.phase, Phase::TargetBlock(header) if header == *target_block.header())
    );
}
1429
/// While the strategy sits in the `WarpProof` phase, `target_block_request()` must yield `None`.
#[test]
fn no_target_block_requests_in_another_phase() {
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    let mut strategy = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers; the strategy is then checked to be in the warp proof phase.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }
    assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

    // No target block request may be produced in this phase.
    let request = strategy.target_block_request();
    assert!(request.is_none());
}
1457
/// In the `TargetBlock` phase the block request must address the target by hash, ask for
/// header, body and justification, and be limited to a single block.
#[test]
fn target_block_request_is_correct() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build a block on top of the client's current best block to act as the warp target.
    let chain_info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Proof verification, if invoked, reports completion at the target header.
    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_proof, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header, Default::default()))
    });

    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }

    // Put the strategy into the target block phase by hand.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (_peer_id, request) = strategy.target_block_request().unwrap();

    let expected_from = FromBlock::Hash(target_block.header().hash());
    let expected_fields =
        BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
    assert_eq!(request.from, expected_from);
    assert_eq!(request.fields, expected_fields);
    assert_eq!(request.max, Some(1));
}
1499
/// With `WarpSyncConfig::WithTarget` the strategy must be in the `TargetBlock` phase after
/// peers are added and must request exactly the configured target block.
#[test]
fn externally_set_target_block_is_requested() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's current best block to act as the warp target.
    let chain_info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // The target header is handed over through the configuration; no provider is involved.
    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithTarget(target_block.header().clone()),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }

    assert!(matches!(strategy.phase, Phase::TargetBlock(_)));

    let (_peer_id, request) = strategy.target_block_request().unwrap();

    let expected_from = FromBlock::Hash(target_block.header().hash());
    let expected_fields =
        BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
    assert_eq!(request.from, expected_from);
    assert_eq!(request.fields, expected_fields);
    assert_eq!(request.max, Some(1));
}
1531
/// A second `target_block_request()` call issued right after a successful one must yield `None`.
#[test]
fn no_parallel_target_block_requests() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build a block on top of the client's current best block to act as the warp target.
    let chain_info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Proof verification, if invoked, reports completion at the target header.
    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_proof, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header, Default::default()))
    });

    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }

    // Put the strategy into the target block phase by hand.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    // The first call produces a request.
    let first = strategy.target_block_request();
    assert!(first.is_some());

    // A follow-up call, made before any response is handled, produces nothing.
    let second = strategy.target_block_request();
    assert!(second.is_none());
}
1570
/// An empty block response to the target block request must be rejected with a `BadPeer`
/// error naming the responding peer.
#[test]
fn target_block_response_with_no_blocks_drops_peer() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build a block on top of the client's current best block to act as the warp target.
    let chain_info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Proof verification, if invoked, reports completion at the target header.
    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_proof, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header, Default::default()))
    });

    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }

    // Put the strategy into the target block phase by hand.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = strategy.target_block_request().unwrap();

    // Answer with a response that contains no blocks at all.
    let outcome = strategy.on_block_response_inner(peer_id, request, Vec::new());

    // The responding peer must be reported as bad.
    assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == peer_id));
}
1614
/// A block response carrying the target block plus one additional block must be rejected
/// with a `BadPeer` error naming the responding peer.
#[test]
fn target_block_response_with_extra_blocks_drops_peer() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build a block on top of the client's current best block to act as the warp target.
    let chain_info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Build a sibling block that differs from the target through a storage change.
    let sibling_info = client.chain_info();
    let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(sibling_info.best_hash)
        .with_parent_block_number(sibling_info.best_number)
        .build()
        .unwrap();
    extra_block_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let extra_block = extra_block_builder.build().unwrap().block;

    // Proof verification, if invoked, reports completion at the target header.
    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_proof, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header, Default::default()))
    });

    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }

    // Put the strategy into the target block phase by hand.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = strategy.target_block_request().unwrap();

    // The response holds the requested block followed by one that was never asked for.
    let target_data = BlockData::<Block> {
        hash: target_block.header().hash(),
        header: Some(target_block.header().clone()),
        body: Some(target_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    };
    let extra_data = BlockData::<Block> {
        hash: extra_block.header().hash(),
        header: Some(extra_block.header().clone()),
        body: Some(extra_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    };
    let outcome =
        strategy.on_block_response_inner(peer_id, request, vec![target_data, extra_data]);

    // The responding peer must be reported as bad.
    assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == peer_id));
}
1690
/// A block response containing a single block other than the target must be rejected with a
/// `BadPeer` error naming the responding peer.
#[test]
fn target_block_response_with_wrong_block_drops_peer() {
    sp_tracing::try_init_simple();

    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build a block on top of the client's current best block to act as the warp target.
    let chain_info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Build a sibling block that differs from the target through a storage change.
    let sibling_info = client.chain_info();
    let mut wrong_block_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(sibling_info.best_hash)
        .with_parent_block_number(sibling_info.best_number)
        .build()
        .unwrap();
    wrong_block_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let wrong_block = wrong_block_builder.build().unwrap().block;

    // Proof verification, if invoked, reports completion at the target header.
    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_proof, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header, Default::default()))
    });

    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }

    // Put the strategy into the target block phase by hand.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = strategy.target_block_request().unwrap();

    // The response holds exactly one block, but it is the sibling rather than the target.
    let wrong_data = BlockData::<Block> {
        hash: wrong_block.header().hash(),
        header: Some(wrong_block.header().clone()),
        body: Some(wrong_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    };
    let outcome = strategy.on_block_response_inner(peer_id, request, vec![wrong_data]);

    // The responding peer must be reported as bad.
    assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == peer_id));
}
1756
/// A block response that matches the target must be accepted, make the strategy emit
/// `SyncingAction::Finished`, and expose the header, body and justifications via `take_result()`.
#[test]
fn correct_target_block_response_sets_strategy_result() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build the target block on top of the client's best block, including a storage change.
    let chain_info = client.chain_info();
    let mut target_block_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap();
    target_block_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let target_block = target_block_builder.build().unwrap().block;

    // Proof verification, if invoked, reports completion at the target header.
    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_proof, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header, Default::default()))
    });

    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for height in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), height);
    }

    // Put the strategy into the target block phase by hand.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = strategy.target_block_request().unwrap();

    // Answer with the target block itself, including its body and a justification.
    let body = Some(target_block.extrinsics().to_vec());
    let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
    let target_data = BlockData::<Block> {
        hash: target_block.header().hash(),
        header: Some(target_block.header().clone()),
        body: body.clone(),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: justifications.clone(),
    };
    let outcome = strategy.on_block_response_inner(peer_id, request, vec![target_data]);
    assert!(outcome.is_ok());

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // The strategy reports that it has finished.
    let emitted: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(emitted.len(), 1);
    assert!(matches!(emitted[0], SyncingAction::Finished));

    // The stored result carries everything that was delivered for the target block.
    let result = strategy.take_result().unwrap();
    assert_eq!(result.target_header, *target_block.header());
    assert_eq!(result.target_body, body);
    assert_eq!(result.target_justifications, justifications);
}
1823}