use crate::{
	block_relay_protocol::{BlockDownloader, BlockResponseError},
	blocks::BlockCollection,
	justification_requests::ExtraRequests,
	schema::v1::{StateRequest, StateResponse},
	service::network::NetworkServiceHandle,
	strategy::{
		disconnected_peers::DisconnectedPeers,
		state_sync::{ImportResult, StateSync, StateSyncProvider},
		warp::{WarpSyncPhase, WarpSyncProgress},
		StrategyKey, SyncingAction, SyncingStrategy,
	},
	types::{BadPeer, SyncState, SyncStatus},
	LOG_TARGET,
};

use futures::{channel::oneshot, FutureExt};
use log::{debug, error, info, trace, warn};
use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
use prost::Message;
use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sc_network::{IfDisconnected, ProtocolName};
use sc_network_common::sync::message::{
	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
};
use sc_network_types::PeerId;
use sp_arithmetic::traits::Saturating;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
	traits::{
		Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
	},
	EncodedJustification, Justifications,
};

use std::{
	any::Any,
	collections::{HashMap, HashSet},
	ops::Range,
	sync::Arc,
};

#[cfg(test)]
mod test;

/// Maximum number of blocks waiting in the import queue before new block requests are paused.
const MAX_IMPORTING_BLOCKS: usize = 2048;

/// Maximum number of blocks to download ahead of any gap in the downloaded ranges.
const MAX_DOWNLOAD_AHEAD: u32 = 2048;

/// If a peer's common number lags more than this far behind our best queued block, an ancestry
/// search is started instead of a regular block request.
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;

/// State sync is only attempted once the finalized block is within this many blocks of the
/// peers' median best block.
const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;

/// If the sync target is more than this many blocks ahead of our best block, the node is
/// considered to be major syncing.
const MAJOR_SYNC_BLOCKS: u8 = 5;

mod rep {
	use sc_network::ReputationChange as Rep;

	pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");

	pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");

	pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");

	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

	pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");

	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

	pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");

	pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");

	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");

	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}

struct Metrics {
	queued_blocks: Gauge<U64>,
	fork_targets: Gauge<U64>,
}

impl Metrics {
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		Ok(Self {
			queued_blocks: {
				let g =
					Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
				register(g, r)?
			},
			fork_targets: {
				let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
				register(g, r)?
			},
		})
	}
}

#[derive(Debug, Clone)]
enum AllowedRequests {
	Some(HashSet<PeerId>),
	All,
}

impl AllowedRequests {
	fn add(&mut self, id: &PeerId) {
		if let Self::Some(ref mut set) = self {
			set.insert(*id);
		}
	}

	fn take(&mut self) -> Self {
		std::mem::take(self)
	}

	fn set_all(&mut self) {
		*self = Self::All;
	}

	fn contains(&self, id: &PeerId) -> bool {
		match self {
			Self::Some(set) => set.contains(id),
			Self::All => true,
		}
	}

	fn is_empty(&self) -> bool {
		match self {
			Self::Some(set) => set.is_empty(),
			Self::All => false,
		}
	}

	fn clear(&mut self) {
		std::mem::take(self);
	}
}

impl Default for AllowedRequests {
	fn default() -> Self {
		Self::Some(HashSet::default())
	}
}

/// A gap in the chain of blocks (block history) that is being downloaded, as reported by the
/// client via `info.block_gap`.
struct GapSync<B: BlockT> {
	blocks: BlockCollection<B>,
	best_queued_number: NumberFor<B>,
	target: NumberFor<B>,
}

/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ChainSyncMode {
	/// Full block download and verification.
	Full,
	/// Download blocks without executing them and download the state of a recently finalized
	/// block instead.
	LightState {
		/// Skip state proof download and verification.
		skip_proofs: bool,
		/// Download block bodies as indexed transactions (`INDEXED_BODY`) instead of full bodies.
		storage_chain_mode: bool,
	},
}

/// All the data we have about a peer that we are trying to sync with.
#[derive(Debug, Clone)]
pub(crate) struct PeerSync<B: BlockT> {
	/// Peer id of this peer.
	pub peer_id: PeerId,
	/// The number of the best block that we know is common to us and this peer.
	pub common_number: NumberFor<B>,
	/// The hash of the best block we have seen announced by this peer.
	pub best_hash: B::Hash,
	/// The number of the best block we have seen announced by this peer.
	pub best_number: NumberFor<B>,
	/// The state of syncing this peer is in for us.
	pub state: PeerSyncState<B>,
}

impl<B: BlockT> PeerSync<B> {
	/// Update the common number of the peer if the new value is higher.
	fn update_common_number(&mut self, new_common: NumberFor<B>) {
		if self.common_number < new_common {
			trace!(
				target: LOG_TARGET,
				"Updating peer {} common number from={} => to={}.",
				self.peer_id,
				self.common_number,
				new_common,
			);
			self.common_number = new_common;
		}
	}
}

/// A fork block announced by peers that we want to download.
struct ForkTarget<B: BlockT> {
	number: NumberFor<B>,
	parent_hash: Option<B::Hash>,
	peers: HashSet<PeerId>,
}

/// The state of syncing between a peer and ourselves.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum PeerSyncState<B: BlockT> {
	/// Available for sync requests.
	Available,
	/// Searching for ancestors that the peer has in common with us.
	AncestorSearch { start: NumberFor<B>, current: NumberFor<B>, state: AncestorSearchState<B> },
	/// Downloading new blocks, starting from the given number.
	DownloadingNew(NumberFor<B>),
	/// Downloading blocks of a fork target with the given hash.
	DownloadingStale(B::Hash),
	/// Downloading a justification for the given block hash.
	DownloadingJustification(B::Hash),
	/// Downloading the state.
	DownloadingState,
	/// Downloading block history (gap sync), starting from the given number.
	DownloadingGap(NumberFor<B>),
}

impl<B: BlockT> PeerSyncState<B> {
	pub fn is_available(&self) -> bool {
		matches!(self, Self::Available)
	}
}

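/// The main data structure that holds the state of the chain syncing strategy: connected peers,
/// queued and downloaded blocks, fork targets, optional state/gap sync and pending actions.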
pub struct ChainSync<B: BlockT, Client> {
	client: Arc<Client>,
	peers: HashMap<PeerId, PeerSync<B>>,
	disconnected_peers: DisconnectedPeers,
	blocks: BlockCollection<B>,
	best_queued_number: NumberFor<B>,
	best_queued_hash: B::Hash,
	mode: ChainSyncMode,
	extra_justifications: ExtraRequests<B>,
	queue_blocks: HashSet<B::Hash>,
	pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
	fork_targets: HashMap<B::Hash, ForkTarget<B>>,
	allowed_requests: AllowedRequests,
	max_parallel_downloads: u32,
	max_blocks_per_request: u32,
	state_request_protocol_name: ProtocolName,
	downloaded_blocks: usize,
	state_sync: Option<StateSync<B, Client>>,
	import_existing: bool,
	block_downloader: Arc<dyn BlockDownloader<B>>,
	gap_sync: Option<GapSync<B>>,
	actions: Vec<SyncingAction<B>>,
	metrics: Option<Metrics>,
}

impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
where
	B: BlockT,
	Client: HeaderBackend<B>
		+ BlockBackend<B>
		+ HeaderMetadata<B, Error = sp_blockchain::Error>
		+ ProofProvider<B>
		+ Send
		+ Sync
		+ 'static,
{
	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
		match self.add_peer_inner(peer_id, best_hash, best_number) {
			Ok(Some(request)) => {
				let action = self.create_block_request_action(peer_id, request);
				self.actions.push(action);
			},
			Ok(None) => {},
			Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
		}
	}

	fn remove_peer(&mut self, peer_id: &PeerId) {
		self.blocks.clear_peer_download(peer_id);
		if let Some(gap_sync) = &mut self.gap_sync {
			gap_sync.blocks.clear_peer_download(peer_id)
		}

		if let Some(state) = self.peers.remove(peer_id) {
			if !state.state.is_available() {
				if let Some(bad_peer) =
					self.disconnected_peers.on_disconnect_during_request(*peer_id)
				{
					self.actions.push(SyncingAction::DropPeer(bad_peer));
				}
			}
		}

		self.extra_justifications.peer_disconnected(peer_id);
		self.allowed_requests.set_all();
		self.fork_targets.retain(|_, target| {
			target.peers.remove(peer_id);
			!target.peers.is_empty()
		});
		if let Some(metrics) = &self.metrics {
			metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
		}

		let blocks = self.ready_blocks();

		if !blocks.is_empty() {
			self.validate_and_queue_blocks(blocks, false);
		}
	}

	fn on_validated_block_announce(
		&mut self,
		is_best: bool,
		peer_id: PeerId,
		announce: &BlockAnnounce<B::Header>,
	) -> Option<(B::Hash, NumberFor<B>)> {
		let number = *announce.header.number();
		let hash = announce.header.hash();
		let parent_status =
			self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
		let known_parent = parent_status != BlockStatus::Unknown;
		let ancient_parent = parent_status == BlockStatus::InChainPruned;

		let known = self.is_known(&hash);
		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
			peer
		} else {
			error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
			return Some((hash, number))
		};

		if let PeerSyncState::AncestorSearch { .. } = peer.state {
			trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
			return None
		}

		let peer_info = is_best.then(|| {
			peer.best_number = number;
			peer.best_hash = hash;

			(hash, number)
		});

		if is_best {
			if known && self.best_queued_number >= number {
				self.update_peer_common_number(&peer_id, number);
			} else if announce.header.parent_hash() == &self.best_queued_hash ||
				known_parent && self.best_queued_number >= number
			{
				self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
			}
		}
		self.allowed_requests.add(&peer_id);

		if known || self.is_already_downloading(&hash) {
			trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
			if let Some(target) = self.fork_targets.get_mut(&hash) {
				target.peers.insert(peer_id);
			}
			return peer_info
		}

		if ancient_parent {
			trace!(
				target: LOG_TARGET,
				"Ignored ancient block announced from {}: {} {:?}",
				peer_id,
				hash,
				announce.header,
			);
			return peer_info
		}

		if self.status().state == SyncState::Idle {
			trace!(
				target: LOG_TARGET,
				"Added sync target for block announced from {}: {} {:?}",
				peer_id,
				hash,
				announce.summary(),
			);
			self.fork_targets
				.entry(hash)
				.or_insert_with(|| {
					if let Some(metrics) = &self.metrics {
						metrics.fork_targets.inc();
					}

					ForkTarget {
						number,
						parent_hash: Some(*announce.header.parent_hash()),
						peers: Default::default(),
					}
				})
				.peers
				.insert(peer_id);
		}

		peer_info
	}

	fn set_sync_fork_request(
		&mut self,
		mut peers: Vec<PeerId>,
		hash: &B::Hash,
		number: NumberFor<B>,
	) {
		if peers.is_empty() {
			peers = self
				.peers
				.iter()
				.filter(|(_, peer)| peer.best_number >= number)
				.map(|(id, _)| *id)
				.collect();

			debug!(
				target: LOG_TARGET,
				"Explicit sync request for block {hash:?} with no peers specified. \
				Syncing from these peers {peers:?} instead.",
			);
		} else {
			debug!(
				target: LOG_TARGET,
				"Explicit sync request for block {hash:?} with {peers:?}",
			);
		}

		if self.is_known(hash) {
			debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
			return
		}

		trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
		for peer_id in &peers {
			if let Some(peer) = self.peers.get_mut(peer_id) {
				if let PeerSyncState::AncestorSearch { .. } = peer.state {
					continue
				}

				if number > peer.best_number {
					peer.best_number = number;
					peer.best_hash = *hash;
				}
				self.allowed_requests.add(peer_id);
			}
		}

		self.fork_targets
			.entry(*hash)
			.or_insert_with(|| {
				if let Some(metrics) = &self.metrics {
					metrics.fork_targets.inc();
				}

				ForkTarget { number, peers: Default::default(), parent_hash: None }
			})
			.peers
			.extend(peers);
	}

	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		let client = &self.client;
		self.extra_justifications
			.schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
	}

	fn clear_justification_requests(&mut self) {
		self.extra_justifications.reset();
	}

	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
		let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
		self.extra_justifications
			.try_finalize_root((hash, number), finalization_result, true);
		self.allowed_requests.set_all();
	}

	fn on_generic_response(
		&mut self,
		peer_id: &PeerId,
		key: StrategyKey,
		protocol_name: ProtocolName,
		response: Box<dyn Any + Send>,
	) {
		if Self::STRATEGY_KEY != key {
			warn!(
				target: LOG_TARGET,
				"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
			);
			debug_assert!(false);
			return;
		}

		if protocol_name == self.state_request_protocol_name {
			let Ok(response) = response.downcast::<Vec<u8>>() else {
				warn!(target: LOG_TARGET, "Failed to downcast state response");
				debug_assert!(false);
				return;
			};

			if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
				self.actions.push(SyncingAction::DropPeer(bad_peer));
			}
		} else if &protocol_name == self.block_downloader.protocol_name() {
			let Ok(response) = response
				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
			else {
				warn!(target: LOG_TARGET, "Failed to downcast block response");
				debug_assert!(false);
				return;
			};

			let (request, response) = *response;
			let blocks = match response {
				Ok(blocks) => blocks,
				Err(BlockResponseError::DecodeFailed(e)) => {
					debug!(
						target: LOG_TARGET,
						"Failed to decode block response from peer {:?}: {:?}.",
						peer_id,
						e
					);
					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
					return;
				},
				Err(BlockResponseError::ExtractionFailed(e)) => {
					debug!(
						target: LOG_TARGET,
						"Failed to extract blocks from peer response {:?}: {:?}.",
						peer_id,
						e
					);
					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
					return;
				},
			};

			if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
				self.actions.push(SyncingAction::DropPeer(bad_peer));
			}
		} else {
			warn!(
				target: LOG_TARGET,
				"Unexpected generic response protocol {protocol_name}, strategy key \
				{key:?}",
			);
			debug_assert!(false);
		}
	}

	fn on_blocks_processed(
		&mut self,
		imported: usize,
		count: usize,
		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
	) {
		trace!(target: LOG_TARGET, "Imported {imported} of {count}");

		let mut has_error = false;
		for (_, hash) in &results {
			if self.queue_blocks.remove(hash) {
				if let Some(metrics) = &self.metrics {
					metrics.queued_blocks.dec();
				}
			}
			self.blocks.clear_queued(hash);
			if let Some(gap_sync) = &mut self.gap_sync {
				gap_sync.blocks.clear_queued(hash);
			}
		}
		for (result, hash) in results {
			if has_error {
				break
			}

			has_error |= result.is_err();

			match result {
				Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
					if let Some(peer) = peer_id {
						self.update_peer_common_number(&peer, number);
					}
					self.complete_gap_if_target(number);
				},
				Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
					if aux.clear_justification_requests {
						trace!(
							target: LOG_TARGET,
							"Block imported clears all pending justification requests {number}: {hash:?}",
						);
						self.clear_justification_requests();
					}

					if aux.needs_justification {
						trace!(
							target: LOG_TARGET,
							"Block imported but requires justification {number}: {hash:?}",
						);
						self.request_justification(&hash, number);
					}

					if aux.bad_justification {
						if let Some(ref peer) = peer_id {
							warn!("💔 Sent block with bad justification to import");
							self.actions.push(SyncingAction::DropPeer(BadPeer(
								*peer,
								rep::BAD_JUSTIFICATION,
							)));
						}
					}

					if let Some(peer) = peer_id {
						self.update_peer_common_number(&peer, number);
					}
					let state_sync_complete =
						self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
					if state_sync_complete {
						info!(
							target: LOG_TARGET,
							"State sync is complete ({} MiB), restarting block sync.",
							self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
						);
						self.state_sync = None;
						self.mode = ChainSyncMode::Full;
						self.restart();
					}

					self.complete_gap_if_target(number);
				},
				Err(BlockImportError::IncompleteHeader(peer_id)) =>
					if let Some(peer) = peer_id {
						warn!(
							target: LOG_TARGET,
							"💔 Peer sent block with incomplete header to import",
						);
						self.actions
							.push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
						self.restart();
					},
				Err(BlockImportError::VerificationFailed(peer_id, e)) => {
					let extra_message = peer_id
						.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));

					warn!(
						target: LOG_TARGET,
						"💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
					);

					if let Some(peer) = peer_id {
						self.actions
							.push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
					}

					self.restart();
				},
				Err(BlockImportError::BadBlock(peer_id)) =>
					if let Some(peer) = peer_id {
						warn!(
							target: LOG_TARGET,
							"💔 Block {hash:?} received from peer {peer} has been blacklisted",
						);
						self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
					},
				Err(BlockImportError::MissingState) => {
					trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
				},
				e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
					warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
					self.state_sync = None;
					self.restart();
				},
				Err(BlockImportError::Cancelled) => {},
			};
		}

		self.allowed_requests.set_all();
	}

	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		let client = &self.client;
		let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
			is_descendent_of(&**client, base, block)
		});

		if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
			if self.state_sync.is_none() {
				if !self.peers.is_empty() && self.queue_blocks.is_empty() {
					self.attempt_state_sync(*hash, number, *skip_proofs);
				} else {
					self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
				}
			}
		}

		if let Err(err) = r {
			warn!(
				target: LOG_TARGET,
				"💔 Error cleaning up pending extra justification data requests: {err}",
			);
		}
	}

	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
		self.on_block_queued(best_hash, best_number);
	}

	fn is_major_syncing(&self) -> bool {
		self.status().state.is_major_syncing()
	}

	fn num_peers(&self) -> usize {
		self.peers.len()
	}

	fn status(&self) -> SyncStatus<B> {
		let median_seen = self.median_seen();
		let best_seen_block =
			median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
		let sync_state = if let Some(target) = median_seen {
			let best_block = self.client.info().best_number;
			if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
				if target > self.best_queued_number {
					SyncState::Downloading { target }
				} else {
					SyncState::Importing { target }
				}
			} else {
				SyncState::Idle
			}
		} else {
			SyncState::Idle
		};

		let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
			phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
			total_bytes: 0,
			status: None,
		});

		SyncStatus {
			state: sync_state,
			best_seen_block,
			num_peers: self.peers.len() as u32,
			queued_blocks: self.queue_blocks.len() as u32,
			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
			warp_sync: warp_sync_progress,
		}
	}

	fn num_downloaded_blocks(&self) -> usize {
		self.downloaded_blocks
	}

	fn num_sync_requests(&self) -> usize {
		self.fork_targets
			.values()
			.filter(|f| f.number <= self.best_queued_number)
			.count()
	}

	fn actions(
		&mut self,
		network_service: &NetworkServiceHandle,
	) -> Result<Vec<SyncingAction<B>>, ClientError> {
		if !self.peers.is_empty() && self.queue_blocks.is_empty() {
			if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
				self.attempt_state_sync(hash, number, skip_proofs);
			}
		}

		let block_requests = self
			.block_requests()
			.into_iter()
			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
			.collect::<Vec<_>>();
		self.actions.extend(block_requests);

		let justification_requests = self
			.justification_requests()
			.into_iter()
			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
			.collect::<Vec<_>>();
		self.actions.extend(justification_requests);

		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
			trace!(
				target: LOG_TARGET,
				"Created `StrategyRequest` to {peer_id}.",
			);

			let (tx, rx) = oneshot::channel();

			network_service.start_request(
				peer_id,
				self.state_request_protocol_name.clone(),
				request.encode_to_vec(),
				tx,
				IfDisconnected::ImmediateError,
			);

			SyncingAction::StartRequest {
				peer_id,
				key: Self::STRATEGY_KEY,
				request: async move {
					Ok(rx.await?.and_then(|(response, protocol_name)| {
						Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
					}))
				}
				.boxed(),
				remove_obsolete: false,
			}
		});
		self.actions.extend(state_request);

		Ok(std::mem::take(&mut self.actions))
	}
}

impl<B, Client> ChainSync<B, Client>
where
	B: BlockT,
	Client: HeaderBackend<B>
		+ BlockBackend<B>
		+ HeaderMetadata<B, Error = sp_blockchain::Error>
		+ ProofProvider<B>
		+ Send
		+ Sync
		+ 'static,
{
	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");

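	/// Create a new syncing strategy instance and register the initial peers.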
	pub fn new(
		mode: ChainSyncMode,
		client: Arc<Client>,
		max_parallel_downloads: u32,
		max_blocks_per_request: u32,
		state_request_protocol_name: ProtocolName,
		block_downloader: Arc<dyn BlockDownloader<B>>,
		metrics_registry: Option<&Registry>,
		initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
	) -> Result<Self, ClientError> {
		let mut sync = Self {
			client,
			peers: HashMap::new(),
			disconnected_peers: DisconnectedPeers::new(),
			blocks: BlockCollection::new(),
			best_queued_hash: Default::default(),
			best_queued_number: Zero::zero(),
			extra_justifications: ExtraRequests::new("justification", metrics_registry),
			mode,
			queue_blocks: Default::default(),
			pending_state_sync_attempt: None,
			fork_targets: Default::default(),
			allowed_requests: Default::default(),
			max_parallel_downloads,
			max_blocks_per_request,
			state_request_protocol_name,
			downloaded_blocks: 0,
			state_sync: None,
			import_existing: false,
			block_downloader,
			gap_sync: None,
			actions: Vec::new(),
			metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
				Ok(metrics) => Some(metrics),
				Err(err) => {
					log::error!(
						target: LOG_TARGET,
						"Failed to register `ChainSync` metrics {err:?}",
					);
					None
				},
			}),
		};

		sync.reset_sync_start_point()?;
		initial_peers.for_each(|(peer_id, best_hash, best_number)| {
			sync.add_peer(peer_id, best_hash, best_number);
		});

		Ok(sync)
	}

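	/// Clear the pending gap sync if the given block number is the gap target.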
	fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
		let gap_sync_complete = self.gap_sync.as_ref().map_or(false, |s| s.target == number);
		if gap_sync_complete {
			info!(
				target: LOG_TARGET,
				"Block history download is complete."
			);
			self.gap_sync = None;
		}
	}

	#[must_use]
	fn add_peer_inner(
		&mut self,
		peer_id: PeerId,
		best_hash: B::Hash,
		best_number: NumberFor<B>,
	) -> Result<Option<BlockRequest<B>>, BadPeer> {
		match self.block_status(&best_hash) {
			Err(e) => {
				debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
				Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
			},
			Ok(BlockStatus::KnownBad) => {
				info!(
					"💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
				);
				Err(BadPeer(peer_id, rep::BAD_BLOCK))
			},
			Ok(BlockStatus::Unknown) => {
				if best_number.is_zero() {
					info!(
						"💔 New peer {} with unknown genesis hash {} ({}).",
						peer_id, best_hash, best_number,
					);
					return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
				}

				if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS as usize {
					debug!(
						target: LOG_TARGET,
						"New peer {} with unknown best hash {} ({}), assuming common block.",
						peer_id,
						self.best_queued_hash,
						self.best_queued_number
					);
					self.peers.insert(
						peer_id,
						PeerSync {
							peer_id,
							common_number: self.best_queued_number,
							best_hash,
							best_number,
							state: PeerSyncState::Available,
						},
					);
					return Ok(None);
				}

				let (state, req) = if self.best_queued_number.is_zero() {
					debug!(
						target: LOG_TARGET,
						"New peer {peer_id} with best hash {best_hash} ({best_number}).",
					);

					(PeerSyncState::Available, None)
				} else {
					let common_best = std::cmp::min(self.best_queued_number, best_number);

					debug!(
						target: LOG_TARGET,
						"New peer {} with unknown best hash {} ({}), searching for common ancestor.",
						peer_id,
						best_hash,
						best_number
					);

					(
						PeerSyncState::AncestorSearch {
							current: common_best,
							start: self.best_queued_number,
							state: AncestorSearchState::ExponentialBackoff(One::one()),
						},
						Some(ancestry_request::<B>(common_best)),
					)
				};

				self.allowed_requests.add(&peer_id);
				self.peers.insert(
					peer_id,
					PeerSync {
						peer_id,
						common_number: Zero::zero(),
						best_hash,
						best_number,
						state,
					},
				);

				Ok(req)
			},
			Ok(BlockStatus::Queued) |
			Ok(BlockStatus::InChainWithState) |
			Ok(BlockStatus::InChainPruned) => {
				debug!(
					target: LOG_TARGET,
					"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
				);
				self.peers.insert(
					peer_id,
					PeerSync {
						peer_id,
						common_number: std::cmp::min(self.best_queued_number, best_number),
						best_hash,
						best_number,
						state: PeerSyncState::Available,
					},
				);
				self.allowed_requests.add(&peer_id);
				Ok(None)
			},
		}
	}

	fn create_block_request_action(
		&mut self,
		peer_id: PeerId,
		request: BlockRequest<B>,
	) -> SyncingAction<B> {
		let downloader = self.block_downloader.clone();

		SyncingAction::StartRequest {
			peer_id,
			key: Self::STRATEGY_KEY,
			request: async move {
				Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
					|(response, protocol_name)| {
						let decoded_response =
							downloader.block_response_into_blocks(&request, response);
						let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
						Ok((result, protocol_name))
					},
				))
			}
			.boxed(),
			remove_obsolete: true,
		}
	}

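	/// Handle a block response from a peer: validates the blocks against the request and either
	/// queues them for import, advances an ancestry search or reports the peer as bad.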
	#[must_use]
	fn on_block_data(
		&mut self,
		peer_id: &PeerId,
		request: Option<BlockRequest<B>>,
		response: BlockResponse<B>,
	) -> Result<(), BadPeer> {
		self.downloaded_blocks += response.blocks.len();
		let mut gap = false;
		let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
			let mut blocks = response.blocks;
			if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
				trace!(target: LOG_TARGET, "Reversing incoming block list");
				blocks.reverse()
			}
			self.allowed_requests.add(peer_id);
			if let Some(request) = request {
				match &mut peer.state {
					PeerSyncState::DownloadingNew(_) => {
						self.blocks.clear_peer_download(peer_id);
						peer.state = PeerSyncState::Available;
						if let Some(start_block) =
							validate_blocks::<B>(&blocks, peer_id, Some(request))?
						{
							self.blocks.insert(start_block, blocks, *peer_id);
						}
						self.ready_blocks()
					},
					PeerSyncState::DownloadingGap(_) => {
						peer.state = PeerSyncState::Available;
						if let Some(gap_sync) = &mut self.gap_sync {
							gap_sync.blocks.clear_peer_download(peer_id);
							if let Some(start_block) =
								validate_blocks::<B>(&blocks, peer_id, Some(request))?
							{
								gap_sync.blocks.insert(start_block, blocks, *peer_id);
							}
							gap = true;
							let blocks: Vec<_> = gap_sync
								.blocks
								.ready_blocks(gap_sync.best_queued_number + One::one())
								.into_iter()
								.map(|block_data| {
									let justifications =
										block_data.block.justifications.or_else(|| {
											legacy_justification_mapping(
												block_data.block.justification,
											)
										});
									IncomingBlock {
										hash: block_data.block.hash,
										header: block_data.block.header,
										body: block_data.block.body,
										indexed_body: block_data.block.indexed_body,
										justifications,
										origin: block_data.origin,
										allow_missing_state: true,
										import_existing: self.import_existing,
										skip_execution: true,
										state: None,
									}
								})
								.collect();
							debug!(
								target: LOG_TARGET,
								"Drained {} gap blocks from {}",
								blocks.len(),
								gap_sync.best_queued_number,
							);
							blocks
						} else {
							debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
					},
					PeerSyncState::DownloadingStale(_) => {
						peer.state = PeerSyncState::Available;
						if blocks.is_empty() {
							debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
						validate_blocks::<B>(&blocks, peer_id, Some(request))?;
						blocks
							.into_iter()
							.map(|b| {
								let justifications = b
									.justifications
									.or_else(|| legacy_justification_mapping(b.justification));
								IncomingBlock {
									hash: b.hash,
									header: b.header,
									body: b.body,
									indexed_body: None,
									justifications,
									origin: Some(*peer_id),
									allow_missing_state: true,
									import_existing: self.import_existing,
									skip_execution: self.skip_execution(),
									state: None,
								}
							})
							.collect()
					},
					PeerSyncState::AncestorSearch { current, start, state } => {
						let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
							(Some(block), Ok(maybe_our_block_hash)) => {
								trace!(
									target: LOG_TARGET,
									"Got ancestry block #{} ({}) from peer {}",
									current,
									block.hash,
									peer_id,
								);
								maybe_our_block_hash.filter(|x| x == &block.hash)
							},
							(None, _) => {
								debug!(
									target: LOG_TARGET,
									"Invalid response when searching for ancestor from {peer_id}",
								);
								return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
							},
							(_, Err(e)) => {
								info!(
									target: LOG_TARGET,
									"❌ Error answering legitimate blockchain query: {e}",
								);
								return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
							},
						};
						if matching_hash.is_some() {
							if *start < self.best_queued_number &&
								self.best_queued_number <= peer.best_number
							{
								trace!(
									target: LOG_TARGET,
									"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									self.best_queued_number,
								);
								peer.common_number = self.best_queued_number;
							} else if peer.common_number < *current {
								trace!(
									target: LOG_TARGET,
									"Ancestry search: updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									*current,
								);
								peer.common_number = *current;
							}
						}
						if matching_hash.is_none() && current.is_zero() {
							trace!(
								target: LOG_TARGET,
								"Ancestry search: genesis mismatch for peer {peer_id}",
							);
							return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
						}
						if let Some((next_state, next_num)) =
							handle_ancestor_search_state(state, *current, matching_hash.is_some())
						{
							peer.state = PeerSyncState::AncestorSearch {
								current: next_num,
								start: *start,
								state: next_state,
							};
							let request = ancestry_request::<B>(next_num);
							let action = self.create_block_request_action(*peer_id, request);
							self.actions.push(action);
							return Ok(());
						} else {
							trace!(
								target: LOG_TARGET,
								"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
								self.best_queued_hash,
								self.best_queued_number,
								peer.best_hash,
								peer.best_number,
								matching_hash,
								peer.common_number,
							);
							if peer.common_number < peer.best_number &&
								peer.best_number < self.best_queued_number
							{
								trace!(
									target: LOG_TARGET,
									"Added fork target {} for {}",
									peer.best_hash,
									peer_id,
								);
								self.fork_targets
									.entry(peer.best_hash)
									.or_insert_with(|| {
										if let Some(metrics) = &self.metrics {
											metrics.fork_targets.inc();
										}

										ForkTarget {
											number: peer.best_number,
											parent_hash: None,
											peers: Default::default(),
										}
									})
									.peers
									.insert(*peer_id);
							}
							peer.state = PeerSyncState::Available;
							return Ok(());
						}
					},
					PeerSyncState::Available |
					PeerSyncState::DownloadingJustification(..) |
					PeerSyncState::DownloadingState => Vec::new(),
				}
			} else {
				validate_blocks::<B>(&blocks, peer_id, None)?;
				blocks
					.into_iter()
					.map(|b| {
						let justifications = b
							.justifications
							.or_else(|| legacy_justification_mapping(b.justification));
						IncomingBlock {
							hash: b.hash,
							header: b.header,
							body: b.body,
							indexed_body: None,
							justifications,
							origin: Some(*peer_id),
							allow_missing_state: true,
							import_existing: false,
							skip_execution: true,
							state: None,
						}
					})
					.collect()
			}
		} else {
			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		};

		self.validate_and_queue_blocks(new_blocks, gap);

		Ok(())
	}

	fn on_block_response(
		&mut self,
		peer_id: &PeerId,
		key: StrategyKey,
		request: BlockRequest<B>,
		blocks: Vec<BlockData<B>>,
	) -> Result<(), BadPeer> {
		if key != Self::STRATEGY_KEY {
			error!(
				target: LOG_TARGET,
				"`on_block_response()` called with unexpected key {key:?} for chain sync",
			);
			debug_assert!(false);
		}
		let block_response = BlockResponse::<B> { id: request.id, blocks };

		let blocks_range = || match (
			block_response
				.blocks
				.first()
				.and_then(|b| b.header.as_ref().map(|h| h.number())),
			block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
		) {
			(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
			(Some(first), Some(_)) => format!(" ({})", first),
			_ => Default::default(),
		};
		trace!(
			target: LOG_TARGET,
			"BlockResponse {} from {} with {} blocks {}",
			block_response.id,
			peer_id,
			block_response.blocks.len(),
			blocks_range(),
		);

		if request.fields == BlockAttributes::JUSTIFICATION {
			self.on_block_justification(*peer_id, block_response)
		} else {
			self.on_block_data(peer_id, Some(request), block_response)
		}
	}

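	/// Handle a response to a justification request; forwards a valid justification for import.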
	#[must_use]
	fn on_block_justification(
		&mut self,
		peer_id: PeerId,
		response: BlockResponse<B>,
	) -> Result<(), BadPeer> {
		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
			peer
		} else {
			error!(
				target: LOG_TARGET,
				"💔 Called on_block_justification with a peer ID of an unknown peer",
			);
			return Ok(());
		};

		self.allowed_requests.add(&peer_id);
		if let PeerSyncState::DownloadingJustification(hash) = peer.state {
			peer.state = PeerSyncState::Available;

			let justification = if let Some(block) = response.blocks.into_iter().next() {
				if hash != block.hash {
					warn!(
						target: LOG_TARGET,
						"💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
						peer_id,
						hash,
						block.hash,
					);
					return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
				}

				block
					.justifications
					.or_else(|| legacy_justification_mapping(block.justification))
			} else {
				trace!(
					target: LOG_TARGET,
					"Peer {peer_id:?} provided empty response for justification request {hash:?}",
				);

				None
			};

			if let Some((peer_id, hash, number, justifications)) =
				self.extra_justifications.on_response(peer_id, justification)
			{
				self.actions.push(SyncingAction::ImportJustifications {
					peer_id,
					hash,
					number,
					justifications,
				});
				return Ok(());
			}
		}

		Ok(())
	}

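	/// Median block number advertised as best by connected peers, used as the sync target.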
	fn median_seen(&self) -> Option<NumberFor<B>> {
		let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();

		if best_seens.is_empty() {
			None
		} else {
			let middle = best_seens.len() / 2;

			Some(*best_seens.select_nth_unstable(middle).1)
		}
	}

	fn required_block_attributes(&self) -> BlockAttributes {
		match self.mode {
			ChainSyncMode::Full =>
				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
			ChainSyncMode::LightState { storage_chain_mode: false, .. } =>
				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
			ChainSyncMode::LightState { storage_chain_mode: true, .. } =>
				BlockAttributes::HEADER |
					BlockAttributes::JUSTIFICATION |
					BlockAttributes::INDEXED_BODY,
		}
	}

	fn skip_execution(&self) -> bool {
		match self.mode {
			ChainSyncMode::Full => false,
			ChainSyncMode::LightState { .. } => true,
		}
	}

	fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
		let orig_len = new_blocks.len();
		new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
		if new_blocks.len() != orig_len {
			debug!(
				target: LOG_TARGET,
				"Ignoring {} blocks that are already queued",
				orig_len - new_blocks.len(),
			);
		}

		let origin = if !gap && !self.status().state.is_major_syncing() {
			BlockOrigin::NetworkBroadcast
		} else {
			BlockOrigin::NetworkInitialSync
		};

		if let Some((h, n)) = new_blocks
			.last()
			.and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
		{
			trace!(
				target: LOG_TARGET,
				"Accepted {} blocks ({:?}) with origin {:?}",
				new_blocks.len(),
				h,
				origin,
			);
			self.on_block_queued(h, n)
		}
		self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
		if let Some(metrics) = &self.metrics {
			metrics
				.queued_blocks
				.set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
		}

		self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
	}

	fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
		if let Some(peer) = self.peers.get_mut(peer_id) {
			peer.update_common_number(new_common);
		}
	}

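	/// Called when a block has been queued for import: updates the best queued block, completed
	/// fork targets and the common number of connected peers.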
	fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		if self.fork_targets.remove(hash).is_some() {
			if let Some(metrics) = &self.metrics {
				metrics.fork_targets.dec();
			}
			trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
		}
		if let Some(gap_sync) = &mut self.gap_sync {
			if number > gap_sync.best_queued_number && number <= gap_sync.target {
				gap_sync.best_queued_number = number;
			}
		}
		if number > self.best_queued_number {
			self.best_queued_number = number;
			self.best_queued_hash = *hash;
			for (n, peer) in self.peers.iter_mut() {
				if let PeerSyncState::AncestorSearch { .. } = peer.state {
					continue;
				}
				let new_common_number =
					if peer.best_number >= number { number } else { peer.best_number };
				trace!(
					target: LOG_TARGET,
					"Updating peer {} info, ours={}, common={}->{}, their best={}",
					n,
					number,
					peer.common_number,
					new_common_number,
					peer.best_number,
				);
				peer.common_number = new_common_number;
			}
		}
		self.allowed_requests.set_all();
	}

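	/// Restart block sync from the current best block, re-adding all known peers.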
	fn restart(&mut self) {
		self.blocks.clear();
		if let Err(e) = self.reset_sync_start_point() {
			warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}");
		}
		self.allowed_requests.set_all();
		debug!(
			target: LOG_TARGET,
			"Restarted with {} ({})",
			self.best_queued_number,
			self.best_queued_hash,
		);
		let old_peers = std::mem::take(&mut self.peers);

		old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
			match peer_sync.state {
				PeerSyncState::Available => {
					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
				},
				PeerSyncState::AncestorSearch { .. } |
				PeerSyncState::DownloadingNew(_) |
				PeerSyncState::DownloadingStale(_) |
				PeerSyncState::DownloadingGap(_) |
				PeerSyncState::DownloadingState => {
					self.actions
						.push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
				},
				PeerSyncState::DownloadingJustification(_) => {
					trace!(
						target: LOG_TARGET,
						"Keeping peer {} after restart, updating common number from={} => to={} (our best).",
						peer_id,
						peer_sync.common_number,
						self.best_queued_number,
					);
					peer_sync.common_number = self.best_queued_number;
					self.peers.insert(peer_id, peer_sync);
				},
			}
		});
	}

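	/// Reset the sync start point (best queued block and block gap) from the client info.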
	fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
		let info = self.client.info();
		debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");

		if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
			warn!(
				target: LOG_TARGET,
				"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
			);
			self.mode = ChainSyncMode::Full;
		}

		self.import_existing = false;
		self.best_queued_hash = info.best_hash;
		self.best_queued_number = info.best_number;

		if self.mode == ChainSyncMode::Full &&
			self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
		{
			self.import_existing = true;
			if let Some((hash, number)) = info.finalized_state {
				debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
				self.best_queued_hash = hash;
				self.best_queued_number = number;
			} else {
				debug!(target: LOG_TARGET, "Restarting from genesis");
				self.best_queued_hash = Default::default();
				self.best_queued_number = Zero::zero();
			}
		}

		if let Some(BlockGap { start, end, .. }) = info.block_gap {
			let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
			debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
			self.gap_sync = Some(GapSync {
				best_queued_number: start - One::one(),
				target: end,
				blocks: BlockCollection::new(),
			});
		}
		trace!(
			target: LOG_TARGET,
			"Restarted sync at #{} ({:?})",
			self.best_queued_number,
			self.best_queued_hash,
		);
		Ok(())
	}

	fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
		if self.queue_blocks.contains(hash) {
			return Ok(BlockStatus::Queued);
		}
		self.client.block_status(*hash)
	}

	fn is_known(&self, hash: &B::Hash) -> bool {
		self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
	}

	fn is_already_downloading(&self, hash: &B::Hash) -> bool {
		self.peers
			.iter()
			.any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
	}

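	/// Drain the blocks that are ready to be imported, starting from the best queued number.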
	fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
		self.blocks
			.ready_blocks(self.best_queued_number + One::one())
			.into_iter()
			.map(|block_data| {
				let justifications = block_data
					.block
					.justifications
					.or_else(|| legacy_justification_mapping(block_data.block.justification));
				IncomingBlock {
					hash: block_data.block.hash,
					header: block_data.block.header,
					body: block_data.block.body,
					indexed_body: block_data.block.indexed_body,
					justifications,
					origin: block_data.origin,
					allow_missing_state: true,
					import_existing: self.import_existing,
					skip_execution: self.skip_execution(),
					state: None,
				}
			})
			.collect()
	}

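	/// Get justification requests scheduled for available peers.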
	fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
		let peers = &mut self.peers;
		let mut matcher = self.extra_justifications.matcher();
		std::iter::from_fn(move || {
			if let Some((peer, request)) = matcher.next(peers) {
				peers
					.get_mut(&peer)
					.expect(
						"`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
					)
					.state = PeerSyncState::DownloadingJustification(request.0);
				let req = BlockRequest::<B> {
					id: 0,
					fields: BlockAttributes::JUSTIFICATION,
					from: FromBlock::Hash(request.0),
					direction: Direction::Ascending,
					max: Some(1),
				};
				Some((peer, req))
			} else {
				None
			}
		})
		.collect()
	}

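	/// Get block requests scheduled to be sent to available peers.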
	fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
		if self.allowed_requests.is_empty() || self.state_sync.is_some() {
			return Vec::new();
		}

		if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
			trace!(target: LOG_TARGET, "Too many blocks in the queue.");
			return Vec::new();
		}
		let is_major_syncing = self.status().state.is_major_syncing();
		let attrs = self.required_block_attributes();
		let blocks = &mut self.blocks;
		let fork_targets = &mut self.fork_targets;
		let last_finalized =
			std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
		let best_queued = self.best_queued_number;
		let client = &self.client;
		let queue_blocks = &self.queue_blocks;
		let allowed_requests = self.allowed_requests.clone();
		let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
		let max_blocks_per_request = self.max_blocks_per_request;
		let gap_sync = &mut self.gap_sync;
		let disconnected_peers = &mut self.disconnected_peers;
		let metrics = self.metrics.as_ref();
		let requests = self
			.peers
			.iter_mut()
			.filter_map(move |(&id, peer)| {
				if !peer.state.is_available() ||
					!allowed_requests.contains(&id) ||
					!disconnected_peers.is_peer_available(&id)
				{
					return None;
				}

				if best_queued.saturating_sub(peer.common_number) >
					MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
					best_queued < peer.best_number &&
					peer.common_number < last_finalized &&
					queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
				{
					trace!(
						target: LOG_TARGET,
						"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
						id,
						peer.common_number,
						best_queued,
					);
					let current = std::cmp::min(peer.best_number, best_queued);
					peer.state = PeerSyncState::AncestorSearch {
						current,
						start: best_queued,
						state: AncestorSearchState::ExponentialBackoff(One::one()),
					};
					Some((id, ancestry_request::<B>(current)))
				} else if let Some((range, req)) = peer_block_request(
					&id,
					peer,
					blocks,
					attrs,
					max_parallel,
					max_blocks_per_request,
					last_finalized,
					best_queued,
				) {
					peer.state = PeerSyncState::DownloadingNew(range.start);
					trace!(
						target: LOG_TARGET,
						"New block request for {}, (best:{}, common:{}) {:?}",
						id,
						peer.best_number,
						peer.common_number,
						req,
					);
					Some((id, req))
				} else if let Some((hash, req)) = fork_sync_request(
					&id,
					fork_targets,
					best_queued,
					last_finalized,
					attrs,
					|hash| {
						if queue_blocks.contains(hash) {
							BlockStatus::Queued
						} else {
							client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
						}
					},
					max_blocks_per_request,
					metrics,
				) {
					trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
					peer.state = PeerSyncState::DownloadingStale(hash);
					Some((id, req))
				} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
					peer_gap_block_request(
						&id,
						peer,
						&mut sync.blocks,
						attrs,
						sync.target,
						sync.best_queued_number,
						max_blocks_per_request,
					)
				}) {
					peer.state = PeerSyncState::DownloadingGap(range.start);
					trace!(
						target: LOG_TARGET,
						"New gap block request for {}, (best:{}, common:{}) {:?}",
						id,
						peer.best_number,
						peer.common_number,
						req,
					);
					Some((id, req))
				} else {
					None
				}
			})
			.collect::<Vec<_>>();

		if !requests.is_empty() {
			self.allowed_requests.take();
		}

		requests
	}

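	/// Get a state request scheduled for a suitable peer, if state sync is in progress.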
	fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
		if self.allowed_requests.is_empty() {
			return None;
		}
		if self.state_sync.is_some() &&
			self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
		{
			return None;
		}
		if let Some(sync) = &self.state_sync {
			if sync.is_complete() {
				return None;
			}

			for (id, peer) in self.peers.iter_mut() {
				if peer.state.is_available() &&
					peer.common_number >= sync.target_number() &&
					self.disconnected_peers.is_peer_available(&id)
				{
					peer.state = PeerSyncState::DownloadingState;
					let request = sync.next_request();
					trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
					self.allowed_requests.clear();
					return Some((*id, request));
				}
			}
		}
		None
	}

	#[must_use]
	fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
		let response = match StateResponse::decode(response) {
			Ok(response) => response,
			Err(error) => {
				debug!(
					target: LOG_TARGET,
					"Failed to decode state response from peer {peer_id:?}: {error:?}.",
				);

				return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
			},
		};

		if let Some(peer) = self.peers.get_mut(peer_id) {
			if let PeerSyncState::DownloadingState = peer.state {
				peer.state = PeerSyncState::Available;
				self.allowed_requests.set_all();
			}
		}
		let import_result = if let Some(sync) = &mut self.state_sync {
			debug!(
				target: LOG_TARGET,
				"Importing state data from {} with {} keys, {} proof nodes.",
				peer_id,
				response.entries.len(),
				response.proof.len(),
			);
			sync.import(response)
		} else {
			debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		};

		match import_result {
			ImportResult::Import(hash, header, state, body, justifications) => {
				let origin = BlockOrigin::NetworkInitialSync;
				let block = IncomingBlock {
					hash,
					header: Some(header),
					body,
					indexed_body: None,
					justifications,
					origin: None,
					allow_missing_state: true,
					import_existing: true,
					skip_execution: self.skip_execution(),
					state: Some(state),
				};
				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
				self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
				Ok(())
			},
			ImportResult::Continue => Ok(()),
			ImportResult::BadResponse => {
				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
				Err(BadPeer(*peer_id, rep::BAD_BLOCK))
			},
		}
	}

	fn attempt_state_sync(
		&mut self,
		finalized_hash: B::Hash,
		finalized_number: NumberFor<B>,
		skip_proofs: bool,
	) {
		let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
		heads.sort();
		let median = heads[heads.len() / 2];
		if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
			if let Ok(Some(header)) = self.client.header(finalized_hash) {
				log::debug!(
					target: LOG_TARGET,
					"Starting state sync for #{finalized_number} ({finalized_hash})",
				);
				self.state_sync =
					Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
				self.allowed_requests.set_all();
			} else {
				log::error!(
					target: LOG_TARGET,
					"Failed to start state sync: header for finalized block \
					#{finalized_number} ({finalized_hash}) is not available",
				);
				debug_assert!(false);
			}
		}
	}

	#[cfg(test)]
	#[must_use]
	fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
		std::mem::take(&mut self.actions).into_iter()
	}
}

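/// Map a legacy single justification into the `Justifications` format, tagged with the GRANDPA
/// consensus engine id (`FRNK`).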
fn legacy_justification_mapping(
	justification: Option<EncodedJustification>,
) -> Option<Justifications> {
	justification.map(|just| (*b"FRNK", just).into())
}

fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
	BlockRequest::<B> {
		id: 0,
		fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
		from: FromBlock::Number(block),
		direction: Direction::Ascending,
		max: Some(1),
	}
}

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum AncestorSearchState<B: BlockT> {
	ExponentialBackoff(NumberFor<B>),
	BinarySearch(NumberFor<B>, NumberFor<B>),
}

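/// Advance the ancestor search: first walk backwards with exponentially growing steps until a
/// common block is found, then binary-search between the last miss and the last match. Returns
/// the next state and block number to probe, or `None` when the search is finished.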
fn handle_ancestor_search_state<B: BlockT>(
	state: &AncestorSearchState<B>,
	curr_block_num: NumberFor<B>,
	block_hash_match: bool,
) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
	let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
	match state {
		AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
			let next_distance_to_tip = *next_distance_to_tip;
			if block_hash_match && next_distance_to_tip == One::one() {
				return None;
			}
			if block_hash_match {
				let left = curr_block_num;
				let right = left + next_distance_to_tip / two;
				let middle = left + (right - left) / two;
				Some((AncestorSearchState::BinarySearch(left, right), middle))
			} else {
				let next_block_num =
					curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
				let next_distance_to_tip = next_distance_to_tip * two;
				Some((
					AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
					next_block_num,
				))
			}
		},
		AncestorSearchState::BinarySearch(mut left, mut right) => {
			if left >= curr_block_num {
				return None;
			}
			if block_hash_match {
				left = curr_block_num;
			} else {
				right = curr_block_num;
			}
			assert!(right >= left);
			let middle = left + (right - left) / two;
			if middle == curr_block_num {
				None
			} else {
				Some((AncestorSearchState::BinarySearch(left, right), middle))
			}
		},
	}
}

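/// Get a new block request for the given peer, if there is a range worth downloading from it.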
fn peer_block_request<B: BlockT>(
	id: &PeerId,
	peer: &PeerSync<B>,
	blocks: &mut BlockCollection<B>,
	attrs: BlockAttributes,
	max_parallel_downloads: u32,
	max_blocks_per_request: u32,
	finalized: NumberFor<B>,
	best_num: NumberFor<B>,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
	if best_num >= peer.best_number {
		return None;
	} else if peer.common_number < finalized {
		trace!(
			target: LOG_TARGET,
			"Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
			id, peer.common_number, finalized, peer.best_number, best_num,
		);
	}
	let range = blocks.needed_blocks(
		*id,
		max_blocks_per_request,
		peer.best_number,
		peer.common_number,
		max_parallel_downloads,
		MAX_DOWNLOAD_AHEAD,
	)?;

	let last = range.end.saturating_sub(One::one());

	let from = if peer.best_number == last {
		FromBlock::Hash(peer.best_hash)
	} else {
		FromBlock::Number(last)
	};

	let request = BlockRequest::<B> {
		id: 0,
		fields: attrs,
		from,
		direction: Direction::Descending,
		max: Some((range.end - range.start).saturated_into::<u32>()),
	};

	Some((range, request))
}

fn peer_gap_block_request<B: BlockT>(
	id: &PeerId,
	peer: &PeerSync<B>,
	blocks: &mut BlockCollection<B>,
	attrs: BlockAttributes,
	target: NumberFor<B>,
	common_number: NumberFor<B>,
	max_blocks_per_request: u32,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
	let range = blocks.needed_blocks(
		*id,
		max_blocks_per_request,
		std::cmp::min(peer.best_number, target),
		common_number,
		1,
		MAX_DOWNLOAD_AHEAD,
	)?;

	let last = range.end.saturating_sub(One::one());
	let from = FromBlock::Number(last);

	let request = BlockRequest::<B> {
		id: 0,
		fields: attrs,
		from,
		direction: Direction::Descending,
		max: Some((range.end - range.start).saturated_into::<u32>()),
	};
	Some((range, request))
}

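/// Get a fork sync request for the given peer, pruning fork targets that are already known or
/// below the finalized block.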
fn fork_sync_request<B: BlockT>(
	id: &PeerId,
	fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
	best_num: NumberFor<B>,
	finalized: NumberFor<B>,
	attributes: BlockAttributes,
	check_block: impl Fn(&B::Hash) -> BlockStatus,
	max_blocks_per_request: u32,
	metrics: Option<&Metrics>,
) -> Option<(B::Hash, BlockRequest<B>)> {
	fork_targets.retain(|hash, r| {
		if r.number <= finalized {
			trace!(
				target: LOG_TARGET,
				"Removed expired fork sync request {:?} (#{})",
				hash,
				r.number,
			);
			return false;
		}
		if check_block(hash) != BlockStatus::Unknown {
			trace!(
				target: LOG_TARGET,
				"Removed obsolete fork sync request {:?} (#{})",
				hash,
				r.number,
			);
			return false;
		}
		true
	});
	if let Some(metrics) = metrics {
		metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
	}
	for (hash, r) in fork_targets {
		if !r.peers.contains(&id) {
			continue;
		}
		if r.number <= best_num ||
			(r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
		{
			let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
			let count = if parent_status == BlockStatus::Unknown {
				(r.number - finalized).saturated_into::<u32>()
			} else {
				1
			};
			trace!(
				target: LOG_TARGET,
				"Downloading requested fork {hash:?} from {id}, {count} blocks",
			);
			return Some((
				*hash,
				BlockRequest::<B> {
					id: 0,
					fields: attributes,
					from: FromBlock::Hash(*hash),
					direction: Direction::Descending,
					max: Some(count),
				},
			));
		} else {
			trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
		}
	}
	None
}

fn is_descendent_of<Block, T>(
	client: &T,
	base: &Block::Hash,
	block: &Block::Hash,
) -> sp_blockchain::Result<bool>
where
	Block: BlockT,
	T: HeaderMetadata<Block, Error = sp_blockchain::Error> + ?Sized,
{
	if base == block {
		return Ok(false);
	}

	let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?;

	Ok(ancestor.hash == *base)
}

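/// Validate the blocks of a response against the corresponding request (count, start block,
/// requested fields and header hashes). Returns the number of the first block whose header is
/// present.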
pub fn validate_blocks<Block: BlockT>(
	blocks: &Vec<BlockData<Block>>,
	peer_id: &PeerId,
	request: Option<BlockRequest<Block>>,
) -> Result<Option<NumberFor<Block>>, BadPeer> {
	if let Some(request) = request {
		if Some(blocks.len() as _) > request.max {
			debug!(
				target: LOG_TARGET,
				"Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
				peer_id,
				request.max,
				blocks.len(),
			);

			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		}

		let block_header =
			if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
				.and_then(|b| b.header.as_ref());

		let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
			FromBlock::Hash(hash) => h.hash() == hash,
			FromBlock::Number(n) => h.number() == &n,
		});

		if !expected_block {
			debug!(
				target: LOG_TARGET,
				"Received block that was not requested. Requested {:?}, got {:?}.",
				request.from,
				block_header,
			);

			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		}

		if request.fields.contains(BlockAttributes::HEADER) &&
			blocks.iter().any(|b| b.header.is_none())
		{
			trace!(
				target: LOG_TARGET,
				"Missing requested header for a block in response from {peer_id}.",
			);

			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
		}

		if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
		{
			trace!(
				target: LOG_TARGET,
				"Missing requested body for a block in response from {peer_id}.",
			);

			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
		}
	}

	for b in blocks {
		if let Some(header) = &b.header {
			let hash = header.hash();
			if hash != b.hash {
				debug!(
					target: LOG_TARGET,
					"Bad header received from {}. Expected hash {:?}, got {:?}",
					peer_id,
					b.hash,
					hash,
				);
				return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
			}
		}
	}

	Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
}