1use crate::{
32 blocks::BlockCollection,
33 justification_requests::ExtraRequests,
34 schema::v1::StateResponse,
35 strategy::{
36 disconnected_peers::DisconnectedPeers,
37 state_sync::{ImportResult, StateSync, StateSyncProvider},
38 warp::{EncodedProof, WarpSyncPhase, WarpSyncProgress},
39 StrategyKey, SyncingAction, SyncingStrategy,
40 },
41 types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncState, SyncStatus},
42 LOG_TARGET,
43};
44
45use codec::Encode;
46use log::{debug, error, info, trace, warn};
47use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
48use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
49use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
50use sc_network::ProtocolName;
51use sc_network_common::sync::message::{
52 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
53};
54use sc_network_types::PeerId;
55use sp_arithmetic::traits::Saturating;
56use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
57use sp_consensus::{BlockOrigin, BlockStatus};
58use sp_runtime::{
59 traits::{
60 Block as BlockT, CheckedSub, Hash, HashingFor, Header as HeaderT, NumberFor, One,
61 SaturatedConversion, Zero,
62 },
63 EncodedJustification, Justifications,
64};
65
66use std::{
67 collections::{HashMap, HashSet},
68 ops::Range,
69 sync::Arc,
70};
71
72#[cfg(test)]
73mod test;
74
/// Upper bound related to the number of blocks in the import queue.
/// NOTE(review): not referenced in the visible part of this file; presumably caps how many
/// blocks may be queued for import at once — confirm against the request-scheduling code.
const MAX_IMPORTING_BLOCKS: usize = 2048;

/// Upper bound on how far ahead blocks may be downloaded.
/// NOTE(review): not referenced in the visible part of this file — confirm exact semantics.
const MAX_DOWNLOAD_AHEAD: u32 = 2048;

/// Upper bound on how far back to look, defined as half of `MAX_DOWNLOAD_AHEAD`.
/// NOTE(review): not referenced in the visible part of this file — confirm exact semantics.
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;

/// Finality-related threshold for state sync.
/// NOTE(review): not referenced in the visible part of this file — confirm exact semantics.
const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;

/// Block-count threshold used by two heuristics in this file:
/// * `status()` reports a non-idle sync state only when the median peer best block is more
///   than this many blocks ahead of the client's best block;
/// * `add_peer_inner()` skips the ancestor search for a new peer when more than this many
///   blocks are currently queued, assuming our best queued block is common instead.
const MAJOR_SYNC_BLOCKS: u8 = 5;
94
95mod rep {
96 use sc_network::ReputationChange as Rep;
97 pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
100
101 pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");
104
105 pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
107
108 pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
110
111 pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
113
114 pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
116
117 pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
119
120 pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
122
123 pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");
125
126 pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
128}
129
130struct Metrics {
131 queued_blocks: Gauge<U64>,
132 fork_targets: Gauge<U64>,
133}
134
135impl Metrics {
136 fn register(r: &Registry) -> Result<Self, PrometheusError> {
137 Ok(Self {
138 queued_blocks: {
139 let g =
140 Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
141 register(g, r)?
142 },
143 fork_targets: {
144 let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
145 register(g, r)?
146 },
147 })
148 }
149}
150
151#[derive(Debug, Clone)]
152enum AllowedRequests {
153 Some(HashSet<PeerId>),
154 All,
155}
156
157impl AllowedRequests {
158 fn add(&mut self, id: &PeerId) {
159 if let Self::Some(ref mut set) = self {
160 set.insert(*id);
161 }
162 }
163
164 fn take(&mut self) -> Self {
165 std::mem::take(self)
166 }
167
168 fn set_all(&mut self) {
169 *self = Self::All;
170 }
171
172 fn contains(&self, id: &PeerId) -> bool {
173 match self {
174 Self::Some(set) => set.contains(id),
175 Self::All => true,
176 }
177 }
178
179 fn is_empty(&self) -> bool {
180 match self {
181 Self::Some(set) => set.is_empty(),
182 Self::All => false,
183 }
184 }
185
186 fn clear(&mut self) {
187 std::mem::take(self);
188 }
189}
190
191impl Default for AllowedRequests {
192 fn default() -> Self {
193 Self::Some(HashSet::default())
194 }
195}
196
/// Bookkeeping for a block-gap download (stored in `ChainSync::gap_sync`).
struct GapSync<B: BlockT> {
    /// Gap blocks received from peers; `DownloadingGap` responses are inserted here.
    blocks: BlockCollection<B>,
    /// Ready gap blocks are drained starting from `best_queued_number + 1`.
    best_queued_number: NumberFor<B>,
    /// The gap download is treated as complete once a block with this number is imported.
    target: NumberFor<B>,
}
202
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ChainSyncMode {
    /// Full sync; `ChainSync` switches to this mode once a state sync completes.
    Full,
    /// Mode in which a state sync is attempted when a block is finalized.
    LightState {
        /// Forwarded to the state sync attempt.
        /// NOTE(review): presumably skips state proof download/verification — confirm.
        skip_proofs: bool,
        /// NOTE(review): not read in the visible part of this file; presumably enables
        /// downloading indexed transaction bodies — confirm.
        storage_chain_mode: bool,
    },
}
216
217#[derive(Debug, Clone)]
219pub(crate) struct PeerSync<B: BlockT> {
220 pub peer_id: PeerId,
222 pub common_number: NumberFor<B>,
225 pub best_hash: B::Hash,
227 pub best_number: NumberFor<B>,
229 pub state: PeerSyncState<B>,
232}
233
234impl<B: BlockT> PeerSync<B> {
235 fn update_common_number(&mut self, new_common: NumberFor<B>) {
237 if self.common_number < new_common {
238 trace!(
239 target: LOG_TARGET,
240 "Updating peer {} common number from={} => to={}.",
241 self.peer_id,
242 self.common_number,
243 new_common,
244 );
245 self.common_number = new_common;
246 }
247 }
248}
249
/// A fork sync target, stored in `ChainSync::fork_targets` keyed by the target block hash.
struct ForkTarget<B: BlockT> {
    /// Block number of the target.
    number: NumberFor<B>,
    /// Parent hash of the target; set when the target comes from a block announcement and
    /// `None` when it comes from an explicit request or a finished ancestor search.
    parent_hash: Option<B::Hash>,
    /// Peers associated with this target; the target is dropped once this set becomes empty.
    peers: HashSet<PeerId>,
}
255
256#[derive(Copy, Clone, Eq, PartialEq, Debug)]
261pub(crate) enum PeerSyncState<B: BlockT> {
262 Available,
264 AncestorSearch { start: NumberFor<B>, current: NumberFor<B>, state: AncestorSearchState<B> },
266 DownloadingNew(NumberFor<B>),
268 DownloadingStale(B::Hash),
272 DownloadingJustification(B::Hash),
274 DownloadingState,
276 DownloadingGap(NumberFor<B>),
278}
279
280impl<B: BlockT> PeerSyncState<B> {
281 pub fn is_available(&self) -> bool {
282 matches!(self, Self::Available)
283 }
284}
285
/// State of the block-download ("chain sync") strategy.
pub struct ChainSync<B: BlockT, Client> {
    /// Client handle used to query chain info, block hashes and ancestry.
    client: Arc<Client>,
    /// Sync state of every tracked peer, keyed by peer id.
    peers: HashMap<PeerId, PeerSync<B>>,
    /// Consulted when a peer disconnects while it is not in the `Available` state.
    disconnected_peers: DisconnectedPeers,
    /// Block data received from peers (`DownloadingNew` responses are inserted here).
    blocks: BlockCollection<B>,
    /// Number of our best queued block. NOTE(review): updated outside the visible code.
    best_queued_number: NumberFor<B>,
    /// Hash of our best queued block. NOTE(review): updated outside the visible code.
    best_queued_hash: B::Hash,
    /// Current sync mode; switched to `Full` once a state sync completes.
    mode: ChainSyncMode,
    /// Pending justification requests.
    extra_justifications: ExtraRequests<B>,
    /// Hashes of queued blocks; entries are removed as import results are reported in
    /// `on_blocks_processed`. NOTE(review): insertion happens outside the visible code.
    queue_blocks: HashSet<B::Hash>,
    /// `(hash, number, skip_proofs)` of a state sync attempt that was deferred in
    /// `on_block_finalized`; retried from `actions()` once there are peers and no queued
    /// blocks.
    pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
    /// Fork sync targets keyed by target block hash.
    fork_targets: HashMap<B::Hash, ForkTarget<B>>,
    /// Peers for which new requests may be considered.
    /// NOTE(review): the consumer is outside the visible code.
    allowed_requests: AllowedRequests,
    /// Configuration value passed to `new`. NOTE(review): usage is outside the visible code.
    max_parallel_downloads: u32,
    /// Configuration value passed to `new`. NOTE(review): usage is outside the visible code.
    max_blocks_per_request: u32,
    /// Protocol name attached to every `SendStateRequest` action.
    state_request_protocol_name: ProtocolName,
    /// Running total of blocks received in block-data responses.
    downloaded_blocks: usize,
    /// State sync in progress, if any.
    state_sync: Option<StateSync<B, Client>>,
    /// Copied into `IncomingBlock::import_existing` for blocks handed to import; starts
    /// out as `false`.
    import_existing: bool,
    /// Gap download in progress, if any.
    gap_sync: Option<GapSync<B>>,
    /// Actions accumulated until they are drained by `actions()`.
    actions: Vec<SyncingAction<B>>,
    /// Prometheus metrics; `None` when no registry was supplied or registration failed.
    metrics: Option<Metrics>,
}
339
340impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
341where
342 B: BlockT,
343 Client: HeaderBackend<B>
344 + BlockBackend<B>
345 + HeaderMetadata<B, Error = sp_blockchain::Error>
346 + ProofProvider<B>
347 + Send
348 + Sync
349 + 'static,
350{
351 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
352 match self.add_peer_inner(peer_id, best_hash, best_number) {
353 Ok(Some(request)) => self.actions.push(SyncingAction::SendBlockRequest {
354 peer_id,
355 key: StrategyKey::ChainSync,
356 request,
357 }),
358 Ok(None) => {},
359 Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
360 }
361 }
362
363 fn remove_peer(&mut self, peer_id: &PeerId) {
364 self.blocks.clear_peer_download(peer_id);
365 if let Some(gap_sync) = &mut self.gap_sync {
366 gap_sync.blocks.clear_peer_download(peer_id)
367 }
368
369 if let Some(state) = self.peers.remove(peer_id) {
370 if !state.state.is_available() {
371 if let Some(bad_peer) =
372 self.disconnected_peers.on_disconnect_during_request(*peer_id)
373 {
374 self.actions.push(SyncingAction::DropPeer(bad_peer));
375 }
376 }
377 }
378
379 self.extra_justifications.peer_disconnected(peer_id);
380 self.allowed_requests.set_all();
381 self.fork_targets.retain(|_, target| {
382 target.peers.remove(peer_id);
383 !target.peers.is_empty()
384 });
385 if let Some(metrics) = &self.metrics {
386 metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
387 }
388
389 let blocks = self.ready_blocks();
390
391 if !blocks.is_empty() {
392 self.validate_and_queue_blocks(blocks, false);
393 }
394 }
395
396 fn on_validated_block_announce(
397 &mut self,
398 is_best: bool,
399 peer_id: PeerId,
400 announce: &BlockAnnounce<B::Header>,
401 ) -> Option<(B::Hash, NumberFor<B>)> {
402 let number = *announce.header.number();
403 let hash = announce.header.hash();
404 let parent_status =
405 self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
406 let known_parent = parent_status != BlockStatus::Unknown;
407 let ancient_parent = parent_status == BlockStatus::InChainPruned;
408
409 let known = self.is_known(&hash);
410 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
411 peer
412 } else {
413 error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
414 return Some((hash, number))
415 };
416
417 if let PeerSyncState::AncestorSearch { .. } = peer.state {
418 trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
419 return None
420 }
421
422 let peer_info = is_best.then(|| {
423 peer.best_number = number;
425 peer.best_hash = hash;
426
427 (hash, number)
428 });
429
430 if is_best {
433 if known && self.best_queued_number >= number {
434 self.update_peer_common_number(&peer_id, number);
435 } else if announce.header.parent_hash() == &self.best_queued_hash ||
436 known_parent && self.best_queued_number >= number
437 {
438 self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
439 }
440 }
441 self.allowed_requests.add(&peer_id);
442
443 if known || self.is_already_downloading(&hash) {
445 trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
446 if let Some(target) = self.fork_targets.get_mut(&hash) {
447 target.peers.insert(peer_id);
448 }
449 return peer_info
450 }
451
452 if ancient_parent {
453 trace!(
454 target: LOG_TARGET,
455 "Ignored ancient block announced from {}: {} {:?}",
456 peer_id,
457 hash,
458 announce.header,
459 );
460 return peer_info
461 }
462
463 if self.status().state == SyncState::Idle {
464 trace!(
465 target: LOG_TARGET,
466 "Added sync target for block announced from {}: {} {:?}",
467 peer_id,
468 hash,
469 announce.summary(),
470 );
471 self.fork_targets
472 .entry(hash)
473 .or_insert_with(|| {
474 if let Some(metrics) = &self.metrics {
475 metrics.fork_targets.inc();
476 }
477
478 ForkTarget {
479 number,
480 parent_hash: Some(*announce.header.parent_hash()),
481 peers: Default::default(),
482 }
483 })
484 .peers
485 .insert(peer_id);
486 }
487
488 peer_info
489 }
490
491 fn set_sync_fork_request(
493 &mut self,
494 mut peers: Vec<PeerId>,
495 hash: &B::Hash,
496 number: NumberFor<B>,
497 ) {
498 if peers.is_empty() {
499 peers = self
500 .peers
501 .iter()
502 .filter(|(_, peer)| peer.best_number >= number)
504 .map(|(id, _)| *id)
505 .collect();
506
507 debug!(
508 target: LOG_TARGET,
509 "Explicit sync request for block {hash:?} with no peers specified. \
510 Syncing from these peers {peers:?} instead.",
511 );
512 } else {
513 debug!(
514 target: LOG_TARGET,
515 "Explicit sync request for block {hash:?} with {peers:?}",
516 );
517 }
518
519 if self.is_known(hash) {
520 debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
521 return
522 }
523
524 trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
525 for peer_id in &peers {
526 if let Some(peer) = self.peers.get_mut(peer_id) {
527 if let PeerSyncState::AncestorSearch { .. } = peer.state {
528 continue
529 }
530
531 if number > peer.best_number {
532 peer.best_number = number;
533 peer.best_hash = *hash;
534 }
535 self.allowed_requests.add(peer_id);
536 }
537 }
538
539 self.fork_targets
540 .entry(*hash)
541 .or_insert_with(|| {
542 if let Some(metrics) = &self.metrics {
543 metrics.fork_targets.inc();
544 }
545
546 ForkTarget { number, peers: Default::default(), parent_hash: None }
547 })
548 .peers
549 .extend(peers);
550 }
551
552 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
553 let client = &self.client;
554 self.extra_justifications
555 .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
556 }
557
/// Resets the pending justification requests.
fn clear_justification_requests(&mut self) {
    self.extra_justifications.reset();
}
561
562 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
563 let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
564 self.extra_justifications
565 .try_finalize_root((hash, number), finalization_result, true);
566 self.allowed_requests.set_all();
567 }
568
569 fn on_block_response(
570 &mut self,
571 peer_id: PeerId,
572 key: StrategyKey,
573 request: BlockRequest<B>,
574 blocks: Vec<BlockData<B>>,
575 ) {
576 if key != StrategyKey::ChainSync {
577 error!(
578 target: LOG_TARGET,
579 "`on_block_response()` called with unexpected key {key:?} for chain sync",
580 );
581 debug_assert!(false);
582 }
583 let block_response = BlockResponse::<B> { id: request.id, blocks };
584
585 let blocks_range = || match (
586 block_response
587 .blocks
588 .first()
589 .and_then(|b| b.header.as_ref().map(|h| h.number())),
590 block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
591 ) {
592 (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
593 (Some(first), Some(_)) => format!(" ({})", first),
594 _ => Default::default(),
595 };
596 trace!(
597 target: LOG_TARGET,
598 "BlockResponse {} from {} with {} blocks {}",
599 block_response.id,
600 peer_id,
601 block_response.blocks.len(),
602 blocks_range(),
603 );
604
605 let res = if request.fields == BlockAttributes::JUSTIFICATION {
606 self.on_block_justification(peer_id, block_response)
607 } else {
608 self.on_block_data(&peer_id, Some(request), block_response)
609 };
610
611 if let Err(bad_peer) = res {
612 self.actions.push(SyncingAction::DropPeer(bad_peer));
613 }
614 }
615
616 fn on_state_response(
617 &mut self,
618 peer_id: PeerId,
619 key: StrategyKey,
620 response: OpaqueStateResponse,
621 ) {
622 if key != StrategyKey::ChainSync {
623 error!(
624 target: LOG_TARGET,
625 "`on_state_response()` called with unexpected key {key:?} for chain sync",
626 );
627 debug_assert!(false);
628 }
629 if let Err(bad_peer) = self.on_state_data(&peer_id, response) {
630 self.actions.push(SyncingAction::DropPeer(bad_peer));
631 }
632 }
633
/// Chain sync never requests warp proofs, so receiving one is a caller bug: it is logged
/// as an error and trips a debug assertion. All arguments are ignored.
fn on_warp_proof_response(
    &mut self,
    _peer_id: &PeerId,
    _key: StrategyKey,
    _response: EncodedProof,
) {
    error!(
        target: LOG_TARGET,
        "`on_warp_proof_response()` called for chain sync strategy",
    );
    debug_assert!(false);
}
646
647 fn on_blocks_processed(
648 &mut self,
649 imported: usize,
650 count: usize,
651 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
652 ) {
653 trace!(target: LOG_TARGET, "Imported {imported} of {count}");
654
655 let mut has_error = false;
656 for (_, hash) in &results {
657 if self.queue_blocks.remove(hash) {
658 if let Some(metrics) = &self.metrics {
659 metrics.queued_blocks.dec();
660 }
661 }
662 self.blocks.clear_queued(hash);
663 if let Some(gap_sync) = &mut self.gap_sync {
664 gap_sync.blocks.clear_queued(hash);
665 }
666 }
667 for (result, hash) in results {
668 if has_error {
669 break
670 }
671
672 has_error |= result.is_err();
673
674 match result {
675 Ok(BlockImportStatus::ImportedKnown(number, peer_id)) =>
676 if let Some(peer) = peer_id {
677 self.update_peer_common_number(&peer, number);
678 },
679 Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
680 if aux.clear_justification_requests {
681 trace!(
682 target: LOG_TARGET,
683 "Block imported clears all pending justification requests {number}: {hash:?}",
684 );
685 self.clear_justification_requests();
686 }
687
688 if aux.needs_justification {
689 trace!(
690 target: LOG_TARGET,
691 "Block imported but requires justification {number}: {hash:?}",
692 );
693 self.request_justification(&hash, number);
694 }
695
696 if aux.bad_justification {
697 if let Some(ref peer) = peer_id {
698 warn!("💔 Sent block with bad justification to import");
699 self.actions.push(SyncingAction::DropPeer(BadPeer(
700 *peer,
701 rep::BAD_JUSTIFICATION,
702 )));
703 }
704 }
705
706 if let Some(peer) = peer_id {
707 self.update_peer_common_number(&peer, number);
708 }
709 let state_sync_complete =
710 self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
711 if state_sync_complete {
712 info!(
713 target: LOG_TARGET,
714 "State sync is complete ({} MiB), restarting block sync.",
715 self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
716 );
717 self.state_sync = None;
718 self.mode = ChainSyncMode::Full;
719 self.restart();
720 }
721 let gap_sync_complete =
722 self.gap_sync.as_ref().map_or(false, |s| s.target == number);
723 if gap_sync_complete {
724 info!(
725 target: LOG_TARGET,
726 "Block history download is complete."
727 );
728 self.gap_sync = None;
729 }
730 },
731 Err(BlockImportError::IncompleteHeader(peer_id)) =>
732 if let Some(peer) = peer_id {
733 warn!(
734 target: LOG_TARGET,
735 "💔 Peer sent block with incomplete header to import",
736 );
737 self.actions
738 .push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
739 self.restart();
740 },
741 Err(BlockImportError::VerificationFailed(peer_id, e)) => {
742 let extra_message = peer_id
743 .map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
744
745 warn!(
746 target: LOG_TARGET,
747 "💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
748 );
749
750 if let Some(peer) = peer_id {
751 self.actions
752 .push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
753 }
754
755 self.restart();
756 },
757 Err(BlockImportError::BadBlock(peer_id)) =>
758 if let Some(peer) = peer_id {
759 warn!(
760 target: LOG_TARGET,
761 "💔 Block {hash:?} received from peer {peer} has been blacklisted",
762 );
763 self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
764 },
765 Err(BlockImportError::MissingState) => {
766 trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
770 },
771 e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
772 warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
773 self.state_sync = None;
774 self.restart();
775 },
776 Err(BlockImportError::Cancelled) => {},
777 };
778 }
779
780 self.allowed_requests.set_all();
781 }
782
783 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
784 let client = &self.client;
785 let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
786 is_descendent_of(&**client, base, block)
787 });
788
789 if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
790 if self.state_sync.is_none() {
791 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
792 self.attempt_state_sync(*hash, number, *skip_proofs);
793 } else {
794 self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
795 }
796 }
797 }
798
799 if let Err(err) = r {
800 warn!(
801 target: LOG_TARGET,
802 "💔 Error cleaning up pending extra justification data requests: {err}",
803 );
804 }
805 }
806
/// Forwards the new best block `best_hash` / `best_number` to `on_block_queued`.
fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
    self.on_block_queued(best_hash, best_number);
}
810
811 fn is_major_syncing(&self) -> bool {
812 self.status().state.is_major_syncing()
813 }
814
/// Number of peers currently tracked by this strategy.
fn num_peers(&self) -> usize {
    self.peers.len()
}
818
819 fn status(&self) -> SyncStatus<B> {
820 let median_seen = self.median_seen();
821 let best_seen_block =
822 median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
823 let sync_state = if let Some(target) = median_seen {
824 let best_block = self.client.info().best_number;
828 if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
829 if target > self.best_queued_number {
831 SyncState::Downloading { target }
832 } else {
833 SyncState::Importing { target }
834 }
835 } else {
836 SyncState::Idle
837 }
838 } else {
839 SyncState::Idle
840 };
841
842 let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
843 phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
844 total_bytes: 0,
845 });
846
847 SyncStatus {
848 state: sync_state,
849 best_seen_block,
850 num_peers: self.peers.len() as u32,
851 queued_blocks: self.queue_blocks.len() as u32,
852 state_sync: self.state_sync.as_ref().map(|s| s.progress()),
853 warp_sync: warp_sync_progress,
854 }
855 }
856
/// Total number of blocks received in block-data responses so far.
fn num_downloaded_blocks(&self) -> usize {
    self.downloaded_blocks
}
860
861 fn num_sync_requests(&self) -> usize {
862 self.fork_targets
863 .values()
864 .filter(|f| f.number <= self.best_queued_number)
865 .count()
866 }
867
868 fn actions(&mut self) -> Result<Vec<SyncingAction<B>>, ClientError> {
869 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
870 if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
871 self.attempt_state_sync(hash, number, skip_proofs);
872 }
873 }
874
875 let block_requests = self.block_requests().into_iter().map(|(peer_id, request)| {
876 SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request }
877 });
878 self.actions.extend(block_requests);
879
880 let justification_requests =
881 self.justification_requests().into_iter().map(|(peer_id, request)| {
882 SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request }
883 });
884 self.actions.extend(justification_requests);
885
886 let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
887 SyncingAction::SendStateRequest {
888 peer_id,
889 key: StrategyKey::ChainSync,
890 protocol_name: self.state_request_protocol_name.clone(),
891 request,
892 }
893 });
894 self.actions.extend(state_request);
895
896 Ok(std::mem::take(&mut self.actions))
897 }
898}
899
900impl<B, Client> ChainSync<B, Client>
901where
902 B: BlockT,
903 Client: HeaderBackend<B>
904 + BlockBackend<B>
905 + HeaderMetadata<B, Error = sp_blockchain::Error>
906 + ProofProvider<B>
907 + Send
908 + Sync
909 + 'static,
910{
911 pub fn new(
913 mode: ChainSyncMode,
914 client: Arc<Client>,
915 max_parallel_downloads: u32,
916 max_blocks_per_request: u32,
917 state_request_protocol_name: ProtocolName,
918 metrics_registry: Option<&Registry>,
919 initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
920 ) -> Result<Self, ClientError> {
921 let mut sync = Self {
922 client,
923 peers: HashMap::new(),
924 disconnected_peers: DisconnectedPeers::new(),
925 blocks: BlockCollection::new(),
926 best_queued_hash: Default::default(),
927 best_queued_number: Zero::zero(),
928 extra_justifications: ExtraRequests::new("justification", metrics_registry),
929 mode,
930 queue_blocks: Default::default(),
931 pending_state_sync_attempt: None,
932 fork_targets: Default::default(),
933 allowed_requests: Default::default(),
934 max_parallel_downloads,
935 max_blocks_per_request,
936 state_request_protocol_name,
937 downloaded_blocks: 0,
938 state_sync: None,
939 import_existing: false,
940 gap_sync: None,
941 actions: Vec::new(),
942 metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
943 Ok(metrics) => Some(metrics),
944 Err(err) => {
945 log::error!(
946 target: LOG_TARGET,
947 "Failed to register `ChainSync` metrics {err:?}",
948 );
949 None
950 },
951 }),
952 };
953
954 sync.reset_sync_start_point()?;
955 initial_peers.for_each(|(peer_id, best_hash, best_number)| {
956 sync.add_peer(peer_id, best_hash, best_number);
957 });
958
959 Ok(sync)
960 }
961
962 #[must_use]
963 fn add_peer_inner(
964 &mut self,
965 peer_id: PeerId,
966 best_hash: B::Hash,
967 best_number: NumberFor<B>,
968 ) -> Result<Option<BlockRequest<B>>, BadPeer> {
969 match self.block_status(&best_hash) {
971 Err(e) => {
972 debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
973 Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
974 },
975 Ok(BlockStatus::KnownBad) => {
976 info!(
977 "💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
978 );
979 Err(BadPeer(peer_id, rep::BAD_BLOCK))
980 },
981 Ok(BlockStatus::Unknown) => {
982 if best_number.is_zero() {
983 info!(
984 "💔 New peer {} with unknown genesis hash {} ({}).",
985 peer_id, best_hash, best_number,
986 );
987 return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
988 }
989
990 if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() {
994 debug!(
995 target: LOG_TARGET,
996 "New peer {} with unknown best hash {} ({}), assuming common block.",
997 peer_id,
998 self.best_queued_hash,
999 self.best_queued_number
1000 );
1001 self.peers.insert(
1002 peer_id,
1003 PeerSync {
1004 peer_id,
1005 common_number: self.best_queued_number,
1006 best_hash,
1007 best_number,
1008 state: PeerSyncState::Available,
1009 },
1010 );
1011 return Ok(None);
1012 }
1013
1014 let (state, req) = if self.best_queued_number.is_zero() {
1016 debug!(
1017 target: LOG_TARGET,
1018 "New peer {peer_id} with best hash {best_hash} ({best_number}).",
1019 );
1020
1021 (PeerSyncState::Available, None)
1022 } else {
1023 let common_best = std::cmp::min(self.best_queued_number, best_number);
1024
1025 debug!(
1026 target: LOG_TARGET,
1027 "New peer {} with unknown best hash {} ({}), searching for common ancestor.",
1028 peer_id,
1029 best_hash,
1030 best_number
1031 );
1032
1033 (
1034 PeerSyncState::AncestorSearch {
1035 current: common_best,
1036 start: self.best_queued_number,
1037 state: AncestorSearchState::ExponentialBackoff(One::one()),
1038 },
1039 Some(ancestry_request::<B>(common_best)),
1040 )
1041 };
1042
1043 self.allowed_requests.add(&peer_id);
1044 self.peers.insert(
1045 peer_id,
1046 PeerSync {
1047 peer_id,
1048 common_number: Zero::zero(),
1049 best_hash,
1050 best_number,
1051 state,
1052 },
1053 );
1054
1055 Ok(req)
1056 },
1057 Ok(BlockStatus::Queued) |
1058 Ok(BlockStatus::InChainWithState) |
1059 Ok(BlockStatus::InChainPruned) => {
1060 debug!(
1061 target: LOG_TARGET,
1062 "New peer {peer_id} with known best hash {best_hash} ({best_number}).",
1063 );
1064 self.peers.insert(
1065 peer_id,
1066 PeerSync {
1067 peer_id,
1068 common_number: std::cmp::min(self.best_queued_number, best_number),
1069 best_hash,
1070 best_number,
1071 state: PeerSyncState::Available,
1072 },
1073 );
1074 self.allowed_requests.add(&peer_id);
1075 Ok(None)
1076 },
1077 }
1078 }
1079
1080 #[must_use]
1082 fn on_block_data(
1083 &mut self,
1084 peer_id: &PeerId,
1085 request: Option<BlockRequest<B>>,
1086 response: BlockResponse<B>,
1087 ) -> Result<(), BadPeer> {
1088 self.downloaded_blocks += response.blocks.len();
1089 let mut gap = false;
1090 let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
1091 let mut blocks = response.blocks;
1092 if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
1093 trace!(target: LOG_TARGET, "Reversing incoming block list");
1094 blocks.reverse()
1095 }
1096 self.allowed_requests.add(peer_id);
1097 if let Some(request) = request {
1098 match &mut peer.state {
1099 PeerSyncState::DownloadingNew(_) => {
1100 self.blocks.clear_peer_download(peer_id);
1101 peer.state = PeerSyncState::Available;
1102 if let Some(start_block) =
1103 validate_blocks::<B>(&blocks, peer_id, Some(request))?
1104 {
1105 self.blocks.insert(start_block, blocks, *peer_id);
1106 }
1107 self.ready_blocks()
1108 },
1109 PeerSyncState::DownloadingGap(_) => {
1110 peer.state = PeerSyncState::Available;
1111 if let Some(gap_sync) = &mut self.gap_sync {
1112 gap_sync.blocks.clear_peer_download(peer_id);
1113 if let Some(start_block) =
1114 validate_blocks::<B>(&blocks, peer_id, Some(request))?
1115 {
1116 gap_sync.blocks.insert(start_block, blocks, *peer_id);
1117 }
1118 gap = true;
1119 let blocks: Vec<_> = gap_sync
1120 .blocks
1121 .ready_blocks(gap_sync.best_queued_number + One::one())
1122 .into_iter()
1123 .map(|block_data| {
1124 let justifications =
1125 block_data.block.justifications.or_else(|| {
1126 legacy_justification_mapping(
1127 block_data.block.justification,
1128 )
1129 });
1130 IncomingBlock {
1131 hash: block_data.block.hash,
1132 header: block_data.block.header,
1133 body: block_data.block.body,
1134 indexed_body: block_data.block.indexed_body,
1135 justifications,
1136 origin: block_data.origin,
1137 allow_missing_state: true,
1138 import_existing: self.import_existing,
1139 skip_execution: true,
1140 state: None,
1141 }
1142 })
1143 .collect();
1144 debug!(
1145 target: LOG_TARGET,
1146 "Drained {} gap blocks from {}",
1147 blocks.len(),
1148 gap_sync.best_queued_number,
1149 );
1150 blocks
1151 } else {
1152 debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
1153 return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1154 }
1155 },
1156 PeerSyncState::DownloadingStale(_) => {
1157 peer.state = PeerSyncState::Available;
1158 if blocks.is_empty() {
1159 debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
1160 return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1161 }
1162 validate_blocks::<B>(&blocks, peer_id, Some(request))?;
1163 blocks
1164 .into_iter()
1165 .map(|b| {
1166 let justifications = b
1167 .justifications
1168 .or_else(|| legacy_justification_mapping(b.justification));
1169 IncomingBlock {
1170 hash: b.hash,
1171 header: b.header,
1172 body: b.body,
1173 indexed_body: None,
1174 justifications,
1175 origin: Some(*peer_id),
1176 allow_missing_state: true,
1177 import_existing: self.import_existing,
1178 skip_execution: self.skip_execution(),
1179 state: None,
1180 }
1181 })
1182 .collect()
1183 },
1184 PeerSyncState::AncestorSearch { current, start, state } => {
1185 let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
1186 (Some(block), Ok(maybe_our_block_hash)) => {
1187 trace!(
1188 target: LOG_TARGET,
1189 "Got ancestry block #{} ({}) from peer {}",
1190 current,
1191 block.hash,
1192 peer_id,
1193 );
1194 maybe_our_block_hash.filter(|x| x == &block.hash)
1195 },
1196 (None, _) => {
1197 debug!(
1198 target: LOG_TARGET,
1199 "Invalid response when searching for ancestor from {peer_id}",
1200 );
1201 return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
1202 },
1203 (_, Err(e)) => {
1204 info!(
1205 target: LOG_TARGET,
1206 "❌ Error answering legitimate blockchain query: {e}",
1207 );
1208 return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
1209 },
1210 };
1211 if matching_hash.is_some() {
1212 if *start < self.best_queued_number &&
1213 self.best_queued_number <= peer.best_number
1214 {
1215 trace!(
1219 target: LOG_TARGET,
1220 "Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
1221 *peer_id,
1222 peer.common_number,
1223 self.best_queued_number,
1224 );
1225 peer.common_number = self.best_queued_number;
1226 } else if peer.common_number < *current {
1227 trace!(
1228 target: LOG_TARGET,
1229 "Ancestry search: updating peer {} common number from={} => to={}.",
1230 *peer_id,
1231 peer.common_number,
1232 *current,
1233 );
1234 peer.common_number = *current;
1235 }
1236 }
1237 if matching_hash.is_none() && current.is_zero() {
1238 trace!(
1239 target: LOG_TARGET,
1240 "Ancestry search: genesis mismatch for peer {peer_id}",
1241 );
1242 return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
1243 }
1244 if let Some((next_state, next_num)) =
1245 handle_ancestor_search_state(state, *current, matching_hash.is_some())
1246 {
1247 peer.state = PeerSyncState::AncestorSearch {
1248 current: next_num,
1249 start: *start,
1250 state: next_state,
1251 };
1252 let request = ancestry_request::<B>(next_num);
1253 self.actions.push(SyncingAction::SendBlockRequest {
1254 peer_id: *peer_id,
1255 key: StrategyKey::ChainSync,
1256 request,
1257 });
1258 return Ok(());
1259 } else {
1260 trace!(
1263 target: LOG_TARGET,
1264 "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
1265 self.best_queued_hash,
1266 self.best_queued_number,
1267 peer.best_hash,
1268 peer.best_number,
1269 matching_hash,
1270 peer.common_number,
1271 );
1272 if peer.common_number < peer.best_number &&
1273 peer.best_number < self.best_queued_number
1274 {
1275 trace!(
1276 target: LOG_TARGET,
1277 "Added fork target {} for {}",
1278 peer.best_hash,
1279 peer_id,
1280 );
1281 self.fork_targets
1282 .entry(peer.best_hash)
1283 .or_insert_with(|| {
1284 if let Some(metrics) = &self.metrics {
1285 metrics.fork_targets.inc();
1286 }
1287
1288 ForkTarget {
1289 number: peer.best_number,
1290 parent_hash: None,
1291 peers: Default::default(),
1292 }
1293 })
1294 .peers
1295 .insert(*peer_id);
1296 }
1297 peer.state = PeerSyncState::Available;
1298 return Ok(());
1299 }
1300 },
1301 PeerSyncState::Available |
1302 PeerSyncState::DownloadingJustification(..) |
1303 PeerSyncState::DownloadingState => Vec::new(),
1304 }
1305 } else {
1306 validate_blocks::<B>(&blocks, peer_id, None)?;
1308 blocks
1309 .into_iter()
1310 .map(|b| {
1311 let justifications = b
1312 .justifications
1313 .or_else(|| legacy_justification_mapping(b.justification));
1314 IncomingBlock {
1315 hash: b.hash,
1316 header: b.header,
1317 body: b.body,
1318 indexed_body: None,
1319 justifications,
1320 origin: Some(*peer_id),
1321 allow_missing_state: true,
1322 import_existing: false,
1323 skip_execution: true,
1324 state: None,
1325 }
1326 })
1327 .collect()
1328 }
1329 } else {
1330 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1332 };
1333
1334 self.validate_and_queue_blocks(new_blocks, gap);
1335
1336 Ok(())
1337 }
1338
1339 #[must_use]
1341 fn on_block_justification(
1342 &mut self,
1343 peer_id: PeerId,
1344 response: BlockResponse<B>,
1345 ) -> Result<(), BadPeer> {
1346 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1347 peer
1348 } else {
1349 error!(
1350 target: LOG_TARGET,
1351 "💔 Called on_block_justification with a peer ID of an unknown peer",
1352 );
1353 return Ok(());
1354 };
1355
1356 self.allowed_requests.add(&peer_id);
1357 if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1358 peer.state = PeerSyncState::Available;
1359
1360 let justification = if let Some(block) = response.blocks.into_iter().next() {
1362 if hash != block.hash {
1363 warn!(
1364 target: LOG_TARGET,
1365 "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
1366 peer_id,
1367 hash,
1368 block.hash,
1369 );
1370 return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1371 }
1372
1373 block
1374 .justifications
1375 .or_else(|| legacy_justification_mapping(block.justification))
1376 } else {
1377 trace!(
1380 target: LOG_TARGET,
1381 "Peer {peer_id:?} provided empty response for justification request {hash:?}",
1382 );
1383
1384 None
1385 };
1386
1387 if let Some((peer_id, hash, number, justifications)) =
1388 self.extra_justifications.on_response(peer_id, justification)
1389 {
1390 self.actions.push(SyncingAction::ImportJustifications {
1391 peer_id,
1392 hash,
1393 number,
1394 justifications,
1395 });
1396 return Ok(());
1397 }
1398 }
1399
1400 Ok(())
1401 }
1402
1403 fn median_seen(&self) -> Option<NumberFor<B>> {
1405 let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1406
1407 if best_seens.is_empty() {
1408 None
1409 } else {
1410 let middle = best_seens.len() / 2;
1411
1412 Some(*best_seens.select_nth_unstable(middle).1)
1414 }
1415 }
1416
1417 fn required_block_attributes(&self) -> BlockAttributes {
1418 match self.mode {
1419 ChainSyncMode::Full =>
1420 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1421 ChainSyncMode::LightState { storage_chain_mode: false, .. } =>
1422 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1423 ChainSyncMode::LightState { storage_chain_mode: true, .. } =>
1424 BlockAttributes::HEADER |
1425 BlockAttributes::JUSTIFICATION |
1426 BlockAttributes::INDEXED_BODY,
1427 }
1428 }
1429
1430 fn skip_execution(&self) -> bool {
1431 match self.mode {
1432 ChainSyncMode::Full => false,
1433 ChainSyncMode::LightState { .. } => true,
1434 }
1435 }
1436
1437 fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1438 let orig_len = new_blocks.len();
1439 new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1440 if new_blocks.len() != orig_len {
1441 debug!(
1442 target: LOG_TARGET,
1443 "Ignoring {} blocks that are already queued",
1444 orig_len - new_blocks.len(),
1445 );
1446 }
1447
1448 let origin = if !gap && !self.status().state.is_major_syncing() {
1449 BlockOrigin::NetworkBroadcast
1450 } else {
1451 BlockOrigin::NetworkInitialSync
1452 };
1453
1454 if let Some((h, n)) = new_blocks
1455 .last()
1456 .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1457 {
1458 trace!(
1459 target: LOG_TARGET,
1460 "Accepted {} blocks ({:?}) with origin {:?}",
1461 new_blocks.len(),
1462 h,
1463 origin,
1464 );
1465 self.on_block_queued(h, n)
1466 }
1467 self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1468 if let Some(metrics) = &self.metrics {
1469 metrics
1470 .queued_blocks
1471 .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1472 }
1473
1474 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1475 }
1476
1477 fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1478 if let Some(peer) = self.peers.get_mut(peer_id) {
1479 peer.update_common_number(new_common);
1480 }
1481 }
1482
1483 fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1488 if self.fork_targets.remove(hash).is_some() {
1489 if let Some(metrics) = &self.metrics {
1490 metrics.fork_targets.dec();
1491 }
1492 trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1493 }
1494 if let Some(gap_sync) = &mut self.gap_sync {
1495 if number > gap_sync.best_queued_number && number <= gap_sync.target {
1496 gap_sync.best_queued_number = number;
1497 }
1498 }
1499 if number > self.best_queued_number {
1500 self.best_queued_number = number;
1501 self.best_queued_hash = *hash;
1502 for (n, peer) in self.peers.iter_mut() {
1504 if let PeerSyncState::AncestorSearch { .. } = peer.state {
1505 continue;
1507 }
1508 let new_common_number =
1509 if peer.best_number >= number { number } else { peer.best_number };
1510 trace!(
1511 target: LOG_TARGET,
1512 "Updating peer {} info, ours={}, common={}->{}, their best={}",
1513 n,
1514 number,
1515 peer.common_number,
1516 new_common_number,
1517 peer.best_number,
1518 );
1519 peer.common_number = new_common_number;
1520 }
1521 }
1522 self.allowed_requests.set_all();
1523 }
1524
1525 fn restart(&mut self) {
1529 self.blocks.clear();
1530 if let Err(e) = self.reset_sync_start_point() {
1531 warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}");
1532 }
1533 self.allowed_requests.set_all();
1534 debug!(
1535 target: LOG_TARGET,
1536 "Restarted with {} ({})",
1537 self.best_queued_number,
1538 self.best_queued_hash,
1539 );
1540 let old_peers = std::mem::take(&mut self.peers);
1541
1542 old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
1543 match peer_sync.state {
1544 PeerSyncState::Available => {
1545 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1546 },
1547 PeerSyncState::AncestorSearch { .. } |
1548 PeerSyncState::DownloadingNew(_) |
1549 PeerSyncState::DownloadingStale(_) |
1550 PeerSyncState::DownloadingGap(_) |
1551 PeerSyncState::DownloadingState => {
1552 self.actions.push(SyncingAction::CancelRequest {
1554 peer_id,
1555 key: StrategyKey::ChainSync,
1556 });
1557 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1558 },
1559 PeerSyncState::DownloadingJustification(_) => {
1560 trace!(
1564 target: LOG_TARGET,
1565 "Keeping peer {} after restart, updating common number from={} => to={} (our best).",
1566 peer_id,
1567 peer_sync.common_number,
1568 self.best_queued_number,
1569 );
1570 peer_sync.common_number = self.best_queued_number;
1571 self.peers.insert(peer_id, peer_sync);
1572 },
1573 }
1574 });
1575 }
1576
/// Re-derive the sync starting point from the client's chain info.
///
/// - `LightState` mode is downgraded to `Full` when the database already has a
///   finalized state.
/// - `best_queued_hash` / `best_queued_number` are reset to the client's best block and
///   `import_existing` is cleared.
/// - In `Full` mode, if the best block is not `InChainWithState`, `import_existing` is
///   set and sync starts from the finalized state if there is one, otherwise from
///   genesis (default hash, number zero).
/// - If the client reports a block gap, gap sync is (re)initialised for it.
///
/// # Errors
///
/// Returns the client error if querying the status of the best block fails. Note that
/// `import_existing` and the best queued block have already been reset at that point.
fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
    let info = self.client.info();
    if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
        warn!(
            target: LOG_TARGET,
            "Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
        );
        self.mode = ChainSyncMode::Full;
    }

    self.import_existing = false;
    self.best_queued_hash = info.best_hash;
    self.best_queued_number = info.best_number;

    if self.mode == ChainSyncMode::Full &&
        self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
    {
        // The best block has no state available: blocks from the chosen start point on
        // are imported with `import_existing` set.
        self.import_existing = true;
        if let Some((hash, number)) = info.finalized_state {
            debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
            self.best_queued_hash = hash;
            self.best_queued_number = number;
        } else {
            debug!(target: LOG_TARGET, "Restarting from genesis");
            self.best_queued_hash = Default::default();
            self.best_queued_number = Zero::zero();
        }
    }

    if let Some(BlockGap { start, end, .. }) = info.block_gap {
        debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end}");
        self.gap_sync = Some(GapSync {
            // The block just before the gap counts as already queued.
            // NOTE(review): plain subtraction; assumes `start` is never zero — confirm.
            best_queued_number: start - One::one(),
            target: end,
            blocks: BlockCollection::new(),
        });
    }
    trace!(
        target: LOG_TARGET,
        "Restarted sync at #{} ({:?})",
        self.best_queued_number,
        self.best_queued_hash,
    );
    Ok(())
}
1625
1626 fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1628 if self.queue_blocks.contains(hash) {
1629 return Ok(BlockStatus::Queued);
1630 }
1631 self.client.block_status(*hash)
1632 }
1633
1634 fn is_known(&self, hash: &B::Hash) -> bool {
1636 self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1637 }
1638
1639 fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1641 self.peers
1642 .iter()
1643 .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1644 }
1645
1646 fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1648 self.blocks
1649 .ready_blocks(self.best_queued_number + One::one())
1650 .into_iter()
1651 .map(|block_data| {
1652 let justifications = block_data
1653 .block
1654 .justifications
1655 .or_else(|| legacy_justification_mapping(block_data.block.justification));
1656 IncomingBlock {
1657 hash: block_data.block.hash,
1658 header: block_data.block.header,
1659 body: block_data.block.body,
1660 indexed_body: block_data.block.indexed_body,
1661 justifications,
1662 origin: block_data.origin,
1663 allow_missing_state: true,
1664 import_existing: self.import_existing,
1665 skip_execution: self.skip_execution(),
1666 state: None,
1667 }
1668 })
1669 .collect()
1670 }
1671
1672 fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1674 let peers = &mut self.peers;
1675 let mut matcher = self.extra_justifications.matcher();
1676 std::iter::from_fn(move || {
1677 if let Some((peer, request)) = matcher.next(peers) {
1678 peers
1679 .get_mut(&peer)
1680 .expect(
1681 "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1682 )
1683 .state = PeerSyncState::DownloadingJustification(request.0);
1684 let req = BlockRequest::<B> {
1685 id: 0,
1686 fields: BlockAttributes::JUSTIFICATION,
1687 from: FromBlock::Hash(request.0),
1688 direction: Direction::Ascending,
1689 max: Some(1),
1690 };
1691 Some((peer, req))
1692 } else {
1693 None
1694 }
1695 })
1696 .collect()
1697 }
1698
/// Build the block requests that can be sent right now, at most one per peer.
///
/// Returns nothing when no peer is allowed to receive a request, when a state sync is
/// in progress, or when more than `MAX_IMPORTING_BLOCKS` blocks are already queued for
/// import.
///
/// A peer is considered only if it is available, is in `allowed_requests` and is
/// accepted by `disconnected_peers`. For such a peer the first applicable option is
/// chosen, in this order:
///
/// 1. start an ancestor search, if the peer's common number is too far behind;
/// 2. download new blocks (`peer_block_request`);
/// 3. download a fork target (`fork_sync_request`);
/// 4. download gap blocks (`peer_gap_block_request`), if a gap sync is active.
///
/// The peer's state is updated to reflect the request issued. If at least one request
/// was produced, `allowed_requests` is taken (reset).
fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
    if self.allowed_requests.is_empty() || self.state_sync.is_some() {
        return Vec::new();
    }

    if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
        trace!(target: LOG_TARGET, "Too many blocks in the queue.");
        return Vec::new();
    }
    let is_major_syncing = self.status().state.is_major_syncing();
    let attrs = self.required_block_attributes();
    // The fields are borrowed individually so that the closure below can use them
    // while `self.peers` is mutably iterated.
    let blocks = &mut self.blocks;
    let fork_targets = &mut self.fork_targets;
    // Capped at our best queued number.
    let last_finalized =
        std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
    let best_queued = self.best_queued_number;
    let client = &self.client;
    let queue_blocks = &self.queue_blocks;
    let allowed_requests = self.allowed_requests.clone();
    // During major sync a range is downloaded from a single peer only.
    let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
    let max_blocks_per_request = self.max_blocks_per_request;
    let gap_sync = &mut self.gap_sync;
    let disconnected_peers = &mut self.disconnected_peers;
    let metrics = self.metrics.as_ref();
    let requests = self
        .peers
        .iter_mut()
        .filter_map(move |(&id, peer)| {
            if !peer.state.is_available() ||
                !allowed_requests.contains(&id) ||
                !disconnected_peers.is_peer_available(&id)
            {
                return None;
            }

            // Start an ancestor search when all of the following hold: the common
            // number is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` behind our best queued
            // block, the peer is ahead of us, the common number is below the finalized
            // block, and the import queue holds at most `MAJOR_SYNC_BLOCKS` blocks.
            if best_queued.saturating_sub(peer.common_number) >
                MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
                best_queued < peer.best_number &&
                peer.common_number < last_finalized &&
                queue_blocks.len() <= MAJOR_SYNC_BLOCKS.into()
            {
                trace!(
                    target: LOG_TARGET,
                    "Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
                    id,
                    peer.common_number,
                    best_queued,
                );
                let current = std::cmp::min(peer.best_number, best_queued);
                peer.state = PeerSyncState::AncestorSearch {
                    current,
                    start: best_queued,
                    state: AncestorSearchState::ExponentialBackoff(One::one()),
                };
                Some((id, ancestry_request::<B>(current)))
            } else if let Some((range, req)) = peer_block_request(
                &id,
                peer,
                blocks,
                attrs,
                max_parallel,
                max_blocks_per_request,
                last_finalized,
                best_queued,
            ) {
                peer.state = PeerSyncState::DownloadingNew(range.start);
                trace!(
                    target: LOG_TARGET,
                    "New block request for {}, (best:{}, common:{}) {:?}",
                    id,
                    peer.best_number,
                    peer.common_number,
                    req,
                );
                Some((id, req))
            } else if let Some((hash, req)) = fork_sync_request(
                &id,
                fork_targets,
                best_queued,
                last_finalized,
                attrs,
                // Same lookup as `Self::block_status`, but a client error is treated as
                // `Unknown`.
                |hash| {
                    if queue_blocks.contains(hash) {
                        BlockStatus::Queued
                    } else {
                        client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
                    }
                },
                max_blocks_per_request,
                metrics,
            ) {
                trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
                peer.state = PeerSyncState::DownloadingStale(hash);
                Some((id, req))
            } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
                peer_gap_block_request(
                    &id,
                    peer,
                    &mut sync.blocks,
                    attrs,
                    sync.target,
                    sync.best_queued_number,
                    max_blocks_per_request,
                )
            }) {
                peer.state = PeerSyncState::DownloadingGap(range.start);
                trace!(
                    target: LOG_TARGET,
                    "New gap block request for {}, (best:{}, common:{}) {:?}",
                    id,
                    peer.best_number,
                    peer.common_number,
                    req,
                );
                Some((id, req))
            } else {
                None
            }
        })
        .collect::<Vec<_>>();

    // Reset the allowed set only if something was actually requested.
    if !requests.is_empty() {
        self.allowed_requests.take();
    }

    requests
}
1834
1835 fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
1837 if self.allowed_requests.is_empty() {
1838 return None;
1839 }
1840 if self.state_sync.is_some() &&
1841 self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
1842 {
1843 return None;
1845 }
1846 if let Some(sync) = &self.state_sync {
1847 if sync.is_complete() {
1848 return None;
1849 }
1850
1851 for (id, peer) in self.peers.iter_mut() {
1852 if peer.state.is_available() &&
1853 peer.common_number >= sync.target_number() &&
1854 self.disconnected_peers.is_peer_available(&id)
1855 {
1856 peer.state = PeerSyncState::DownloadingState;
1857 let request = sync.next_request();
1858 trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
1859 self.allowed_requests.clear();
1860 return Some((*id, OpaqueStateRequest(Box::new(request))));
1861 }
1862 }
1863 }
1864 None
1865 }
1866
1867 #[must_use]
1868 fn on_state_data(
1869 &mut self,
1870 peer_id: &PeerId,
1871 response: OpaqueStateResponse,
1872 ) -> Result<(), BadPeer> {
1873 let response: Box<StateResponse> = response.0.downcast().map_err(|_error| {
1874 error!(
1875 target: LOG_TARGET,
1876 "Failed to downcast opaque state response, this is an implementation bug."
1877 );
1878
1879 BadPeer(*peer_id, rep::BAD_RESPONSE)
1880 })?;
1881
1882 if let Some(peer) = self.peers.get_mut(peer_id) {
1883 if let PeerSyncState::DownloadingState = peer.state {
1884 peer.state = PeerSyncState::Available;
1885 self.allowed_requests.set_all();
1886 }
1887 }
1888 let import_result = if let Some(sync) = &mut self.state_sync {
1889 debug!(
1890 target: LOG_TARGET,
1891 "Importing state data from {} with {} keys, {} proof nodes.",
1892 peer_id,
1893 response.entries.len(),
1894 response.proof.len(),
1895 );
1896 sync.import(*response)
1897 } else {
1898 debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
1899 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1900 };
1901
1902 match import_result {
1903 ImportResult::Import(hash, header, state, body, justifications) => {
1904 let origin = BlockOrigin::NetworkInitialSync;
1905 let block = IncomingBlock {
1906 hash,
1907 header: Some(header),
1908 body,
1909 indexed_body: None,
1910 justifications,
1911 origin: None,
1912 allow_missing_state: true,
1913 import_existing: true,
1914 skip_execution: self.skip_execution(),
1915 state: Some(state),
1916 };
1917 debug!(target: LOG_TARGET, "State download is complete. Import is queued");
1918 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
1919 Ok(())
1920 },
1921 ImportResult::Continue => Ok(()),
1922 ImportResult::BadResponse => {
1923 debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
1924 Err(BadPeer(*peer_id, rep::BAD_BLOCK))
1925 },
1926 }
1927 }
1928
1929 fn attempt_state_sync(
1930 &mut self,
1931 finalized_hash: B::Hash,
1932 finalized_number: NumberFor<B>,
1933 skip_proofs: bool,
1934 ) {
1935 let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
1936 heads.sort();
1937 let median = heads[heads.len() / 2];
1938 if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
1939 if let Ok(Some(header)) = self.client.header(finalized_hash) {
1940 log::debug!(
1941 target: LOG_TARGET,
1942 "Starting state sync for #{finalized_number} ({finalized_hash})",
1943 );
1944 self.state_sync =
1945 Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
1946 self.allowed_requests.set_all();
1947 } else {
1948 log::error!(
1949 target: LOG_TARGET,
1950 "Failed to start state sync: header for finalized block \
1951 #{finalized_number} ({finalized_hash}) is not available",
1952 );
1953 debug_assert!(false);
1954 }
1955 }
1956 }
1957
/// Test helper: drain all pending actions, leaving the internal list empty.
#[cfg(test)]
#[must_use]
fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
    let pending = std::mem::take(&mut self.actions);
    pending.into_iter()
}
1964}
1965
1966fn legacy_justification_mapping(
1971 justification: Option<EncodedJustification>,
1972) -> Option<Justifications> {
1973 justification.map(|just| (*b"FRNK", just).into())
1974}
1975
1976fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
1979 BlockRequest::<B> {
1980 id: 0,
1981 fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
1982 from: FromBlock::Number(block),
1983 direction: Direction::Ascending,
1984 max: Some(1),
1985 }
1986}
1987
/// Phase of the search for the highest block shared with a peer, as driven by
/// `handle_ancestor_search_state`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum AncestorSearchState<B: BlockT> {
    /// Walking backwards from the starting block. The payload is the distance to step
    /// back on the next miss; it is doubled after every miss.
    ExponentialBackoff(NumberFor<B>),
    /// Bisecting between two block numbers. The payload is `(left, right)`: `left` is
    /// moved up on a hash match and `right` is moved down on a mismatch.
    BinarySearch(NumberFor<B>, NumberFor<B>),
}
1999
/// Advance the ancestor search by one step.
///
/// `curr_block_num` is the block number that was just queried and `block_hash_match`
/// tells whether the peer's hash for it equals ours.
///
/// Returns `Some((next_state, next_block_number_to_query))` while the search continues
/// and `None` once it has finished.
///
/// # Panics
///
/// Panics in the binary-search phase if, after applying the latest result, the right
/// bound is below the left bound.
fn handle_ancestor_search_state<B: BlockT>(
    state: &AncestorSearchState<B>,
    curr_block_num: NumberFor<B>,
    block_hash_match: bool,
) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
    let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
    match state {
        AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
            let next_distance_to_tip = *next_distance_to_tip;
            if block_hash_match && next_distance_to_tip == One::one() {
                // The very first queried block already matches: nothing to search.
                return None;
            }
            if block_hash_match {
                // A match was found after stepping back: bisect between this block and
                // a point half of the pending step distance above it.
                let left = curr_block_num;
                let right = left + next_distance_to_tip / two;
                let middle = left + (right - left) / two;
                Some((AncestorSearchState::BinarySearch(left, right), middle))
            } else {
                // No match: step back by the current distance (clamped at zero) and
                // double the distance for the next miss.
                let next_block_num =
                    curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
                let next_distance_to_tip = next_distance_to_tip * two;
                Some((
                    AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
                    next_block_num,
                ))
            }
        },
        AncestorSearchState::BinarySearch(mut left, mut right) => {
            // The queried block is not above the left bound: the search is over.
            if left >= curr_block_num {
                return None;
            }
            // Narrow the interval from the side indicated by the result.
            if block_hash_match {
                left = curr_block_num;
            } else {
                right = curr_block_num;
            }
            assert!(right >= left);
            let middle = left + (right - left) / two;
            if middle == curr_block_num {
                // The next probe would repeat the current one: the search is over.
                None
            } else {
                Some((AncestorSearchState::BinarySearch(left, right), middle))
            }
        },
    }
}
2055
2056fn peer_block_request<B: BlockT>(
2058 id: &PeerId,
2059 peer: &PeerSync<B>,
2060 blocks: &mut BlockCollection<B>,
2061 attrs: BlockAttributes,
2062 max_parallel_downloads: u32,
2063 max_blocks_per_request: u32,
2064 finalized: NumberFor<B>,
2065 best_num: NumberFor<B>,
2066) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2067 if best_num >= peer.best_number {
2068 return None;
2070 } else if peer.common_number < finalized {
2071 trace!(
2072 target: LOG_TARGET,
2073 "Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2074 id, peer.common_number, finalized, peer.best_number, best_num,
2075 );
2076 }
2077 let range = blocks.needed_blocks(
2078 *id,
2079 max_blocks_per_request,
2080 peer.best_number,
2081 peer.common_number,
2082 max_parallel_downloads,
2083 MAX_DOWNLOAD_AHEAD,
2084 )?;
2085
2086 let last = range.end.saturating_sub(One::one());
2088
2089 let from = if peer.best_number == last {
2090 FromBlock::Hash(peer.best_hash)
2091 } else {
2092 FromBlock::Number(last)
2093 };
2094
2095 let request = BlockRequest::<B> {
2096 id: 0,
2097 fields: attrs,
2098 from,
2099 direction: Direction::Descending,
2100 max: Some((range.end - range.start).saturated_into::<u32>()),
2101 };
2102
2103 Some((range, request))
2104}
2105
2106fn peer_gap_block_request<B: BlockT>(
2108 id: &PeerId,
2109 peer: &PeerSync<B>,
2110 blocks: &mut BlockCollection<B>,
2111 attrs: BlockAttributes,
2112 target: NumberFor<B>,
2113 common_number: NumberFor<B>,
2114 max_blocks_per_request: u32,
2115) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2116 let range = blocks.needed_blocks(
2117 *id,
2118 max_blocks_per_request,
2119 std::cmp::min(peer.best_number, target),
2120 common_number,
2121 1,
2122 MAX_DOWNLOAD_AHEAD,
2123 )?;
2124
2125 let last = range.end.saturating_sub(One::one());
2127 let from = FromBlock::Number(last);
2128
2129 let request = BlockRequest::<B> {
2130 id: 0,
2131 fields: attrs,
2132 from,
2133 direction: Direction::Descending,
2134 max: Some((range.end - range.start).saturated_into::<u32>()),
2135 };
2136 Some((range, request))
2137}
2138
/// Build a request for a fork target that peer `id` can provide.
///
/// First prunes `fork_targets`: entries at or below `finalized` and entries whose block
/// status (per `check_block`) is no longer `Unknown` are removed, and the fork-target
/// metric is set to the remaining count.
///
/// Then the first remaining target that lists `id` among its peers and is either at or
/// below `best_num`, or less than `max_blocks_per_request` blocks above it, is
/// selected. The request is descending from the target hash. It asks for a single block
/// when the target's parent is known, and for `number - finalized` blocks otherwise.
///
/// Returns the target hash and the request, or `None` when no target qualifies.
fn fork_sync_request<B: BlockT>(
    id: &PeerId,
    fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
    best_num: NumberFor<B>,
    finalized: NumberFor<B>,
    attributes: BlockAttributes,
    check_block: impl Fn(&B::Hash) -> BlockStatus,
    max_blocks_per_request: u32,
    metrics: Option<&Metrics>,
) -> Option<(B::Hash, BlockRequest<B>)> {
    fork_targets.retain(|hash, r| {
        if r.number <= finalized {
            trace!(
                target: LOG_TARGET,
                "Removed expired fork sync request {:?} (#{})",
                hash,
                r.number,
            );
            return false;
        }
        if check_block(hash) != BlockStatus::Unknown {
            trace!(
                target: LOG_TARGET,
                "Removed obsolete fork sync request {:?} (#{})",
                hash,
                r.number,
            );
            return false;
        }
        true
    });
    if let Some(metrics) = metrics {
        metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
    }
    for (hash, r) in fork_targets {
        if !r.peers.contains(&id) {
            continue;
        }
        // Only targets that are not too far ahead of our best block are downloaded.
        if r.number <= best_num ||
            (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
        {
            // `check_block` is moved here; that is fine because this branch always
            // returns.
            let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
            let count = if parent_status == BlockStatus::Unknown {
                // Parent unknown (or not recorded): request everything down to the
                // finalized block.
                (r.number - finalized).saturated_into::<u32>()
            } else {
                // Parent is known: only the fork head itself is needed.
                1
            };
            trace!(
                target: LOG_TARGET,
                "Downloading requested fork {hash:?} from {id}, {count} blocks",
            );
            return Some((
                *hash,
                BlockRequest::<B> {
                    id: 0,
                    fields: attributes,
                    from: FromBlock::Hash(*hash),
                    direction: Direction::Descending,
                    max: Some(count),
                },
            ));
        } else {
            trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
        }
    }
    None
}
2210
2211fn is_descendent_of<Block, T>(
2213 client: &T,
2214 base: &Block::Hash,
2215 block: &Block::Hash,
2216) -> sp_blockchain::Result<bool>
2217where
2218 Block: BlockT,
2219 T: HeaderMetadata<Block, Error = sp_blockchain::Error> + ?Sized,
2220{
2221 if base == block {
2222 return Ok(false);
2223 }
2224
2225 let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?;
2226
2227 Ok(ancestor.hash == *base)
2228}
2229
/// Validate a block response received from `peer_id`.
///
/// When the originating `request` is given, the response is checked against it:
///
/// - it must not contain more blocks than `request.max`;
/// - the block at the start of the request (the last block for a descending request,
///   the first otherwise) must carry a header matching `request.from`;
/// - every block must have a header if `HEADER` was requested, and a body if `BODY` was
///   requested.
///
/// Independently of `request`, every block with a header must have a `hash` equal to
/// the header hash, and every block with both header and body must have an extrinsics
/// root matching the body.
///
/// Returns the number of the first block in the response, or `None` if the response is
/// empty or its first block has no header.
///
/// # Errors
///
/// - `rep::NOT_REQUESTED` for too many blocks or an unexpected starting block;
/// - `rep::BAD_RESPONSE` for a missing requested header or body;
/// - `rep::BAD_BLOCK` for a header hash or extrinsics root mismatch.
pub fn validate_blocks<Block: BlockT>(
    blocks: &Vec<BlockData<Block>>,
    peer_id: &PeerId,
    request: Option<BlockRequest<Block>>,
) -> Result<Option<NumberFor<Block>>, BadPeer> {
    if let Some(request) = request {
        // NOTE(review): `Option` orders `None` below every `Some`, so a request with
        // `max == None` fails this check for any response — confirm that requests
        // always carry `max`.
        if Some(blocks.len() as _) > request.max {
            debug!(
                target: LOG_TARGET,
                "Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
                peer_id,
                request.max,
                blocks.len(),
            );

            return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
        }

        // A descending response has the requested start block at its end.
        let block_header =
            if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
                .and_then(|b| b.header.as_ref());

        // An empty response, or a start block without a header, is also rejected here.
        let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
            FromBlock::Hash(hash) => h.hash() == hash,
            FromBlock::Number(n) => h.number() == &n,
        });

        if !expected_block {
            debug!(
                target: LOG_TARGET,
                "Received block that was not requested. Requested {:?}, got {:?}.",
                request.from,
                block_header,
            );

            return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
        }

        if request.fields.contains(BlockAttributes::HEADER) &&
            blocks.iter().any(|b| b.header.is_none())
        {
            trace!(
                target: LOG_TARGET,
                "Missing requested header for a block in response from {peer_id}.",
            );

            return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
        }

        if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
        {
            trace!(
                target: LOG_TARGET,
                "Missing requested body for a block in response from {peer_id}.",
            );

            return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
        }
    }

    for b in blocks {
        // The advertised hash must be the hash of the supplied header.
        if let Some(header) = &b.header {
            let hash = header.hash();
            if hash != b.hash {
                debug!(
                    target: LOG_TARGET,
                    "Bad header received from {}. Expected hash {:?}, got {:?}",
                    peer_id,
                    b.hash,
                    hash,
                );
                return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
            }
        }
        // The body must hash to the extrinsics root committed to in the header.
        if let (Some(header), Some(body)) = (&b.header, &b.body) {
            let expected = *header.extrinsics_root();
            let got = HashingFor::<Block>::ordered_trie_root(
                body.iter().map(Encode::encode).collect(),
                sp_runtime::StateVersion::V0,
            );
            if expected != got {
                debug!(
                    target: LOG_TARGET,
                    "Bad extrinsic root for a block {} received from {}. Expected {:?}, got {:?}",
                    b.hash,
                    peer_id,
                    expected,
                    got,
                );
                return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
            }
        }
    }

    Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
}