1use crate::{
32 block_relay_protocol::{BlockDownloader, BlockResponseError},
33 blocks::BlockCollection,
34 justification_requests::ExtraRequests,
35 schema::v1::{StateRequest, StateResponse},
36 service::network::NetworkServiceHandle,
37 strategy::{
38 disconnected_peers::DisconnectedPeers,
39 state_sync::{ImportResult, StateSync, StateSyncProvider},
40 warp::{WarpSyncPhase, WarpSyncProgress},
41 StrategyKey, SyncingAction, SyncingStrategy,
42 },
43 types::{BadPeer, SyncState, SyncStatus},
44 LOG_TARGET,
45};
46
47use futures::{channel::oneshot, FutureExt};
48use log::{debug, error, info, trace, warn};
49use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
50use prost::Message;
51use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
52use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
53use sc_network::{IfDisconnected, ProtocolName};
54use sc_network_common::sync::message::{
55 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
56};
57use sc_network_types::PeerId;
58use sp_arithmetic::traits::Saturating;
59use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
60use sp_consensus::{BlockOrigin, BlockStatus};
61use sp_runtime::{
62 traits::{
63 Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
64 },
65 EncodedJustification, Justifications,
66};
67
68use std::{
69 any::Any,
70 collections::{HashMap, HashSet},
71 ops::Range,
72 sync::Arc,
73};
74
75#[cfg(test)]
76mod test;
77
78const MAX_IMPORTING_BLOCKS: usize = 2048;
80
81const MAX_DOWNLOAD_AHEAD: u32 = 2048;
83
84const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;
87
88const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
90
91const MAJOR_SYNC_BLOCKS: u8 = 5;
97
98mod rep {
99 use sc_network::ReputationChange as Rep;
100 pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
103
104 pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");
107
108 pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
110
111 pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
113
114 pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
116
117 pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
119
120 pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
122
123 pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
125
126 pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");
128
129 pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
131
132 pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
134}
135
136struct Metrics {
137 queued_blocks: Gauge<U64>,
138 fork_targets: Gauge<U64>,
139}
140
141impl Metrics {
142 fn register(r: &Registry) -> Result<Self, PrometheusError> {
143 Ok(Self {
144 queued_blocks: {
145 let g =
146 Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
147 register(g, r)?
148 },
149 fork_targets: {
150 let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
151 register(g, r)?
152 },
153 })
154 }
155}
156
157#[derive(Debug, Clone)]
158enum AllowedRequests {
159 Some(HashSet<PeerId>),
160 All,
161}
162
163impl AllowedRequests {
164 fn add(&mut self, id: &PeerId) {
165 if let Self::Some(ref mut set) = self {
166 set.insert(*id);
167 }
168 }
169
170 fn take(&mut self) -> Self {
171 std::mem::take(self)
172 }
173
174 fn set_all(&mut self) {
175 *self = Self::All;
176 }
177
178 fn contains(&self, id: &PeerId) -> bool {
179 match self {
180 Self::Some(set) => set.contains(id),
181 Self::All => true,
182 }
183 }
184
185 fn is_empty(&self) -> bool {
186 match self {
187 Self::Some(set) => set.is_empty(),
188 Self::All => false,
189 }
190 }
191
192 fn clear(&mut self) {
193 std::mem::take(self);
194 }
195}
196
197impl Default for AllowedRequests {
198 fn default() -> Self {
199 Self::Some(HashSet::default())
200 }
201}
202
203struct GapSync<B: BlockT> {
204 blocks: BlockCollection<B>,
205 best_queued_number: NumberFor<B>,
206 target: NumberFor<B>,
207}
208
209#[derive(Copy, Clone, Debug, Eq, PartialEq)]
211pub enum ChainSyncMode {
212 Full,
214 LightState {
216 skip_proofs: bool,
218 storage_chain_mode: bool,
220 },
221}
222
223#[derive(Debug, Clone)]
225pub(crate) struct PeerSync<B: BlockT> {
226 pub peer_id: PeerId,
228 pub common_number: NumberFor<B>,
231 pub best_hash: B::Hash,
233 pub best_number: NumberFor<B>,
235 pub state: PeerSyncState<B>,
238}
239
240impl<B: BlockT> PeerSync<B> {
241 fn update_common_number(&mut self, new_common: NumberFor<B>) {
243 if self.common_number < new_common {
244 trace!(
245 target: LOG_TARGET,
246 "Updating peer {} common number from={} => to={}.",
247 self.peer_id,
248 self.common_number,
249 new_common,
250 );
251 self.common_number = new_common;
252 }
253 }
254}
255
256struct ForkTarget<B: BlockT> {
257 number: NumberFor<B>,
258 parent_hash: Option<B::Hash>,
259 peers: HashSet<PeerId>,
260}
261
262#[derive(Copy, Clone, Eq, PartialEq, Debug)]
267pub(crate) enum PeerSyncState<B: BlockT> {
268 Available,
270 AncestorSearch { start: NumberFor<B>, current: NumberFor<B>, state: AncestorSearchState<B> },
272 DownloadingNew(NumberFor<B>),
274 DownloadingStale(B::Hash),
278 DownloadingJustification(B::Hash),
280 DownloadingState,
282 DownloadingGap(NumberFor<B>),
284}
285
286impl<B: BlockT> PeerSyncState<B> {
287 pub fn is_available(&self) -> bool {
288 matches!(self, Self::Available)
289 }
290}
291
292pub struct ChainSync<B: BlockT, Client> {
295 client: Arc<Client>,
297 peers: HashMap<PeerId, PeerSync<B>>,
299 disconnected_peers: DisconnectedPeers,
300 blocks: BlockCollection<B>,
302 best_queued_number: NumberFor<B>,
304 best_queued_hash: B::Hash,
306 mode: ChainSyncMode,
308 extra_justifications: ExtraRequests<B>,
310 queue_blocks: HashSet<B::Hash>,
313 pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
321 fork_targets: HashMap<B::Hash, ForkTarget<B>>,
323 allowed_requests: AllowedRequests,
325 max_parallel_downloads: u32,
327 max_blocks_per_request: u32,
329 state_request_protocol_name: ProtocolName,
331 downloaded_blocks: usize,
333 state_sync: Option<StateSync<B, Client>>,
335 import_existing: bool,
338 block_downloader: Arc<dyn BlockDownloader<B>>,
340 gap_sync: Option<GapSync<B>>,
342 actions: Vec<SyncingAction<B>>,
344 metrics: Option<Metrics>,
346}
347
348impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
349where
350 B: BlockT,
351 Client: HeaderBackend<B>
352 + BlockBackend<B>
353 + HeaderMetadata<B, Error = sp_blockchain::Error>
354 + ProofProvider<B>
355 + Send
356 + Sync
357 + 'static,
358{
359 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
360 match self.add_peer_inner(peer_id, best_hash, best_number) {
361 Ok(Some(request)) => {
362 let action = self.create_block_request_action(peer_id, request);
363 self.actions.push(action);
364 },
365 Ok(None) => {},
366 Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
367 }
368 }
369
370 fn remove_peer(&mut self, peer_id: &PeerId) {
371 self.blocks.clear_peer_download(peer_id);
372 if let Some(gap_sync) = &mut self.gap_sync {
373 gap_sync.blocks.clear_peer_download(peer_id)
374 }
375
376 if let Some(state) = self.peers.remove(peer_id) {
377 if !state.state.is_available() {
378 if let Some(bad_peer) =
379 self.disconnected_peers.on_disconnect_during_request(*peer_id)
380 {
381 self.actions.push(SyncingAction::DropPeer(bad_peer));
382 }
383 }
384 }
385
386 self.extra_justifications.peer_disconnected(peer_id);
387 self.allowed_requests.set_all();
388 self.fork_targets.retain(|_, target| {
389 target.peers.remove(peer_id);
390 !target.peers.is_empty()
391 });
392 if let Some(metrics) = &self.metrics {
393 metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
394 }
395
396 let blocks = self.ready_blocks();
397
398 if !blocks.is_empty() {
399 self.validate_and_queue_blocks(blocks, false);
400 }
401 }
402
403 fn on_validated_block_announce(
404 &mut self,
405 is_best: bool,
406 peer_id: PeerId,
407 announce: &BlockAnnounce<B::Header>,
408 ) -> Option<(B::Hash, NumberFor<B>)> {
409 let number = *announce.header.number();
410 let hash = announce.header.hash();
411 let parent_status =
412 self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
413 let known_parent = parent_status != BlockStatus::Unknown;
414 let ancient_parent = parent_status == BlockStatus::InChainPruned;
415
416 let known = self.is_known(&hash);
417 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
418 peer
419 } else {
420 error!(target: LOG_TARGET, "๐ Called `on_validated_block_announce` with a bad peer ID {peer_id}");
421 return Some((hash, number))
422 };
423
424 if let PeerSyncState::AncestorSearch { .. } = peer.state {
425 trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
426 return None
427 }
428
429 let peer_info = is_best.then(|| {
430 peer.best_number = number;
432 peer.best_hash = hash;
433
434 (hash, number)
435 });
436
437 if is_best {
440 if known && self.best_queued_number >= number {
441 self.update_peer_common_number(&peer_id, number);
442 } else if announce.header.parent_hash() == &self.best_queued_hash ||
443 known_parent && self.best_queued_number >= number
444 {
445 self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
446 }
447 }
448 self.allowed_requests.add(&peer_id);
449
450 if known || self.is_already_downloading(&hash) {
452 trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
453 if let Some(target) = self.fork_targets.get_mut(&hash) {
454 target.peers.insert(peer_id);
455 }
456 return peer_info
457 }
458
459 if ancient_parent {
460 trace!(
461 target: LOG_TARGET,
462 "Ignored ancient block announced from {}: {} {:?}",
463 peer_id,
464 hash,
465 announce.header,
466 );
467 return peer_info
468 }
469
470 if self.status().state == SyncState::Idle {
471 trace!(
472 target: LOG_TARGET,
473 "Added sync target for block announced from {}: {} {:?}",
474 peer_id,
475 hash,
476 announce.summary(),
477 );
478 self.fork_targets
479 .entry(hash)
480 .or_insert_with(|| {
481 if let Some(metrics) = &self.metrics {
482 metrics.fork_targets.inc();
483 }
484
485 ForkTarget {
486 number,
487 parent_hash: Some(*announce.header.parent_hash()),
488 peers: Default::default(),
489 }
490 })
491 .peers
492 .insert(peer_id);
493 }
494
495 peer_info
496 }
497
498 fn set_sync_fork_request(
500 &mut self,
501 mut peers: Vec<PeerId>,
502 hash: &B::Hash,
503 number: NumberFor<B>,
504 ) {
505 if peers.is_empty() {
506 peers = self
507 .peers
508 .iter()
509 .filter(|(_, peer)| peer.best_number >= number)
511 .map(|(id, _)| *id)
512 .collect();
513
514 debug!(
515 target: LOG_TARGET,
516 "Explicit sync request for block {hash:?} with no peers specified. \
517 Syncing from these peers {peers:?} instead.",
518 );
519 } else {
520 debug!(
521 target: LOG_TARGET,
522 "Explicit sync request for block {hash:?} with {peers:?}",
523 );
524 }
525
526 if self.is_known(hash) {
527 debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
528 return
529 }
530
531 trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
532 for peer_id in &peers {
533 if let Some(peer) = self.peers.get_mut(peer_id) {
534 if let PeerSyncState::AncestorSearch { .. } = peer.state {
535 continue
536 }
537
538 if number > peer.best_number {
539 peer.best_number = number;
540 peer.best_hash = *hash;
541 }
542 self.allowed_requests.add(peer_id);
543 }
544 }
545
546 self.fork_targets
547 .entry(*hash)
548 .or_insert_with(|| {
549 if let Some(metrics) = &self.metrics {
550 metrics.fork_targets.inc();
551 }
552
553 ForkTarget { number, peers: Default::default(), parent_hash: None }
554 })
555 .peers
556 .extend(peers);
557 }
558
559 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
560 let client = &self.client;
561 self.extra_justifications
562 .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
563 }
564
565 fn clear_justification_requests(&mut self) {
566 self.extra_justifications.reset();
567 }
568
569 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
570 let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
571 self.extra_justifications
572 .try_finalize_root((hash, number), finalization_result, true);
573 self.allowed_requests.set_all();
574 }
575
576 fn on_generic_response(
577 &mut self,
578 peer_id: &PeerId,
579 key: StrategyKey,
580 protocol_name: ProtocolName,
581 response: Box<dyn Any + Send>,
582 ) {
583 if Self::STRATEGY_KEY != key {
584 warn!(
585 target: LOG_TARGET,
586 "Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
587 );
588 debug_assert!(false);
589 return;
590 }
591
592 if protocol_name == self.state_request_protocol_name {
593 let Ok(response) = response.downcast::<Vec<u8>>() else {
594 warn!(target: LOG_TARGET, "Failed to downcast state response");
595 debug_assert!(false);
596 return;
597 };
598
599 if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
600 self.actions.push(SyncingAction::DropPeer(bad_peer));
601 }
602 } else if &protocol_name == self.block_downloader.protocol_name() {
603 let Ok(response) = response
604 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
605 else {
606 warn!(target: LOG_TARGET, "Failed to downcast block response");
607 debug_assert!(false);
608 return;
609 };
610
611 let (request, response) = *response;
612 let blocks = match response {
613 Ok(blocks) => blocks,
614 Err(BlockResponseError::DecodeFailed(e)) => {
615 debug!(
616 target: LOG_TARGET,
617 "Failed to decode block response from peer {:?}: {:?}.",
618 peer_id,
619 e
620 );
621 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
622 return;
623 },
624 Err(BlockResponseError::ExtractionFailed(e)) => {
625 debug!(
626 target: LOG_TARGET,
627 "Failed to extract blocks from peer response {:?}: {:?}.",
628 peer_id,
629 e
630 );
631 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
632 return;
633 },
634 };
635
636 if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
637 self.actions.push(SyncingAction::DropPeer(bad_peer));
638 }
639 } else {
640 warn!(
641 target: LOG_TARGET,
642 "Unexpected generic response protocol {protocol_name}, strategy key \
643 {key:?}",
644 );
645 debug_assert!(false);
646 }
647 }
648
649 fn on_blocks_processed(
650 &mut self,
651 imported: usize,
652 count: usize,
653 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
654 ) {
655 trace!(target: LOG_TARGET, "Imported {imported} of {count}");
656
657 let mut has_error = false;
658 for (_, hash) in &results {
659 if self.queue_blocks.remove(hash) {
660 if let Some(metrics) = &self.metrics {
661 metrics.queued_blocks.dec();
662 }
663 }
664 self.blocks.clear_queued(hash);
665 if let Some(gap_sync) = &mut self.gap_sync {
666 gap_sync.blocks.clear_queued(hash);
667 }
668 }
669 for (result, hash) in results {
670 if has_error {
671 break
672 }
673
674 has_error |= result.is_err();
675
676 match result {
677 Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
678 if let Some(peer) = peer_id {
679 self.update_peer_common_number(&peer, number);
680 }
681 self.complete_gap_if_target(number);
682 },
683 Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
684 if aux.clear_justification_requests {
685 trace!(
686 target: LOG_TARGET,
687 "Block imported clears all pending justification requests {number}: {hash:?}",
688 );
689 self.clear_justification_requests();
690 }
691
692 if aux.needs_justification {
693 trace!(
694 target: LOG_TARGET,
695 "Block imported but requires justification {number}: {hash:?}",
696 );
697 self.request_justification(&hash, number);
698 }
699
700 if aux.bad_justification {
701 if let Some(ref peer) = peer_id {
702 warn!("๐ Sent block with bad justification to import");
703 self.actions.push(SyncingAction::DropPeer(BadPeer(
704 *peer,
705 rep::BAD_JUSTIFICATION,
706 )));
707 }
708 }
709
710 if let Some(peer) = peer_id {
711 self.update_peer_common_number(&peer, number);
712 }
713 let state_sync_complete =
714 self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
715 if state_sync_complete {
716 info!(
717 target: LOG_TARGET,
718 "State sync is complete ({} MiB), restarting block sync.",
719 self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
720 );
721 self.state_sync = None;
722 self.mode = ChainSyncMode::Full;
723 self.restart();
724 }
725
726 self.complete_gap_if_target(number);
727 },
728 Err(BlockImportError::IncompleteHeader(peer_id)) =>
729 if let Some(peer) = peer_id {
730 warn!(
731 target: LOG_TARGET,
732 "๐ Peer sent block with incomplete header to import",
733 );
734 self.actions
735 .push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
736 self.restart();
737 },
738 Err(BlockImportError::VerificationFailed(peer_id, e)) => {
739 let extra_message = peer_id
740 .map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
741
742 warn!(
743 target: LOG_TARGET,
744 "๐ Verification failed for block {hash:?}{extra_message}: {e:?}",
745 );
746
747 if let Some(peer) = peer_id {
748 self.actions
749 .push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
750 }
751
752 self.restart();
753 },
754 Err(BlockImportError::BadBlock(peer_id)) =>
755 if let Some(peer) = peer_id {
756 warn!(
757 target: LOG_TARGET,
758 "๐ Block {hash:?} received from peer {peer} has been blacklisted",
759 );
760 self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
761 },
762 Err(BlockImportError::MissingState) => {
763 trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
767 },
768 e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
769 warn!(target: LOG_TARGET, "๐ Error importing block {hash:?}: {}", e.unwrap_err());
770 self.state_sync = None;
771 self.restart();
772 },
773 Err(BlockImportError::Cancelled) => {},
774 };
775 }
776
777 self.allowed_requests.set_all();
778 }
779
780 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
781 let client = &self.client;
782 let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
783 is_descendent_of(&**client, base, block)
784 });
785
786 if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
787 if self.state_sync.is_none() {
788 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
789 self.attempt_state_sync(*hash, number, *skip_proofs);
790 } else {
791 self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
792 }
793 }
794 }
795
796 if let Err(err) = r {
797 warn!(
798 target: LOG_TARGET,
799 "๐ Error cleaning up pending extra justification data requests: {err}",
800 );
801 }
802 }
803
804 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
805 self.on_block_queued(best_hash, best_number);
806 }
807
808 fn is_major_syncing(&self) -> bool {
809 self.status().state.is_major_syncing()
810 }
811
812 fn num_peers(&self) -> usize {
813 self.peers.len()
814 }
815
816 fn status(&self) -> SyncStatus<B> {
817 let median_seen = self.median_seen();
818 let best_seen_block =
819 median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
820 let sync_state = if let Some(target) = median_seen {
821 let best_block = self.client.info().best_number;
825 if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
826 if target > self.best_queued_number {
828 SyncState::Downloading { target }
829 } else {
830 SyncState::Importing { target }
831 }
832 } else {
833 SyncState::Idle
834 }
835 } else {
836 SyncState::Idle
837 };
838
839 let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
840 phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
841 total_bytes: 0,
842 });
843
844 SyncStatus {
845 state: sync_state,
846 best_seen_block,
847 num_peers: self.peers.len() as u32,
848 queued_blocks: self.queue_blocks.len() as u32,
849 state_sync: self.state_sync.as_ref().map(|s| s.progress()),
850 warp_sync: warp_sync_progress,
851 }
852 }
853
854 fn num_downloaded_blocks(&self) -> usize {
855 self.downloaded_blocks
856 }
857
858 fn num_sync_requests(&self) -> usize {
859 self.fork_targets
860 .values()
861 .filter(|f| f.number <= self.best_queued_number)
862 .count()
863 }
864
865 fn actions(
866 &mut self,
867 network_service: &NetworkServiceHandle,
868 ) -> Result<Vec<SyncingAction<B>>, ClientError> {
869 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
870 if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
871 self.attempt_state_sync(hash, number, skip_proofs);
872 }
873 }
874
875 let block_requests = self
876 .block_requests()
877 .into_iter()
878 .map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
879 .collect::<Vec<_>>();
880 self.actions.extend(block_requests);
881
882 let justification_requests = self
883 .justification_requests()
884 .into_iter()
885 .map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
886 .collect::<Vec<_>>();
887 self.actions.extend(justification_requests);
888
889 let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
890 trace!(
891 target: LOG_TARGET,
892 "Created `StrategyRequest` to {peer_id}.",
893 );
894
895 let (tx, rx) = oneshot::channel();
896
897 network_service.start_request(
898 peer_id,
899 self.state_request_protocol_name.clone(),
900 request.encode_to_vec(),
901 tx,
902 IfDisconnected::ImmediateError,
903 );
904
905 SyncingAction::StartRequest {
906 peer_id,
907 key: Self::STRATEGY_KEY,
908 request: async move {
909 Ok(rx.await?.and_then(|(response, protocol_name)| {
910 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
911 }))
912 }
913 .boxed(),
914 remove_obsolete: false,
915 }
916 });
917 self.actions.extend(state_request);
918
919 Ok(std::mem::take(&mut self.actions))
920 }
921}
922
923impl<B, Client> ChainSync<B, Client>
924where
925 B: BlockT,
926 Client: HeaderBackend<B>
927 + BlockBackend<B>
928 + HeaderMetadata<B, Error = sp_blockchain::Error>
929 + ProofProvider<B>
930 + Send
931 + Sync
932 + 'static,
933{
934 pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
936
937 pub fn new(
939 mode: ChainSyncMode,
940 client: Arc<Client>,
941 max_parallel_downloads: u32,
942 max_blocks_per_request: u32,
943 state_request_protocol_name: ProtocolName,
944 block_downloader: Arc<dyn BlockDownloader<B>>,
945 metrics_registry: Option<&Registry>,
946 initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
947 ) -> Result<Self, ClientError> {
948 let mut sync = Self {
949 client,
950 peers: HashMap::new(),
951 disconnected_peers: DisconnectedPeers::new(),
952 blocks: BlockCollection::new(),
953 best_queued_hash: Default::default(),
954 best_queued_number: Zero::zero(),
955 extra_justifications: ExtraRequests::new("justification", metrics_registry),
956 mode,
957 queue_blocks: Default::default(),
958 pending_state_sync_attempt: None,
959 fork_targets: Default::default(),
960 allowed_requests: Default::default(),
961 max_parallel_downloads,
962 max_blocks_per_request,
963 state_request_protocol_name,
964 downloaded_blocks: 0,
965 state_sync: None,
966 import_existing: false,
967 block_downloader,
968 gap_sync: None,
969 actions: Vec::new(),
970 metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
971 Ok(metrics) => Some(metrics),
972 Err(err) => {
973 log::error!(
974 target: LOG_TARGET,
975 "Failed to register `ChainSync` metrics {err:?}",
976 );
977 None
978 },
979 }),
980 };
981
982 sync.reset_sync_start_point()?;
983 initial_peers.for_each(|(peer_id, best_hash, best_number)| {
984 sync.add_peer(peer_id, best_hash, best_number);
985 });
986
987 Ok(sync)
988 }
989
990 fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
992 let gap_sync_complete = self.gap_sync.as_ref().map_or(false, |s| s.target == number);
993 if gap_sync_complete {
994 info!(
995 target: LOG_TARGET,
996 "Block history download is complete."
997 );
998 self.gap_sync = None;
999 }
1000 }
1001
1002 #[must_use]
1003 fn add_peer_inner(
1004 &mut self,
1005 peer_id: PeerId,
1006 best_hash: B::Hash,
1007 best_number: NumberFor<B>,
1008 ) -> Result<Option<BlockRequest<B>>, BadPeer> {
1009 match self.block_status(&best_hash) {
1011 Err(e) => {
1012 debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
1013 Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
1014 },
1015 Ok(BlockStatus::KnownBad) => {
1016 info!(
1017 "๐ New peer {peer_id} with known bad best block {best_hash} ({best_number})."
1018 );
1019 Err(BadPeer(peer_id, rep::BAD_BLOCK))
1020 },
1021 Ok(BlockStatus::Unknown) => {
1022 if best_number.is_zero() {
1023 info!(
1024 "๐ New peer {} with unknown genesis hash {} ({}).",
1025 peer_id, best_hash, best_number,
1026 );
1027 return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
1028 }
1029
1030 if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS as usize {
1034 debug!(
1035 target: LOG_TARGET,
1036 "New peer {} with unknown best hash {} ({}), assuming common block.",
1037 peer_id,
1038 self.best_queued_hash,
1039 self.best_queued_number
1040 );
1041 self.peers.insert(
1042 peer_id,
1043 PeerSync {
1044 peer_id,
1045 common_number: self.best_queued_number,
1046 best_hash,
1047 best_number,
1048 state: PeerSyncState::Available,
1049 },
1050 );
1051 return Ok(None);
1052 }
1053
1054 let (state, req) = if self.best_queued_number.is_zero() {
1056 debug!(
1057 target: LOG_TARGET,
1058 "New peer {peer_id} with best hash {best_hash} ({best_number}).",
1059 );
1060
1061 (PeerSyncState::Available, None)
1062 } else {
1063 let common_best = std::cmp::min(self.best_queued_number, best_number);
1064
1065 debug!(
1066 target: LOG_TARGET,
1067 "New peer {} with unknown best hash {} ({}), searching for common ancestor.",
1068 peer_id,
1069 best_hash,
1070 best_number
1071 );
1072
1073 (
1074 PeerSyncState::AncestorSearch {
1075 current: common_best,
1076 start: self.best_queued_number,
1077 state: AncestorSearchState::ExponentialBackoff(One::one()),
1078 },
1079 Some(ancestry_request::<B>(common_best)),
1080 )
1081 };
1082
1083 self.allowed_requests.add(&peer_id);
1084 self.peers.insert(
1085 peer_id,
1086 PeerSync {
1087 peer_id,
1088 common_number: Zero::zero(),
1089 best_hash,
1090 best_number,
1091 state,
1092 },
1093 );
1094
1095 Ok(req)
1096 },
1097 Ok(BlockStatus::Queued) |
1098 Ok(BlockStatus::InChainWithState) |
1099 Ok(BlockStatus::InChainPruned) => {
1100 debug!(
1101 target: LOG_TARGET,
1102 "New peer {peer_id} with known best hash {best_hash} ({best_number}).",
1103 );
1104 self.peers.insert(
1105 peer_id,
1106 PeerSync {
1107 peer_id,
1108 common_number: std::cmp::min(self.best_queued_number, best_number),
1109 best_hash,
1110 best_number,
1111 state: PeerSyncState::Available,
1112 },
1113 );
1114 self.allowed_requests.add(&peer_id);
1115 Ok(None)
1116 },
1117 }
1118 }
1119
1120 fn create_block_request_action(
1121 &mut self,
1122 peer_id: PeerId,
1123 request: BlockRequest<B>,
1124 ) -> SyncingAction<B> {
1125 let downloader = self.block_downloader.clone();
1126
1127 SyncingAction::StartRequest {
1128 peer_id,
1129 key: Self::STRATEGY_KEY,
1130 request: async move {
1131 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
1132 |(response, protocol_name)| {
1133 let decoded_response =
1134 downloader.block_response_into_blocks(&request, response);
1135 let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
1136 Ok((result, protocol_name))
1137 },
1138 ))
1139 }
1140 .boxed(),
1141 remove_obsolete: true,
1144 }
1145 }
1146
1147 #[must_use]
1149 fn on_block_data(
1150 &mut self,
1151 peer_id: &PeerId,
1152 request: Option<BlockRequest<B>>,
1153 response: BlockResponse<B>,
1154 ) -> Result<(), BadPeer> {
1155 self.downloaded_blocks += response.blocks.len();
1156 let mut gap = false;
1157 let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
1158 let mut blocks = response.blocks;
1159 if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
1160 trace!(target: LOG_TARGET, "Reversing incoming block list");
1161 blocks.reverse()
1162 }
1163 self.allowed_requests.add(peer_id);
1164 if let Some(request) = request {
1165 match &mut peer.state {
1166 PeerSyncState::DownloadingNew(_) => {
1167 self.blocks.clear_peer_download(peer_id);
1168 peer.state = PeerSyncState::Available;
1169 if let Some(start_block) =
1170 validate_blocks::<B>(&blocks, peer_id, Some(request))?
1171 {
1172 self.blocks.insert(start_block, blocks, *peer_id);
1173 }
1174 self.ready_blocks()
1175 },
1176 PeerSyncState::DownloadingGap(_) => {
1177 peer.state = PeerSyncState::Available;
1178 if let Some(gap_sync) = &mut self.gap_sync {
1179 gap_sync.blocks.clear_peer_download(peer_id);
1180 if let Some(start_block) =
1181 validate_blocks::<B>(&blocks, peer_id, Some(request))?
1182 {
1183 gap_sync.blocks.insert(start_block, blocks, *peer_id);
1184 }
1185 gap = true;
1186 let blocks: Vec<_> = gap_sync
1187 .blocks
1188 .ready_blocks(gap_sync.best_queued_number + One::one())
1189 .into_iter()
1190 .map(|block_data| {
1191 let justifications =
1192 block_data.block.justifications.or_else(|| {
1193 legacy_justification_mapping(
1194 block_data.block.justification,
1195 )
1196 });
1197 IncomingBlock {
1198 hash: block_data.block.hash,
1199 header: block_data.block.header,
1200 body: block_data.block.body,
1201 indexed_body: block_data.block.indexed_body,
1202 justifications,
1203 origin: block_data.origin,
1204 allow_missing_state: true,
1205 import_existing: self.import_existing,
1206 skip_execution: true,
1207 state: None,
1208 }
1209 })
1210 .collect();
1211 debug!(
1212 target: LOG_TARGET,
1213 "Drained {} gap blocks from {}",
1214 blocks.len(),
1215 gap_sync.best_queued_number,
1216 );
1217 blocks
1218 } else {
1219 debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
1220 return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1221 }
1222 },
1223 PeerSyncState::DownloadingStale(_) => {
1224 peer.state = PeerSyncState::Available;
1225 if blocks.is_empty() {
1226 debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
1227 return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1228 }
1229 validate_blocks::<B>(&blocks, peer_id, Some(request))?;
1230 blocks
1231 .into_iter()
1232 .map(|b| {
1233 let justifications = b
1234 .justifications
1235 .or_else(|| legacy_justification_mapping(b.justification));
1236 IncomingBlock {
1237 hash: b.hash,
1238 header: b.header,
1239 body: b.body,
1240 indexed_body: None,
1241 justifications,
1242 origin: Some(*peer_id),
1243 allow_missing_state: true,
1244 import_existing: self.import_existing,
1245 skip_execution: self.skip_execution(),
1246 state: None,
1247 }
1248 })
1249 .collect()
1250 },
1251 PeerSyncState::AncestorSearch { current, start, state } => {
1252 let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
1253 (Some(block), Ok(maybe_our_block_hash)) => {
1254 trace!(
1255 target: LOG_TARGET,
1256 "Got ancestry block #{} ({}) from peer {}",
1257 current,
1258 block.hash,
1259 peer_id,
1260 );
1261 maybe_our_block_hash.filter(|x| x == &block.hash)
1262 },
1263 (None, _) => {
1264 debug!(
1265 target: LOG_TARGET,
1266 "Invalid response when searching for ancestor from {peer_id}",
1267 );
1268 return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
1269 },
1270 (_, Err(e)) => {
1271 info!(
1272 target: LOG_TARGET,
1273 "โ Error answering legitimate blockchain query: {e}",
1274 );
1275 return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
1276 },
1277 };
1278 if matching_hash.is_some() {
1279 if *start < self.best_queued_number &&
1280 self.best_queued_number <= peer.best_number
1281 {
1282 trace!(
1286 target: LOG_TARGET,
1287 "Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
1288 *peer_id,
1289 peer.common_number,
1290 self.best_queued_number,
1291 );
1292 peer.common_number = self.best_queued_number;
1293 } else if peer.common_number < *current {
1294 trace!(
1295 target: LOG_TARGET,
1296 "Ancestry search: updating peer {} common number from={} => to={}.",
1297 *peer_id,
1298 peer.common_number,
1299 *current,
1300 );
1301 peer.common_number = *current;
1302 }
1303 }
1304 if matching_hash.is_none() && current.is_zero() {
1305 trace!(
1306 target: LOG_TARGET,
1307 "Ancestry search: genesis mismatch for peer {peer_id}",
1308 );
1309 return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
1310 }
1311 if let Some((next_state, next_num)) =
1312 handle_ancestor_search_state(state, *current, matching_hash.is_some())
1313 {
1314 peer.state = PeerSyncState::AncestorSearch {
1315 current: next_num,
1316 start: *start,
1317 state: next_state,
1318 };
1319 let request = ancestry_request::<B>(next_num);
1320 let action = self.create_block_request_action(*peer_id, request);
1321 self.actions.push(action);
1322 return Ok(());
1323 } else {
1324 trace!(
1327 target: LOG_TARGET,
1328 "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
1329 self.best_queued_hash,
1330 self.best_queued_number,
1331 peer.best_hash,
1332 peer.best_number,
1333 matching_hash,
1334 peer.common_number,
1335 );
1336 if peer.common_number < peer.best_number &&
1337 peer.best_number < self.best_queued_number
1338 {
1339 trace!(
1340 target: LOG_TARGET,
1341 "Added fork target {} for {}",
1342 peer.best_hash,
1343 peer_id,
1344 );
1345 self.fork_targets
1346 .entry(peer.best_hash)
1347 .or_insert_with(|| {
1348 if let Some(metrics) = &self.metrics {
1349 metrics.fork_targets.inc();
1350 }
1351
1352 ForkTarget {
1353 number: peer.best_number,
1354 parent_hash: None,
1355 peers: Default::default(),
1356 }
1357 })
1358 .peers
1359 .insert(*peer_id);
1360 }
1361 peer.state = PeerSyncState::Available;
1362 return Ok(());
1363 }
1364 },
1365 PeerSyncState::Available |
1366 PeerSyncState::DownloadingJustification(..) |
1367 PeerSyncState::DownloadingState => Vec::new(),
1368 }
1369 } else {
1370 validate_blocks::<B>(&blocks, peer_id, None)?;
1372 blocks
1373 .into_iter()
1374 .map(|b| {
1375 let justifications = b
1376 .justifications
1377 .or_else(|| legacy_justification_mapping(b.justification));
1378 IncomingBlock {
1379 hash: b.hash,
1380 header: b.header,
1381 body: b.body,
1382 indexed_body: None,
1383 justifications,
1384 origin: Some(*peer_id),
1385 allow_missing_state: true,
1386 import_existing: false,
1387 skip_execution: true,
1388 state: None,
1389 }
1390 })
1391 .collect()
1392 }
1393 } else {
1394 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1396 };
1397
1398 self.validate_and_queue_blocks(new_blocks, gap);
1399
1400 Ok(())
1401 }
1402
1403 fn on_block_response(
1404 &mut self,
1405 peer_id: &PeerId,
1406 key: StrategyKey,
1407 request: BlockRequest<B>,
1408 blocks: Vec<BlockData<B>>,
1409 ) -> Result<(), BadPeer> {
1410 if key != Self::STRATEGY_KEY {
1411 error!(
1412 target: LOG_TARGET,
1413 "`on_block_response()` called with unexpected key {key:?} for chain sync",
1414 );
1415 debug_assert!(false);
1416 }
1417 let block_response = BlockResponse::<B> { id: request.id, blocks };
1418
1419 let blocks_range = || match (
1420 block_response
1421 .blocks
1422 .first()
1423 .and_then(|b| b.header.as_ref().map(|h| h.number())),
1424 block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
1425 ) {
1426 (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
1427 (Some(first), Some(_)) => format!(" ({})", first),
1428 _ => Default::default(),
1429 };
1430 trace!(
1431 target: LOG_TARGET,
1432 "BlockResponse {} from {} with {} blocks {}",
1433 block_response.id,
1434 peer_id,
1435 block_response.blocks.len(),
1436 blocks_range(),
1437 );
1438
1439 if request.fields == BlockAttributes::JUSTIFICATION {
1440 self.on_block_justification(*peer_id, block_response)
1441 } else {
1442 self.on_block_data(peer_id, Some(request), block_response)
1443 }
1444 }
1445
1446 #[must_use]
1448 fn on_block_justification(
1449 &mut self,
1450 peer_id: PeerId,
1451 response: BlockResponse<B>,
1452 ) -> Result<(), BadPeer> {
1453 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1454 peer
1455 } else {
1456 error!(
1457 target: LOG_TARGET,
1458 "๐ Called on_block_justification with a peer ID of an unknown peer",
1459 );
1460 return Ok(());
1461 };
1462
1463 self.allowed_requests.add(&peer_id);
1464 if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1465 peer.state = PeerSyncState::Available;
1466
1467 let justification = if let Some(block) = response.blocks.into_iter().next() {
1469 if hash != block.hash {
1470 warn!(
1471 target: LOG_TARGET,
1472 "๐ Invalid block justification provided by {}: requested: {:?} got: {:?}",
1473 peer_id,
1474 hash,
1475 block.hash,
1476 );
1477 return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1478 }
1479
1480 block
1481 .justifications
1482 .or_else(|| legacy_justification_mapping(block.justification))
1483 } else {
1484 trace!(
1487 target: LOG_TARGET,
1488 "Peer {peer_id:?} provided empty response for justification request {hash:?}",
1489 );
1490
1491 None
1492 };
1493
1494 if let Some((peer_id, hash, number, justifications)) =
1495 self.extra_justifications.on_response(peer_id, justification)
1496 {
1497 self.actions.push(SyncingAction::ImportJustifications {
1498 peer_id,
1499 hash,
1500 number,
1501 justifications,
1502 });
1503 return Ok(());
1504 }
1505 }
1506
1507 Ok(())
1508 }
1509
1510 fn median_seen(&self) -> Option<NumberFor<B>> {
1512 let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1513
1514 if best_seens.is_empty() {
1515 None
1516 } else {
1517 let middle = best_seens.len() / 2;
1518
1519 Some(*best_seens.select_nth_unstable(middle).1)
1521 }
1522 }
1523
1524 fn required_block_attributes(&self) -> BlockAttributes {
1525 match self.mode {
1526 ChainSyncMode::Full =>
1527 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1528 ChainSyncMode::LightState { storage_chain_mode: false, .. } =>
1529 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1530 ChainSyncMode::LightState { storage_chain_mode: true, .. } =>
1531 BlockAttributes::HEADER |
1532 BlockAttributes::JUSTIFICATION |
1533 BlockAttributes::INDEXED_BODY,
1534 }
1535 }
1536
1537 fn skip_execution(&self) -> bool {
1538 match self.mode {
1539 ChainSyncMode::Full => false,
1540 ChainSyncMode::LightState { .. } => true,
1541 }
1542 }
1543
1544 fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1545 let orig_len = new_blocks.len();
1546 new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1547 if new_blocks.len() != orig_len {
1548 debug!(
1549 target: LOG_TARGET,
1550 "Ignoring {} blocks that are already queued",
1551 orig_len - new_blocks.len(),
1552 );
1553 }
1554
1555 let origin = if !gap && !self.status().state.is_major_syncing() {
1556 BlockOrigin::NetworkBroadcast
1557 } else {
1558 BlockOrigin::NetworkInitialSync
1559 };
1560
1561 if let Some((h, n)) = new_blocks
1562 .last()
1563 .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1564 {
1565 trace!(
1566 target: LOG_TARGET,
1567 "Accepted {} blocks ({:?}) with origin {:?}",
1568 new_blocks.len(),
1569 h,
1570 origin,
1571 );
1572 self.on_block_queued(h, n)
1573 }
1574 self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1575 if let Some(metrics) = &self.metrics {
1576 metrics
1577 .queued_blocks
1578 .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1579 }
1580
1581 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1582 }
1583
1584 fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1585 if let Some(peer) = self.peers.get_mut(peer_id) {
1586 peer.update_common_number(new_common);
1587 }
1588 }
1589
1590 fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1595 if self.fork_targets.remove(hash).is_some() {
1596 if let Some(metrics) = &self.metrics {
1597 metrics.fork_targets.dec();
1598 }
1599 trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1600 }
1601 if let Some(gap_sync) = &mut self.gap_sync {
1602 if number > gap_sync.best_queued_number && number <= gap_sync.target {
1603 gap_sync.best_queued_number = number;
1604 }
1605 }
1606 if number > self.best_queued_number {
1607 self.best_queued_number = number;
1608 self.best_queued_hash = *hash;
1609 for (n, peer) in self.peers.iter_mut() {
1611 if let PeerSyncState::AncestorSearch { .. } = peer.state {
1612 continue;
1614 }
1615 let new_common_number =
1616 if peer.best_number >= number { number } else { peer.best_number };
1617 trace!(
1618 target: LOG_TARGET,
1619 "Updating peer {} info, ours={}, common={}->{}, their best={}",
1620 n,
1621 number,
1622 peer.common_number,
1623 new_common_number,
1624 peer.best_number,
1625 );
1626 peer.common_number = new_common_number;
1627 }
1628 }
1629 self.allowed_requests.set_all();
1630 }
1631
1632 fn restart(&mut self) {
1636 self.blocks.clear();
1637 if let Err(e) = self.reset_sync_start_point() {
1638 warn!(target: LOG_TARGET, "๐ Unable to restart sync: {e}");
1639 }
1640 self.allowed_requests.set_all();
1641 debug!(
1642 target: LOG_TARGET,
1643 "Restarted with {} ({})",
1644 self.best_queued_number,
1645 self.best_queued_hash,
1646 );
1647 let old_peers = std::mem::take(&mut self.peers);
1648
1649 old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
1650 match peer_sync.state {
1651 PeerSyncState::Available => {
1652 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1653 },
1654 PeerSyncState::AncestorSearch { .. } |
1655 PeerSyncState::DownloadingNew(_) |
1656 PeerSyncState::DownloadingStale(_) |
1657 PeerSyncState::DownloadingGap(_) |
1658 PeerSyncState::DownloadingState => {
1659 self.actions
1661 .push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
1662 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1663 },
1664 PeerSyncState::DownloadingJustification(_) => {
1665 trace!(
1669 target: LOG_TARGET,
1670 "Keeping peer {} after restart, updating common number from={} => to={} (our best).",
1671 peer_id,
1672 peer_sync.common_number,
1673 self.best_queued_number,
1674 );
1675 peer_sync.common_number = self.best_queued_number;
1676 self.peers.insert(peer_id, peer_sync);
1677 },
1678 }
1679 });
1680 }
1681
1682 fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
1685 let info = self.client.info();
1686 debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");
1687
1688 if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
1689 warn!(
1690 target: LOG_TARGET,
1691 "Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
1692 );
1693 self.mode = ChainSyncMode::Full;
1694 }
1695
1696 self.import_existing = false;
1697 self.best_queued_hash = info.best_hash;
1698 self.best_queued_number = info.best_number;
1699
1700 if self.mode == ChainSyncMode::Full &&
1701 self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
1702 {
1703 self.import_existing = true;
1704 if let Some((hash, number)) = info.finalized_state {
1706 debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
1707 self.best_queued_hash = hash;
1708 self.best_queued_number = number;
1709 } else {
1710 debug!(target: LOG_TARGET, "Restarting from genesis");
1711 self.best_queued_hash = Default::default();
1712 self.best_queued_number = Zero::zero();
1713 }
1714 }
1715
1716 if let Some(BlockGap { start, end, .. }) = info.block_gap {
1717 let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
1718 debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
1719 self.gap_sync = Some(GapSync {
1720 best_queued_number: start - One::one(),
1721 target: end,
1722 blocks: BlockCollection::new(),
1723 });
1724 }
1725 trace!(
1726 target: LOG_TARGET,
1727 "Restarted sync at #{} ({:?})",
1728 self.best_queued_number,
1729 self.best_queued_hash,
1730 );
1731 Ok(())
1732 }
1733
1734 fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1736 if self.queue_blocks.contains(hash) {
1737 return Ok(BlockStatus::Queued);
1738 }
1739 self.client.block_status(*hash)
1740 }
1741
1742 fn is_known(&self, hash: &B::Hash) -> bool {
1744 self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1745 }
1746
1747 fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1749 self.peers
1750 .iter()
1751 .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1752 }
1753
1754 fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1756 self.blocks
1757 .ready_blocks(self.best_queued_number + One::one())
1758 .into_iter()
1759 .map(|block_data| {
1760 let justifications = block_data
1761 .block
1762 .justifications
1763 .or_else(|| legacy_justification_mapping(block_data.block.justification));
1764 IncomingBlock {
1765 hash: block_data.block.hash,
1766 header: block_data.block.header,
1767 body: block_data.block.body,
1768 indexed_body: block_data.block.indexed_body,
1769 justifications,
1770 origin: block_data.origin,
1771 allow_missing_state: true,
1772 import_existing: self.import_existing,
1773 skip_execution: self.skip_execution(),
1774 state: None,
1775 }
1776 })
1777 .collect()
1778 }
1779
1780 fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1782 let peers = &mut self.peers;
1783 let mut matcher = self.extra_justifications.matcher();
1784 std::iter::from_fn(move || {
1785 if let Some((peer, request)) = matcher.next(peers) {
1786 peers
1787 .get_mut(&peer)
1788 .expect(
1789 "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1790 )
1791 .state = PeerSyncState::DownloadingJustification(request.0);
1792 let req = BlockRequest::<B> {
1793 id: 0,
1794 fields: BlockAttributes::JUSTIFICATION,
1795 from: FromBlock::Hash(request.0),
1796 direction: Direction::Ascending,
1797 max: Some(1),
1798 };
1799 Some((peer, req))
1800 } else {
1801 None
1802 }
1803 })
1804 .collect()
1805 }
1806
1807 fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1809 if self.allowed_requests.is_empty() || self.state_sync.is_some() {
1810 return Vec::new();
1811 }
1812
1813 if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
1814 trace!(target: LOG_TARGET, "Too many blocks in the queue.");
1815 return Vec::new();
1816 }
1817 let is_major_syncing = self.status().state.is_major_syncing();
1818 let attrs = self.required_block_attributes();
1819 let blocks = &mut self.blocks;
1820 let fork_targets = &mut self.fork_targets;
1821 let last_finalized =
1822 std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
1823 let best_queued = self.best_queued_number;
1824 let client = &self.client;
1825 let queue_blocks = &self.queue_blocks;
1826 let allowed_requests = self.allowed_requests.clone();
1827 let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
1828 let max_blocks_per_request = self.max_blocks_per_request;
1829 let gap_sync = &mut self.gap_sync;
1830 let disconnected_peers = &mut self.disconnected_peers;
1831 let metrics = self.metrics.as_ref();
1832 let requests = self
1833 .peers
1834 .iter_mut()
1835 .filter_map(move |(&id, peer)| {
1836 if !peer.state.is_available() ||
1837 !allowed_requests.contains(&id) ||
1838 !disconnected_peers.is_peer_available(&id)
1839 {
1840 return None;
1841 }
1842
1843 if best_queued.saturating_sub(peer.common_number) >
1849 MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
1850 best_queued < peer.best_number &&
1851 peer.common_number < last_finalized &&
1852 queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
1853 {
1854 trace!(
1855 target: LOG_TARGET,
1856 "Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
1857 id,
1858 peer.common_number,
1859 best_queued,
1860 );
1861 let current = std::cmp::min(peer.best_number, best_queued);
1862 peer.state = PeerSyncState::AncestorSearch {
1863 current,
1864 start: best_queued,
1865 state: AncestorSearchState::ExponentialBackoff(One::one()),
1866 };
1867 Some((id, ancestry_request::<B>(current)))
1868 } else if let Some((range, req)) = peer_block_request(
1869 &id,
1870 peer,
1871 blocks,
1872 attrs,
1873 max_parallel,
1874 max_blocks_per_request,
1875 last_finalized,
1876 best_queued,
1877 ) {
1878 peer.state = PeerSyncState::DownloadingNew(range.start);
1879 trace!(
1880 target: LOG_TARGET,
1881 "New block request for {}, (best:{}, common:{}) {:?}",
1882 id,
1883 peer.best_number,
1884 peer.common_number,
1885 req,
1886 );
1887 Some((id, req))
1888 } else if let Some((hash, req)) = fork_sync_request(
1889 &id,
1890 fork_targets,
1891 best_queued,
1892 last_finalized,
1893 attrs,
1894 |hash| {
1895 if queue_blocks.contains(hash) {
1896 BlockStatus::Queued
1897 } else {
1898 client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
1899 }
1900 },
1901 max_blocks_per_request,
1902 metrics,
1903 ) {
1904 trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
1905 peer.state = PeerSyncState::DownloadingStale(hash);
1906 Some((id, req))
1907 } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
1908 peer_gap_block_request(
1909 &id,
1910 peer,
1911 &mut sync.blocks,
1912 attrs,
1913 sync.target,
1914 sync.best_queued_number,
1915 max_blocks_per_request,
1916 )
1917 }) {
1918 peer.state = PeerSyncState::DownloadingGap(range.start);
1919 trace!(
1920 target: LOG_TARGET,
1921 "New gap block request for {}, (best:{}, common:{}) {:?}",
1922 id,
1923 peer.best_number,
1924 peer.common_number,
1925 req,
1926 );
1927 Some((id, req))
1928 } else {
1929 None
1930 }
1931 })
1932 .collect::<Vec<_>>();
1933
1934 if !requests.is_empty() {
1937 self.allowed_requests.take();
1938 }
1939
1940 requests
1941 }
1942
1943 fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
1945 if self.allowed_requests.is_empty() {
1946 return None;
1947 }
1948 if self.state_sync.is_some() &&
1949 self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
1950 {
1951 return None;
1953 }
1954 if let Some(sync) = &self.state_sync {
1955 if sync.is_complete() {
1956 return None;
1957 }
1958
1959 for (id, peer) in self.peers.iter_mut() {
1960 if peer.state.is_available() &&
1961 peer.common_number >= sync.target_number() &&
1962 self.disconnected_peers.is_peer_available(&id)
1963 {
1964 peer.state = PeerSyncState::DownloadingState;
1965 let request = sync.next_request();
1966 trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
1967 self.allowed_requests.clear();
1968 return Some((*id, request));
1969 }
1970 }
1971 }
1972 None
1973 }
1974
1975 #[must_use]
1976 fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
1977 let response = match StateResponse::decode(response) {
1978 Ok(response) => response,
1979 Err(error) => {
1980 debug!(
1981 target: LOG_TARGET,
1982 "Failed to decode state response from peer {peer_id:?}: {error:?}.",
1983 );
1984
1985 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
1986 },
1987 };
1988
1989 if let Some(peer) = self.peers.get_mut(peer_id) {
1990 if let PeerSyncState::DownloadingState = peer.state {
1991 peer.state = PeerSyncState::Available;
1992 self.allowed_requests.set_all();
1993 }
1994 }
1995 let import_result = if let Some(sync) = &mut self.state_sync {
1996 debug!(
1997 target: LOG_TARGET,
1998 "Importing state data from {} with {} keys, {} proof nodes.",
1999 peer_id,
2000 response.entries.len(),
2001 response.proof.len(),
2002 );
2003 sync.import(response)
2004 } else {
2005 debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
2006 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2007 };
2008
2009 match import_result {
2010 ImportResult::Import(hash, header, state, body, justifications) => {
2011 let origin = BlockOrigin::NetworkInitialSync;
2012 let block = IncomingBlock {
2013 hash,
2014 header: Some(header),
2015 body,
2016 indexed_body: None,
2017 justifications,
2018 origin: None,
2019 allow_missing_state: true,
2020 import_existing: true,
2021 skip_execution: self.skip_execution(),
2022 state: Some(state),
2023 };
2024 debug!(target: LOG_TARGET, "State download is complete. Import is queued");
2025 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
2026 Ok(())
2027 },
2028 ImportResult::Continue => Ok(()),
2029 ImportResult::BadResponse => {
2030 debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
2031 Err(BadPeer(*peer_id, rep::BAD_BLOCK))
2032 },
2033 }
2034 }
2035
2036 fn attempt_state_sync(
2037 &mut self,
2038 finalized_hash: B::Hash,
2039 finalized_number: NumberFor<B>,
2040 skip_proofs: bool,
2041 ) {
2042 let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
2043 heads.sort();
2044 let median = heads[heads.len() / 2];
2045 if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
2046 if let Ok(Some(header)) = self.client.header(finalized_hash) {
2047 log::debug!(
2048 target: LOG_TARGET,
2049 "Starting state sync for #{finalized_number} ({finalized_hash})",
2050 );
2051 self.state_sync =
2052 Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
2053 self.allowed_requests.set_all();
2054 } else {
2055 log::error!(
2056 target: LOG_TARGET,
2057 "Failed to start state sync: header for finalized block \
2058 #{finalized_number} ({finalized_hash}) is not available",
2059 );
2060 debug_assert!(false);
2061 }
2062 }
2063 }
2064
2065 #[cfg(test)]
2067 #[must_use]
2068 fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
2069 std::mem::take(&mut self.actions).into_iter()
2070 }
2071}
2072
2073fn legacy_justification_mapping(
2078 justification: Option<EncodedJustification>,
2079) -> Option<Justifications> {
2080 justification.map(|just| (*b"FRNK", just).into())
2081}
2082
2083fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
2086 BlockRequest::<B> {
2087 id: 0,
2088 fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
2089 from: FromBlock::Number(block),
2090 direction: Direction::Ascending,
2091 max: Some(1),
2092 }
2093}
2094
2095#[derive(Copy, Clone, Eq, PartialEq, Debug)]
2098pub(crate) enum AncestorSearchState<B: BlockT> {
2099 ExponentialBackoff(NumberFor<B>),
2102 BinarySearch(NumberFor<B>, NumberFor<B>),
2105}
2106
2107fn handle_ancestor_search_state<B: BlockT>(
2115 state: &AncestorSearchState<B>,
2116 curr_block_num: NumberFor<B>,
2117 block_hash_match: bool,
2118) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
2119 let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
2120 match state {
2121 AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
2122 let next_distance_to_tip = *next_distance_to_tip;
2123 if block_hash_match && next_distance_to_tip == One::one() {
2124 return None;
2127 }
2128 if block_hash_match {
2129 let left = curr_block_num;
2130 let right = left + next_distance_to_tip / two;
2131 let middle = left + (right - left) / two;
2132 Some((AncestorSearchState::BinarySearch(left, right), middle))
2133 } else {
2134 let next_block_num =
2135 curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
2136 let next_distance_to_tip = next_distance_to_tip * two;
2137 Some((
2138 AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
2139 next_block_num,
2140 ))
2141 }
2142 },
2143 AncestorSearchState::BinarySearch(mut left, mut right) => {
2144 if left >= curr_block_num {
2145 return None;
2146 }
2147 if block_hash_match {
2148 left = curr_block_num;
2149 } else {
2150 right = curr_block_num;
2151 }
2152 assert!(right >= left);
2153 let middle = left + (right - left) / two;
2154 if middle == curr_block_num {
2155 None
2156 } else {
2157 Some((AncestorSearchState::BinarySearch(left, right), middle))
2158 }
2159 },
2160 }
2161}
2162
2163fn peer_block_request<B: BlockT>(
2165 id: &PeerId,
2166 peer: &PeerSync<B>,
2167 blocks: &mut BlockCollection<B>,
2168 attrs: BlockAttributes,
2169 max_parallel_downloads: u32,
2170 max_blocks_per_request: u32,
2171 finalized: NumberFor<B>,
2172 best_num: NumberFor<B>,
2173) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2174 if best_num >= peer.best_number {
2175 return None;
2177 } else if peer.common_number < finalized {
2178 trace!(
2179 target: LOG_TARGET,
2180 "Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2181 id, peer.common_number, finalized, peer.best_number, best_num,
2182 );
2183 }
2184 let range = blocks.needed_blocks(
2185 *id,
2186 max_blocks_per_request,
2187 peer.best_number,
2188 peer.common_number,
2189 max_parallel_downloads,
2190 MAX_DOWNLOAD_AHEAD,
2191 )?;
2192
2193 let last = range.end.saturating_sub(One::one());
2195
2196 let from = if peer.best_number == last {
2197 FromBlock::Hash(peer.best_hash)
2198 } else {
2199 FromBlock::Number(last)
2200 };
2201
2202 let request = BlockRequest::<B> {
2203 id: 0,
2204 fields: attrs,
2205 from,
2206 direction: Direction::Descending,
2207 max: Some((range.end - range.start).saturated_into::<u32>()),
2208 };
2209
2210 Some((range, request))
2211}
2212
2213fn peer_gap_block_request<B: BlockT>(
2215 id: &PeerId,
2216 peer: &PeerSync<B>,
2217 blocks: &mut BlockCollection<B>,
2218 attrs: BlockAttributes,
2219 target: NumberFor<B>,
2220 common_number: NumberFor<B>,
2221 max_blocks_per_request: u32,
2222) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2223 let range = blocks.needed_blocks(
2224 *id,
2225 max_blocks_per_request,
2226 std::cmp::min(peer.best_number, target),
2227 common_number,
2228 1,
2229 MAX_DOWNLOAD_AHEAD,
2230 )?;
2231
2232 let last = range.end.saturating_sub(One::one());
2234 let from = FromBlock::Number(last);
2235
2236 let request = BlockRequest::<B> {
2237 id: 0,
2238 fields: attrs,
2239 from,
2240 direction: Direction::Descending,
2241 max: Some((range.end - range.start).saturated_into::<u32>()),
2242 };
2243 Some((range, request))
2244}
2245
2246fn fork_sync_request<B: BlockT>(
2248 id: &PeerId,
2249 fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
2250 best_num: NumberFor<B>,
2251 finalized: NumberFor<B>,
2252 attributes: BlockAttributes,
2253 check_block: impl Fn(&B::Hash) -> BlockStatus,
2254 max_blocks_per_request: u32,
2255 metrics: Option<&Metrics>,
2256) -> Option<(B::Hash, BlockRequest<B>)> {
2257 fork_targets.retain(|hash, r| {
2258 if r.number <= finalized {
2259 trace!(
2260 target: LOG_TARGET,
2261 "Removed expired fork sync request {:?} (#{})",
2262 hash,
2263 r.number,
2264 );
2265 return false;
2266 }
2267 if check_block(hash) != BlockStatus::Unknown {
2268 trace!(
2269 target: LOG_TARGET,
2270 "Removed obsolete fork sync request {:?} (#{})",
2271 hash,
2272 r.number,
2273 );
2274 return false;
2275 }
2276 true
2277 });
2278 if let Some(metrics) = metrics {
2279 metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
2280 }
2281 for (hash, r) in fork_targets {
2282 if !r.peers.contains(&id) {
2283 continue;
2284 }
2285 if r.number <= best_num ||
2288 (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
2289 {
2290 let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
2291 let count = if parent_status == BlockStatus::Unknown {
2292 (r.number - finalized).saturated_into::<u32>() } else {
2294 1
2296 };
2297 trace!(
2298 target: LOG_TARGET,
2299 "Downloading requested fork {hash:?} from {id}, {count} blocks",
2300 );
2301 return Some((
2302 *hash,
2303 BlockRequest::<B> {
2304 id: 0,
2305 fields: attributes,
2306 from: FromBlock::Hash(*hash),
2307 direction: Direction::Descending,
2308 max: Some(count),
2309 },
2310 ));
2311 } else {
2312 trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
2313 }
2314 }
2315 None
2316}
2317
2318fn is_descendent_of<Block, T>(
2320 client: &T,
2321 base: &Block::Hash,
2322 block: &Block::Hash,
2323) -> sp_blockchain::Result<bool>
2324where
2325 Block: BlockT,
2326 T: HeaderMetadata<Block, Error = sp_blockchain::Error> + ?Sized,
2327{
2328 if base == block {
2329 return Ok(false);
2330 }
2331
2332 let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?;
2333
2334 Ok(ancestor.hash == *base)
2335}
2336
2337pub fn validate_blocks<Block: BlockT>(
2342 blocks: &Vec<BlockData<Block>>,
2343 peer_id: &PeerId,
2344 request: Option<BlockRequest<Block>>,
2345) -> Result<Option<NumberFor<Block>>, BadPeer> {
2346 if let Some(request) = request {
2347 if Some(blocks.len() as _) > request.max {
2348 debug!(
2349 target: LOG_TARGET,
2350 "Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
2351 peer_id,
2352 request.max,
2353 blocks.len(),
2354 );
2355
2356 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2357 }
2358
2359 let block_header =
2360 if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
2361 .and_then(|b| b.header.as_ref());
2362
2363 let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
2364 FromBlock::Hash(hash) => h.hash() == hash,
2365 FromBlock::Number(n) => h.number() == &n,
2366 });
2367
2368 if !expected_block {
2369 debug!(
2370 target: LOG_TARGET,
2371 "Received block that was not requested. Requested {:?}, got {:?}.",
2372 request.from,
2373 block_header,
2374 );
2375
2376 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2377 }
2378
2379 if request.fields.contains(BlockAttributes::HEADER) &&
2380 blocks.iter().any(|b| b.header.is_none())
2381 {
2382 trace!(
2383 target: LOG_TARGET,
2384 "Missing requested header for a block in response from {peer_id}.",
2385 );
2386
2387 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2388 }
2389
2390 if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
2391 {
2392 trace!(
2393 target: LOG_TARGET,
2394 "Missing requested body for a block in response from {peer_id}.",
2395 );
2396
2397 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2398 }
2399 }
2400
2401 for b in blocks {
2402 if let Some(header) = &b.header {
2403 let hash = header.hash();
2404 if hash != b.hash {
2405 debug!(
2406 target: LOG_TARGET,
2407 "Bad header received from {}. Expected hash {:?}, got {:?}",
2408 peer_id,
2409 b.hash,
2410 hash,
2411 );
2412 return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2413 }
2414 }
2415 }
2416
2417 Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
2418}