sc_network_sync/strategy/chain_sync.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Contains the state of the chain synchronization process
20//!
21//! At any given point in time, a running node tries as much as possible to be at the head of the
22//! chain. This module handles the logic of which blocks to request from remotes, and processing
23//! responses. It yields blocks to check and potentially move to the database.
24//!
25//! # Usage
26//!
27//! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on
28//! the network, or whenever a block has been successfully verified, call the appropriate method in
29//! order to update it.
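//!
//! # Example
//!
//! A rough sketch of how a caller is expected to drive `ChainSync`; the setup of `client`,
//! `block_downloader` and `network_service` is elided and not part of this module:
//!
//! ```ignore
//! let mut sync = ChainSync::new(
//! 	ChainSyncMode::Full,
//! 	client,
//! 	5,  // max_parallel_downloads
//! 	64, // max_blocks_per_request
//! 	state_request_protocol_name,
//! 	block_downloader,
//! 	None, // no metrics registry
//! 	std::iter::empty(),
//! )?;
//!
//! // Report peers and network events as they happen, e.g.:
//! sync.add_peer(peer_id, best_hash, best_number);
//!
//! // ...and periodically drain the queued actions and execute them.
//! for action in sync.actions(&network_service)? {
//! 	// Dispatch block requests, import blocks, drop peers, ...
//! }
//! ```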
30
31use crate::{
32	block_relay_protocol::{BlockDownloader, BlockResponseError},
33	blocks::BlockCollection,
34	justification_requests::ExtraRequests,
35	schema::v1::{StateRequest, StateResponse},
36	service::network::NetworkServiceHandle,
37	strategy::{
38		disconnected_peers::DisconnectedPeers,
39		state_sync::{ImportResult, StateSync, StateSyncProvider},
40		warp::{WarpSyncPhase, WarpSyncProgress},
41		StrategyKey, SyncingAction, SyncingStrategy,
42	},
43	types::{BadPeer, SyncState, SyncStatus},
44	LOG_TARGET,
45};
46
47use futures::{channel::oneshot, FutureExt};
48use log::{debug, error, info, trace, warn};
49use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
50use prost::Message;
51use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
52use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
53use sc_network::{IfDisconnected, ProtocolName};
54use sc_network_common::sync::message::{
55	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
56};
57use sc_network_types::PeerId;
58use sp_arithmetic::traits::Saturating;
59use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
60use sp_consensus::{BlockOrigin, BlockStatus};
61use sp_runtime::{
62	traits::{
63		Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
64	},
65	EncodedJustification, Justifications,
66};
67
68use std::{
69	any::Any,
70	collections::{HashMap, HashSet},
71	ops::Range,
72	sync::Arc,
73};
74
75#[cfg(test)]
76mod test;
77
78/// Maximum blocks to store in the import queue.
79const MAX_IMPORTING_BLOCKS: usize = 2048;
80
81/// Maximum blocks to download ahead of any gap.
82const MAX_DOWNLOAD_AHEAD: u32 = 2048;
83
84/// Maximum blocks to look backwards. The gap is the difference between the highest block and the
85/// common block of a node.
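/// With the current `MAX_DOWNLOAD_AHEAD` of 2048 this evaluates to 1024 blocks.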
86const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;
87
88/// Pick the state to sync as the latest finalized number minus this.
89const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
90
91/// We use a heuristic that with a high likelihood, by the time
92/// `MAJOR_SYNC_BLOCKS` have been imported we'll be on the same
93/// chain as (or at least closer to) the peer, so we want to delay
94/// the ancestor search to avoid wasting time on it while we are
95/// still so far behind.
96const MAJOR_SYNC_BLOCKS: u8 = 5;
97
98mod rep {
99	use sc_network::ReputationChange as Rep;
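	// For orientation: 1 << 12 = 4_096, 1 << 16 = 65_536, 1 << 20 = 1_048_576 and
	// 1 << 29 = 536_870_912; every value below is applied as a negative reputation change.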
100	/// Reputation change when a peer sent us a message that led to a
101	/// database read error.
102	pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
103
104	/// Reputation change when a peer sent us a status message with a different
105	/// genesis than us.
106	pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");
107
108	/// Reputation change for peers which send us a block with an incomplete header.
109	pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
110
111	/// Reputation change for peers which send us a block which we fail to verify.
112	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
113
114	/// Reputation change for peers which send us a known bad block.
115	pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
116
117	/// Peer did not provide us with advertised block data.
118	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
119
120	/// Reputation change for peers which send us non-requested block data.
121	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
122
123	/// Reputation change for peers which send us a block with bad justifications.
124	pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
125
126	/// Reputation change when a peer sent us invalid ancestry result.
127	pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");
128
129	/// Peer response data does not have requested bits.
130	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
131
132	/// We received a message that failed to decode.
133	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
134}
135
136struct Metrics {
137	queued_blocks: Gauge<U64>,
138	fork_targets: Gauge<U64>,
139}
140
141impl Metrics {
142	fn register(r: &Registry) -> Result<Self, PrometheusError> {
143		Ok(Self {
144			queued_blocks: {
145				let g =
146					Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
147				register(g, r)?
148			},
149			fork_targets: {
150				let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
151				register(g, r)?
152			},
153		})
154	}
155}
156
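/// The set of peers we are currently allowed to send new block requests to.
///
/// `Some(set)` limits requests to the listed peers (the default is an empty set, i.e. no peer),
/// while `All` allows requesting blocks from every connected peer.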
157#[derive(Debug, Clone)]
158enum AllowedRequests {
159	Some(HashSet<PeerId>),
160	All,
161}
162
163impl AllowedRequests {
164	fn add(&mut self, id: &PeerId) {
165		if let Self::Some(ref mut set) = self {
166			set.insert(*id);
167		}
168	}
169
170	fn take(&mut self) -> Self {
171		std::mem::take(self)
172	}
173
174	fn set_all(&mut self) {
175		*self = Self::All;
176	}
177
178	fn contains(&self, id: &PeerId) -> bool {
179		match self {
180			Self::Some(set) => set.contains(id),
181			Self::All => true,
182		}
183	}
184
185	fn is_empty(&self) -> bool {
186		match self {
187			Self::Some(set) => set.is_empty(),
188			Self::All => false,
189		}
190	}
191
192	fn clear(&mut self) {
193		std::mem::take(self);
194	}
195}
196
197impl Default for AllowedRequests {
198	fn default() -> Self {
199		Self::Some(HashSet::default())
200	}
201}
202
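/// State of the block history ("gap") download, e.g. for the gap left behind by a warp sync.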
203struct GapSync<B: BlockT> {
204	blocks: BlockCollection<B>,
205	best_queued_number: NumberFor<B>,
206	target: NumberFor<B>,
207}
208
209/// Sync operation mode.
210#[derive(Copy, Clone, Debug, Eq, PartialEq)]
211pub enum ChainSyncMode {
212	/// Full block download and verification.
213	Full,
214	/// Download blocks and the latest state.
215	LightState {
216		/// Skip state proof download and verification.
217		skip_proofs: bool,
218		/// Download indexed transactions for recent blocks.
219		storage_chain_mode: bool,
220	},
221}
222
223/// All the data we have about a Peer that we are trying to sync with
224#[derive(Debug, Clone)]
225pub(crate) struct PeerSync<B: BlockT> {
226	/// Peer id of this peer.
227	pub peer_id: PeerId,
228	/// The common number is the block number that is a common point of
229	/// ancestry for both our chains (as far as we know).
230	pub common_number: NumberFor<B>,
231	/// The hash of the best block that we've seen for this peer.
232	pub best_hash: B::Hash,
233	/// The number of the best block that we've seen for this peer.
234	pub best_number: NumberFor<B>,
235	/// The state of syncing this peer is in for us, generally categorized
236	/// into `Available` or "busy" with something as defined by `PeerSyncState`.
237	pub state: PeerSyncState<B>,
238}
239
240impl<B: BlockT> PeerSync<B> {
241	/// Update the `common_number` iff `new_common > common_number`.
242	fn update_common_number(&mut self, new_common: NumberFor<B>) {
243		if self.common_number < new_common {
244			trace!(
245				target: LOG_TARGET,
246				"Updating peer {} common number from={} => to={}.",
247				self.peer_id,
248				self.common_number,
249				new_common,
250			);
251			self.common_number = new_common;
252		}
253	}
254}
255
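/// A fork sync target: a block we want to download in addition to the best chain, together with
/// the peers known to have it.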
256struct ForkTarget<B: BlockT> {
257	number: NumberFor<B>,
258	parent_hash: Option<B::Hash>,
259	peers: HashSet<PeerId>,
260}
261
262/// The state of syncing between a Peer and ourselves.
263///
264/// Generally two categories, "busy" or `Available`. If busy, the enum
265/// defines what we are busy with.
266#[derive(Copy, Clone, Eq, PartialEq, Debug)]
267pub(crate) enum PeerSyncState<B: BlockT> {
268	/// Available for sync requests.
269	Available,
270	/// Searching for ancestors the Peer has in common with us.
271	AncestorSearch { start: NumberFor<B>, current: NumberFor<B>, state: AncestorSearchState<B> },
272	/// Actively downloading new blocks, starting from the given Number.
273	DownloadingNew(NumberFor<B>),
274	/// Downloading a stale block with given Hash. Stale means that it is a
275	/// block with a number that is lower than our best number. It might be
276	/// from a fork and not necessarily already imported.
277	DownloadingStale(B::Hash),
278	/// Downloading justification for given block hash.
279	DownloadingJustification(B::Hash),
280	/// Downloading state.
281	DownloadingState,
282	/// Actively downloading block history after warp sync.
283	DownloadingGap(NumberFor<B>),
284}
285
286impl<B: BlockT> PeerSyncState<B> {
287	pub fn is_available(&self) -> bool {
288		matches!(self, Self::Available)
289	}
290}
291
292/// The main data structure which contains all the state for a chain's
293/// active syncing strategy.
294pub struct ChainSync<B: BlockT, Client> {
295	/// Chain client.
296	client: Arc<Client>,
297	/// The active peers that we are using to sync and their PeerSync status
298	peers: HashMap<PeerId, PeerSync<B>>,
299	disconnected_peers: DisconnectedPeers,
300	/// A `BlockCollection` of blocks that are being downloaded from peers
301	blocks: BlockCollection<B>,
302	/// The best block number in our queue of blocks to import
303	best_queued_number: NumberFor<B>,
304	/// The best block hash in our queue of blocks to import
305	best_queued_hash: B::Hash,
306	/// Current mode (full/light)
307	mode: ChainSyncMode,
308	/// Any extra justification requests.
309	extra_justifications: ExtraRequests<B>,
310	/// A set of hashes of blocks that are being downloaded or have been
311	/// downloaded and are queued for import.
312	queue_blocks: HashSet<B::Hash>,
313	/// A pending attempt to start the state sync.
314	///
315	/// The initiation of state sync may be deferred in cases where other conditions
316	/// are not yet met when the finalized block notification is received, such as
317	/// when `queue_blocks` is not empty or there are no peers. This field holds the
318	/// necessary information to attempt the state sync at a later point when
319	/// conditions are satisfied.
320	pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
321	/// Fork sync targets.
322	fork_targets: HashMap<B::Hash, ForkTarget<B>>,
323	/// A set of peers for which there might be potential block requests
324	allowed_requests: AllowedRequests,
325	/// Maximum number of peers to ask the same blocks in parallel.
326	max_parallel_downloads: u32,
327	/// Maximum blocks per request.
328	max_blocks_per_request: u32,
329	/// Protocol name used to send out state requests
330	state_request_protocol_name: ProtocolName,
331	/// Total number of downloaded blocks.
332	downloaded_blocks: usize,
333	/// State sync in progress, if any.
334	state_sync: Option<StateSync<B, Client>>,
335	/// Enable importing existing blocks. This is used after the state download to
336	/// catch up to the latest state while re-importing blocks.
337	import_existing: bool,
338	/// Block downloader
339	block_downloader: Arc<dyn BlockDownloader<B>>,
340	/// Gap download process.
341	gap_sync: Option<GapSync<B>>,
342	/// Pending actions.
343	actions: Vec<SyncingAction<B>>,
344	/// Prometheus metrics.
345	metrics: Option<Metrics>,
346}
347
348impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
349where
350	B: BlockT,
351	Client: HeaderBackend<B>
352		+ BlockBackend<B>
353		+ HeaderMetadata<B, Error = sp_blockchain::Error>
354		+ ProofProvider<B>
355		+ Send
356		+ Sync
357		+ 'static,
358{
359	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
360		match self.add_peer_inner(peer_id, best_hash, best_number) {
361			Ok(Some(request)) => {
362				let action = self.create_block_request_action(peer_id, request);
363				self.actions.push(action);
364			},
365			Ok(None) => {},
366			Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
367		}
368	}
369
370	fn remove_peer(&mut self, peer_id: &PeerId) {
371		self.blocks.clear_peer_download(peer_id);
372		if let Some(gap_sync) = &mut self.gap_sync {
373			gap_sync.blocks.clear_peer_download(peer_id)
374		}
375
376		if let Some(state) = self.peers.remove(peer_id) {
377			if !state.state.is_available() {
378				if let Some(bad_peer) =
379					self.disconnected_peers.on_disconnect_during_request(*peer_id)
380				{
381					self.actions.push(SyncingAction::DropPeer(bad_peer));
382				}
383			}
384		}
385
386		self.extra_justifications.peer_disconnected(peer_id);
387		self.allowed_requests.set_all();
388		self.fork_targets.retain(|_, target| {
389			target.peers.remove(peer_id);
390			!target.peers.is_empty()
391		});
392		if let Some(metrics) = &self.metrics {
393			metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
394		}
395
396		let blocks = self.ready_blocks();
397
398		if !blocks.is_empty() {
399			self.validate_and_queue_blocks(blocks, false);
400		}
401	}
402
403	fn on_validated_block_announce(
404		&mut self,
405		is_best: bool,
406		peer_id: PeerId,
407		announce: &BlockAnnounce<B::Header>,
408	) -> Option<(B::Hash, NumberFor<B>)> {
409		let number = *announce.header.number();
410		let hash = announce.header.hash();
411		let parent_status =
412			self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
413		let known_parent = parent_status != BlockStatus::Unknown;
414		let ancient_parent = parent_status == BlockStatus::InChainPruned;
415
416		let known = self.is_known(&hash);
417		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
418			peer
419		} else {
420			error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
421			return Some((hash, number))
422		};
423
424		if let PeerSyncState::AncestorSearch { .. } = peer.state {
425			trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
426			return None
427		}
428
429		let peer_info = is_best.then(|| {
430			// update their best block
431			peer.best_number = number;
432			peer.best_hash = hash;
433
434			(hash, number)
435		});
436
437		// If the announced block is the best they have and is not ahead of us, our common number
438		// is either one further ahead or it's the one they just announced, if we know about it.
439		if is_best {
440			if known && self.best_queued_number >= number {
441				self.update_peer_common_number(&peer_id, number);
442			} else if announce.header.parent_hash() == &self.best_queued_hash ||
443				known_parent && self.best_queued_number >= number
444			{
445				self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
446			}
447		}
448		self.allowed_requests.add(&peer_id);
449
450		// known block case
451		if known || self.is_already_downloading(&hash) {
452			trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
453			if let Some(target) = self.fork_targets.get_mut(&hash) {
454				target.peers.insert(peer_id);
455			}
456			return peer_info
457		}
458
459		if ancient_parent {
460			trace!(
461				target: LOG_TARGET,
462				"Ignored ancient block announced from {}: {} {:?}",
463				peer_id,
464				hash,
465				announce.header,
466			);
467			return peer_info
468		}
469
470		if self.status().state == SyncState::Idle {
471			trace!(
472				target: LOG_TARGET,
473				"Added sync target for block announced from {}: {} {:?}",
474				peer_id,
475				hash,
476				announce.summary(),
477			);
478			self.fork_targets
479				.entry(hash)
480				.or_insert_with(|| {
481					if let Some(metrics) = &self.metrics {
482						metrics.fork_targets.inc();
483					}
484
485					ForkTarget {
486						number,
487						parent_hash: Some(*announce.header.parent_hash()),
488						peers: Default::default(),
489					}
490				})
491				.peers
492				.insert(peer_id);
493		}
494
495		peer_info
496	}
497
498	// The implementation is similar to `on_validated_block_announce` with unknown parent hash.
499	fn set_sync_fork_request(
500		&mut self,
501		mut peers: Vec<PeerId>,
502		hash: &B::Hash,
503		number: NumberFor<B>,
504	) {
505		if peers.is_empty() {
506			peers = self
507				.peers
508				.iter()
509				// Only request blocks from peers who are ahead or on a par.
510				.filter(|(_, peer)| peer.best_number >= number)
511				.map(|(id, _)| *id)
512				.collect();
513
514			debug!(
515				target: LOG_TARGET,
516				"Explicit sync request for block {hash:?} with no peers specified. \
517				Syncing from these peers {peers:?} instead.",
518			);
519		} else {
520			debug!(
521				target: LOG_TARGET,
522				"Explicit sync request for block {hash:?} with {peers:?}",
523			);
524		}
525
526		if self.is_known(hash) {
527			debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
528			return
529		}
530
531		trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
532		for peer_id in &peers {
533			if let Some(peer) = self.peers.get_mut(peer_id) {
534				if let PeerSyncState::AncestorSearch { .. } = peer.state {
535					continue
536				}
537
538				if number > peer.best_number {
539					peer.best_number = number;
540					peer.best_hash = *hash;
541				}
542				self.allowed_requests.add(peer_id);
543			}
544		}
545
546		self.fork_targets
547			.entry(*hash)
548			.or_insert_with(|| {
549				if let Some(metrics) = &self.metrics {
550					metrics.fork_targets.inc();
551				}
552
553				ForkTarget { number, peers: Default::default(), parent_hash: None }
554			})
555			.peers
556			.extend(peers);
557	}
558
559	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
560		let client = &self.client;
561		self.extra_justifications
562			.schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
563	}
564
565	fn clear_justification_requests(&mut self) {
566		self.extra_justifications.reset();
567	}
568
569	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
570		let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
571		self.extra_justifications
572			.try_finalize_root((hash, number), finalization_result, true);
573		self.allowed_requests.set_all();
574	}
575
576	fn on_generic_response(
577		&mut self,
578		peer_id: &PeerId,
579		key: StrategyKey,
580		protocol_name: ProtocolName,
581		response: Box<dyn Any + Send>,
582	) {
583		if Self::STRATEGY_KEY != key {
584			warn!(
585				target: LOG_TARGET,
586				"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
587			);
588			debug_assert!(false);
589			return;
590		}
591
592		if protocol_name == self.state_request_protocol_name {
593			let Ok(response) = response.downcast::<Vec<u8>>() else {
594				warn!(target: LOG_TARGET, "Failed to downcast state response");
595				debug_assert!(false);
596				return;
597			};
598
599			if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
600				self.actions.push(SyncingAction::DropPeer(bad_peer));
601			}
602		} else if &protocol_name == self.block_downloader.protocol_name() {
603			let Ok(response) = response
604				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
605			else {
606				warn!(target: LOG_TARGET, "Failed to downcast block response");
607				debug_assert!(false);
608				return;
609			};
610
611			let (request, response) = *response;
612			let blocks = match response {
613				Ok(blocks) => blocks,
614				Err(BlockResponseError::DecodeFailed(e)) => {
615					debug!(
616						target: LOG_TARGET,
617						"Failed to decode block response from peer {:?}: {:?}.",
618						peer_id,
619						e
620					);
621					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
622					return;
623				},
624				Err(BlockResponseError::ExtractionFailed(e)) => {
625					debug!(
626						target: LOG_TARGET,
627						"Failed to extract blocks from peer response {:?}: {:?}.",
628						peer_id,
629						e
630					);
631					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
632					return;
633				},
634			};
635
636			if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
637				self.actions.push(SyncingAction::DropPeer(bad_peer));
638			}
639		} else {
640			warn!(
641				target: LOG_TARGET,
642				"Unexpected generic response protocol {protocol_name}, strategy key \
643				{key:?}",
644			);
645			debug_assert!(false);
646		}
647	}
648
649	fn on_blocks_processed(
650		&mut self,
651		imported: usize,
652		count: usize,
653		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
654	) {
655		trace!(target: LOG_TARGET, "Imported {imported} of {count}");
656
657		let mut has_error = false;
658		for (_, hash) in &results {
659			if self.queue_blocks.remove(hash) {
660				if let Some(metrics) = &self.metrics {
661					metrics.queued_blocks.dec();
662				}
663			}
664			self.blocks.clear_queued(hash);
665			if let Some(gap_sync) = &mut self.gap_sync {
666				gap_sync.blocks.clear_queued(hash);
667			}
668		}
669		for (result, hash) in results {
670			if has_error {
671				break
672			}
673
674			has_error |= result.is_err();
675
676			match result {
677				Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
678					if let Some(peer) = peer_id {
679						self.update_peer_common_number(&peer, number);
680					}
681					self.complete_gap_if_target(number);
682				},
683				Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
684					if aux.clear_justification_requests {
685						trace!(
686							target: LOG_TARGET,
687							"Block imported clears all pending justification requests {number}: {hash:?}",
688						);
689						self.clear_justification_requests();
690					}
691
692					if aux.needs_justification {
693						trace!(
694							target: LOG_TARGET,
695							"Block imported but requires justification {number}: {hash:?}",
696						);
697						self.request_justification(&hash, number);
698					}
699
700					if aux.bad_justification {
701						if let Some(ref peer) = peer_id {
702							warn!("💔 Sent block with bad justification to import");
703							self.actions.push(SyncingAction::DropPeer(BadPeer(
704								*peer,
705								rep::BAD_JUSTIFICATION,
706							)));
707						}
708					}
709
710					if let Some(peer) = peer_id {
711						self.update_peer_common_number(&peer, number);
712					}
713					let state_sync_complete =
714						self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
715					if state_sync_complete {
716						info!(
717							target: LOG_TARGET,
718							"State sync is complete ({} MiB), restarting block sync.",
719							self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
720						);
721						self.state_sync = None;
722						self.mode = ChainSyncMode::Full;
723						self.restart();
724					}
725
726					self.complete_gap_if_target(number);
727				},
728				Err(BlockImportError::IncompleteHeader(peer_id)) =>
729					if let Some(peer) = peer_id {
730						warn!(
731							target: LOG_TARGET,
732							"💔 Peer sent block with incomplete header to import",
733						);
734						self.actions
735							.push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
736						self.restart();
737					},
738				Err(BlockImportError::VerificationFailed(peer_id, e)) => {
739					let extra_message = peer_id
740						.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
741
742					warn!(
743						target: LOG_TARGET,
744						"💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
745					);
746
747					if let Some(peer) = peer_id {
748						self.actions
749							.push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
750					}
751
752					self.restart();
753				},
754				Err(BlockImportError::BadBlock(peer_id)) =>
755					if let Some(peer) = peer_id {
756						warn!(
757							target: LOG_TARGET,
758							"💔 Block {hash:?} received from peer {peer} has been blacklisted",
759						);
760						self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
761					},
762				Err(BlockImportError::MissingState) => {
763					// This may happen if the chain we were requesting upon has been discarded
764					// in the meantime because another chain has been finalized.
765					// Don't mark it as bad as it still may be synced if explicitly requested.
766					trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
767				},
768				e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
769					warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
770					self.state_sync = None;
771					self.restart();
772				},
773				Err(BlockImportError::Cancelled) => {},
774			};
775		}
776
777		self.allowed_requests.set_all();
778	}
779
780	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
781		let client = &self.client;
782		let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
783			is_descendent_of(&**client, base, block)
784		});
785
786		if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
787			if self.state_sync.is_none() {
788				if !self.peers.is_empty() && self.queue_blocks.is_empty() {
789					self.attempt_state_sync(*hash, number, *skip_proofs);
790				} else {
791					self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
792				}
793			}
794		}
795
796		if let Err(err) = r {
797			warn!(
798				target: LOG_TARGET,
799				"💔 Error cleaning up pending extra justification data requests: {err}",
800			);
801		}
802	}
803
804	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
805		self.on_block_queued(best_hash, best_number);
806	}
807
808	fn is_major_syncing(&self) -> bool {
809		self.status().state.is_major_syncing()
810	}
811
812	fn num_peers(&self) -> usize {
813		self.peers.len()
814	}
815
816	fn status(&self) -> SyncStatus<B> {
817		let median_seen = self.median_seen();
818		let best_seen_block =
819			median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
820		let sync_state = if let Some(target) = median_seen {
821			// A chain is classified as downloading if the provided best block is
822			// more than `MAJOR_SYNC_BLOCKS` behind the best block or as importing
823			// if the same can be said about queued blocks.
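			// E.g. with a median peer best of 1_000 and a local best of 100 the state is
			// `Downloading { target: 1_000 }`, turning into `Importing` once everything up to the
			// target has been queued.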
824			let best_block = self.client.info().best_number;
825			if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
826				// If target is not queued, we're downloading, otherwise importing.
827				if target > self.best_queued_number {
828					SyncState::Downloading { target }
829				} else {
830					SyncState::Importing { target }
831				}
832			} else {
833				SyncState::Idle
834			}
835		} else {
836			SyncState::Idle
837		};
838
839		let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
840			phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
841			total_bytes: 0,
842		});
843
844		SyncStatus {
845			state: sync_state,
846			best_seen_block,
847			num_peers: self.peers.len() as u32,
848			queued_blocks: self.queue_blocks.len() as u32,
849			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
850			warp_sync: warp_sync_progress,
851		}
852	}
853
854	fn num_downloaded_blocks(&self) -> usize {
855		self.downloaded_blocks
856	}
857
858	fn num_sync_requests(&self) -> usize {
859		self.fork_targets
860			.values()
861			.filter(|f| f.number <= self.best_queued_number)
862			.count()
863	}
864
865	fn actions(
866		&mut self,
867		network_service: &NetworkServiceHandle,
868	) -> Result<Vec<SyncingAction<B>>, ClientError> {
869		if !self.peers.is_empty() && self.queue_blocks.is_empty() {
870			if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
871				self.attempt_state_sync(hash, number, skip_proofs);
872			}
873		}
874
875		let block_requests = self
876			.block_requests()
877			.into_iter()
878			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
879			.collect::<Vec<_>>();
880		self.actions.extend(block_requests);
881
882		let justification_requests = self
883			.justification_requests()
884			.into_iter()
885			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
886			.collect::<Vec<_>>();
887		self.actions.extend(justification_requests);
888
889		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
890			trace!(
891				target: LOG_TARGET,
892				"Created `StrategyRequest` to {peer_id}.",
893			);
894
895			let (tx, rx) = oneshot::channel();
896
897			network_service.start_request(
898				peer_id,
899				self.state_request_protocol_name.clone(),
900				request.encode_to_vec(),
901				tx,
902				IfDisconnected::ImmediateError,
903			);
904
905			SyncingAction::StartRequest {
906				peer_id,
907				key: Self::STRATEGY_KEY,
908				request: async move {
909					Ok(rx.await?.and_then(|(response, protocol_name)| {
910						Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
911					}))
912				}
913				.boxed(),
914				remove_obsolete: false,
915			}
916		});
917		self.actions.extend(state_request);
918
919		Ok(std::mem::take(&mut self.actions))
920	}
921}
922
923impl<B, Client> ChainSync<B, Client>
924where
925	B: BlockT,
926	Client: HeaderBackend<B>
927		+ BlockBackend<B>
928		+ HeaderMetadata<B, Error = sp_blockchain::Error>
929		+ ProofProvider<B>
930		+ Send
931		+ Sync
932		+ 'static,
933{
934	/// Strategy key used by chain sync.
935	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
936
937	/// Create a new instance.
938	pub fn new(
939		mode: ChainSyncMode,
940		client: Arc<Client>,
941		max_parallel_downloads: u32,
942		max_blocks_per_request: u32,
943		state_request_protocol_name: ProtocolName,
944		block_downloader: Arc<dyn BlockDownloader<B>>,
945		metrics_registry: Option<&Registry>,
946		initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
947	) -> Result<Self, ClientError> {
948		let mut sync = Self {
949			client,
950			peers: HashMap::new(),
951			disconnected_peers: DisconnectedPeers::new(),
952			blocks: BlockCollection::new(),
953			best_queued_hash: Default::default(),
954			best_queued_number: Zero::zero(),
955			extra_justifications: ExtraRequests::new("justification", metrics_registry),
956			mode,
957			queue_blocks: Default::default(),
958			pending_state_sync_attempt: None,
959			fork_targets: Default::default(),
960			allowed_requests: Default::default(),
961			max_parallel_downloads,
962			max_blocks_per_request,
963			state_request_protocol_name,
964			downloaded_blocks: 0,
965			state_sync: None,
966			import_existing: false,
967			block_downloader,
968			gap_sync: None,
969			actions: Vec::new(),
970			metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
971				Ok(metrics) => Some(metrics),
972				Err(err) => {
973					log::error!(
974						target: LOG_TARGET,
975						"Failed to register `ChainSync` metrics {err:?}",
976					);
977					None
978				},
979			}),
980		};
981
982		sync.reset_sync_start_point()?;
983		initial_peers.for_each(|(peer_id, best_hash, best_number)| {
984			sync.add_peer(peer_id, best_hash, best_number);
985		});
986
987		Ok(sync)
988	}
989
990	/// Complete the gap sync if the target number is reached and there is a gap.
991	fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
992		let gap_sync_complete = self.gap_sync.as_ref().map_or(false, |s| s.target == number);
993		if gap_sync_complete {
994			info!(
995				target: LOG_TARGET,
996				"Block history download is complete."
997			);
998			self.gap_sync = None;
999		}
1000	}
1001
1002	#[must_use]
1003	fn add_peer_inner(
1004		&mut self,
1005		peer_id: PeerId,
1006		best_hash: B::Hash,
1007		best_number: NumberFor<B>,
1008	) -> Result<Option<BlockRequest<B>>, BadPeer> {
1009		// There is nothing sync can get from a node that has no blockchain data.
1010		match self.block_status(&best_hash) {
1011			Err(e) => {
1012				debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
1013				Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
1014			},
1015			Ok(BlockStatus::KnownBad) => {
1016				info!(
1017					"💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
1018				);
1019				Err(BadPeer(peer_id, rep::BAD_BLOCK))
1020			},
1021			Ok(BlockStatus::Unknown) => {
1022				if best_number.is_zero() {
1023					info!(
1024						"💔 New peer {} with unknown genesis hash {} ({}).",
1025						peer_id, best_hash, best_number,
1026					);
1027					return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
1028				}
1029
1030				// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
1031				// enough to do in the import queue that it's not worth kicking off
1032				// an ancestor search, which is what we do in the next match case below.
1033				if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS as usize {
1034					debug!(
1035						target: LOG_TARGET,
1036						"New peer {} with unknown best hash {} ({}), assuming common block.",
1037						peer_id,
1038						self.best_queued_hash,
1039						self.best_queued_number
1040					);
1041					self.peers.insert(
1042						peer_id,
1043						PeerSync {
1044							peer_id,
1045							common_number: self.best_queued_number,
1046							best_hash,
1047							best_number,
1048							state: PeerSyncState::Available,
1049						},
1050					);
1051					return Ok(None);
1052				}
1053
1054				// If we are at genesis, just start downloading.
1055				let (state, req) = if self.best_queued_number.is_zero() {
1056					debug!(
1057						target: LOG_TARGET,
1058						"New peer {peer_id} with best hash {best_hash} ({best_number}).",
1059					);
1060
1061					(PeerSyncState::Available, None)
1062				} else {
1063					let common_best = std::cmp::min(self.best_queued_number, best_number);
1064
1065					debug!(
1066						target: LOG_TARGET,
1067						"New peer {} with unknown best hash {} ({}), searching for common ancestor.",
1068						peer_id,
1069						best_hash,
1070						best_number
1071					);
1072
1073					(
1074						PeerSyncState::AncestorSearch {
1075							current: common_best,
1076							start: self.best_queued_number,
1077							state: AncestorSearchState::ExponentialBackoff(One::one()),
1078						},
1079						Some(ancestry_request::<B>(common_best)),
1080					)
1081				};
1082
1083				self.allowed_requests.add(&peer_id);
1084				self.peers.insert(
1085					peer_id,
1086					PeerSync {
1087						peer_id,
1088						common_number: Zero::zero(),
1089						best_hash,
1090						best_number,
1091						state,
1092					},
1093				);
1094
1095				Ok(req)
1096			},
1097			Ok(BlockStatus::Queued) |
1098			Ok(BlockStatus::InChainWithState) |
1099			Ok(BlockStatus::InChainPruned) => {
1100				debug!(
1101					target: LOG_TARGET,
1102					"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
1103				);
1104				self.peers.insert(
1105					peer_id,
1106					PeerSync {
1107						peer_id,
1108						common_number: std::cmp::min(self.best_queued_number, best_number),
1109						best_hash,
1110						best_number,
1111						state: PeerSyncState::Available,
1112					},
1113				);
1114				self.allowed_requests.add(&peer_id);
1115				Ok(None)
1116			},
1117		}
1118	}
1119
1120	fn create_block_request_action(
1121		&mut self,
1122		peer_id: PeerId,
1123		request: BlockRequest<B>,
1124	) -> SyncingAction<B> {
1125		let downloader = self.block_downloader.clone();
1126
1127		SyncingAction::StartRequest {
1128			peer_id,
1129			key: Self::STRATEGY_KEY,
1130			request: async move {
1131				Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
1132					|(response, protocol_name)| {
1133						let decoded_response =
1134							downloader.block_response_into_blocks(&request, response);
1135						let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
1136						Ok((result, protocol_name))
1137					},
1138				))
1139			}
1140			.boxed(),
1141			// Sending a block request implies dropping the obsolete pending response, as we are
1142			// no longer interested in it.
1143			remove_obsolete: true,
1144		}
1145	}
1146
1147	/// Submit a block response for processing.
1148	#[must_use]
1149	fn on_block_data(
1150		&mut self,
1151		peer_id: &PeerId,
1152		request: Option<BlockRequest<B>>,
1153		response: BlockResponse<B>,
1154	) -> Result<(), BadPeer> {
1155		self.downloaded_blocks += response.blocks.len();
1156		let mut gap = false;
1157		let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
1158			let mut blocks = response.blocks;
1159			if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
1160				trace!(target: LOG_TARGET, "Reversing incoming block list");
1161				blocks.reverse()
1162			}
1163			self.allowed_requests.add(peer_id);
1164			if let Some(request) = request {
1165				match &mut peer.state {
1166					PeerSyncState::DownloadingNew(_) => {
1167						self.blocks.clear_peer_download(peer_id);
1168						peer.state = PeerSyncState::Available;
1169						if let Some(start_block) =
1170							validate_blocks::<B>(&blocks, peer_id, Some(request))?
1171						{
1172							self.blocks.insert(start_block, blocks, *peer_id);
1173						}
1174						self.ready_blocks()
1175					},
1176					PeerSyncState::DownloadingGap(_) => {
1177						peer.state = PeerSyncState::Available;
1178						if let Some(gap_sync) = &mut self.gap_sync {
1179							gap_sync.blocks.clear_peer_download(peer_id);
1180							if let Some(start_block) =
1181								validate_blocks::<B>(&blocks, peer_id, Some(request))?
1182							{
1183								gap_sync.blocks.insert(start_block, blocks, *peer_id);
1184							}
1185							gap = true;
1186							let blocks: Vec<_> = gap_sync
1187								.blocks
1188								.ready_blocks(gap_sync.best_queued_number + One::one())
1189								.into_iter()
1190								.map(|block_data| {
1191									let justifications =
1192										block_data.block.justifications.or_else(|| {
1193											legacy_justification_mapping(
1194												block_data.block.justification,
1195											)
1196										});
1197									IncomingBlock {
1198										hash: block_data.block.hash,
1199										header: block_data.block.header,
1200										body: block_data.block.body,
1201										indexed_body: block_data.block.indexed_body,
1202										justifications,
1203										origin: block_data.origin,
1204										allow_missing_state: true,
1205										import_existing: self.import_existing,
1206										skip_execution: true,
1207										state: None,
1208									}
1209								})
1210								.collect();
1211							debug!(
1212								target: LOG_TARGET,
1213								"Drained {} gap blocks from {}",
1214								blocks.len(),
1215								gap_sync.best_queued_number,
1216							);
1217							blocks
1218						} else {
1219							debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
1220							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1221						}
1222					},
1223					PeerSyncState::DownloadingStale(_) => {
1224						peer.state = PeerSyncState::Available;
1225						if blocks.is_empty() {
1226							debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
1227							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1228						}
1229						validate_blocks::<B>(&blocks, peer_id, Some(request))?;
1230						blocks
1231							.into_iter()
1232							.map(|b| {
1233								let justifications = b
1234									.justifications
1235									.or_else(|| legacy_justification_mapping(b.justification));
1236								IncomingBlock {
1237									hash: b.hash,
1238									header: b.header,
1239									body: b.body,
1240									indexed_body: None,
1241									justifications,
1242									origin: Some(*peer_id),
1243									allow_missing_state: true,
1244									import_existing: self.import_existing,
1245									skip_execution: self.skip_execution(),
1246									state: None,
1247								}
1248							})
1249							.collect()
1250					},
1251					PeerSyncState::AncestorSearch { current, start, state } => {
1252						let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
1253							(Some(block), Ok(maybe_our_block_hash)) => {
1254								trace!(
1255									target: LOG_TARGET,
1256									"Got ancestry block #{} ({}) from peer {}",
1257									current,
1258									block.hash,
1259									peer_id,
1260								);
1261								maybe_our_block_hash.filter(|x| x == &block.hash)
1262							},
1263							(None, _) => {
1264								debug!(
1265									target: LOG_TARGET,
1266									"Invalid response when searching for ancestor from {peer_id}",
1267								);
1268								return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
1269							},
1270							(_, Err(e)) => {
1271								info!(
1272									target: LOG_TARGET,
1273									"❌ Error answering legitimate blockchain query: {e}",
1274								);
1275								return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
1276							},
1277						};
1278						if matching_hash.is_some() {
1279							if *start < self.best_queued_number &&
1280								self.best_queued_number <= peer.best_number
1281							{
1282								// We've made progress on this chain since the search was started.
1283								// Opportunistically set common number to updated number
1284								// instead of the one that started the search.
1285								trace!(
1286									target: LOG_TARGET,
1287									"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
1288									*peer_id,
1289									peer.common_number,
1290									self.best_queued_number,
1291								);
1292								peer.common_number = self.best_queued_number;
1293							} else if peer.common_number < *current {
1294								trace!(
1295									target: LOG_TARGET,
1296									"Ancestry search: updating peer {} common number from={} => to={}.",
1297									*peer_id,
1298									peer.common_number,
1299									*current,
1300								);
1301								peer.common_number = *current;
1302							}
1303						}
1304						if matching_hash.is_none() && current.is_zero() {
1305							trace!(
1306								target: LOG_TARGET,
1307								"Ancestry search: genesis mismatch for peer {peer_id}",
1308							);
1309							return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
1310						}
1311						if let Some((next_state, next_num)) =
1312							handle_ancestor_search_state(state, *current, matching_hash.is_some())
1313						{
1314							peer.state = PeerSyncState::AncestorSearch {
1315								current: next_num,
1316								start: *start,
1317								state: next_state,
1318							};
1319							let request = ancestry_request::<B>(next_num);
1320							let action = self.create_block_request_action(*peer_id, request);
1321							self.actions.push(action);
1322							return Ok(());
1323						} else {
1324							// Ancestry search is complete. Check if peer is on a stale fork unknown
1325							// to us and add it to sync targets if necessary.
1326							trace!(
1327								target: LOG_TARGET,
1328								"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
1329								self.best_queued_hash,
1330								self.best_queued_number,
1331								peer.best_hash,
1332								peer.best_number,
1333								matching_hash,
1334								peer.common_number,
1335							);
1336							if peer.common_number < peer.best_number &&
1337								peer.best_number < self.best_queued_number
1338							{
1339								trace!(
1340									target: LOG_TARGET,
1341									"Added fork target {} for {}",
1342									peer.best_hash,
1343									peer_id,
1344								);
1345								self.fork_targets
1346									.entry(peer.best_hash)
1347									.or_insert_with(|| {
1348										if let Some(metrics) = &self.metrics {
1349											metrics.fork_targets.inc();
1350										}
1351
1352										ForkTarget {
1353											number: peer.best_number,
1354											parent_hash: None,
1355											peers: Default::default(),
1356										}
1357									})
1358									.peers
1359									.insert(*peer_id);
1360							}
1361							peer.state = PeerSyncState::Available;
1362							return Ok(());
1363						}
1364					},
1365					PeerSyncState::Available |
1366					PeerSyncState::DownloadingJustification(..) |
1367					PeerSyncState::DownloadingState => Vec::new(),
1368				}
1369			} else {
1370				// When request.is_none() this is a block announcement. Just accept blocks.
1371				validate_blocks::<B>(&blocks, peer_id, None)?;
1372				blocks
1373					.into_iter()
1374					.map(|b| {
1375						let justifications = b
1376							.justifications
1377							.or_else(|| legacy_justification_mapping(b.justification));
1378						IncomingBlock {
1379							hash: b.hash,
1380							header: b.header,
1381							body: b.body,
1382							indexed_body: None,
1383							justifications,
1384							origin: Some(*peer_id),
1385							allow_missing_state: true,
1386							import_existing: false,
1387							skip_execution: true,
1388							state: None,
1389						}
1390					})
1391					.collect()
1392			}
1393		} else {
1394			// We don't know of this peer, so we also did not request anything from it.
1395			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1396		};
1397
1398		self.validate_and_queue_blocks(new_blocks, gap);
1399
1400		Ok(())
1401	}
1402
1403	fn on_block_response(
1404		&mut self,
1405		peer_id: &PeerId,
1406		key: StrategyKey,
1407		request: BlockRequest<B>,
1408		blocks: Vec<BlockData<B>>,
1409	) -> Result<(), BadPeer> {
1410		if key != Self::STRATEGY_KEY {
1411			error!(
1412				target: LOG_TARGET,
1413				"`on_block_response()` called with unexpected key {key:?} for chain sync",
1414			);
1415			debug_assert!(false);
1416		}
1417		let block_response = BlockResponse::<B> { id: request.id, blocks };
1418
1419		let blocks_range = || match (
1420			block_response
1421				.blocks
1422				.first()
1423				.and_then(|b| b.header.as_ref().map(|h| h.number())),
1424			block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
1425		) {
1426			(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
1427			(Some(first), Some(_)) => format!(" ({})", first),
1428			_ => Default::default(),
1429		};
1430		trace!(
1431			target: LOG_TARGET,
1432			"BlockResponse {} from {} with {} blocks {}",
1433			block_response.id,
1434			peer_id,
1435			block_response.blocks.len(),
1436			blocks_range(),
1437		);
1438
1439		if request.fields == BlockAttributes::JUSTIFICATION {
1440			self.on_block_justification(*peer_id, block_response)
1441		} else {
1442			self.on_block_data(peer_id, Some(request), block_response)
1443		}
1444	}
1445
1446	/// Submit a justification response for processing.
1447	#[must_use]
1448	fn on_block_justification(
1449		&mut self,
1450		peer_id: PeerId,
1451		response: BlockResponse<B>,
1452	) -> Result<(), BadPeer> {
1453		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1454			peer
1455		} else {
1456			error!(
1457				target: LOG_TARGET,
1458				"💔 Called on_block_justification with a peer ID of an unknown peer",
1459			);
1460			return Ok(());
1461		};
1462
1463		self.allowed_requests.add(&peer_id);
1464		if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1465			peer.state = PeerSyncState::Available;
1466
1467			// We only request one justification at a time
1468			let justification = if let Some(block) = response.blocks.into_iter().next() {
1469				if hash != block.hash {
1470					warn!(
1471						target: LOG_TARGET,
1472						"💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
1473						peer_id,
1474						hash,
1475						block.hash,
1476					);
1477					return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1478				}
1479
1480				block
1481					.justifications
1482					.or_else(|| legacy_justification_mapping(block.justification))
1483			} else {
1484				// we might have asked the peer for a justification on a block that we assumed it
1485				// had but didn't (regardless of whether it had a justification for it or not).
1486				trace!(
1487					target: LOG_TARGET,
1488					"Peer {peer_id:?} provided empty response for justification request {hash:?}",
1489				);
1490
1491				None
1492			};
1493
1494			if let Some((peer_id, hash, number, justifications)) =
1495				self.extra_justifications.on_response(peer_id, justification)
1496			{
1497				self.actions.push(SyncingAction::ImportJustifications {
1498					peer_id,
1499					hash,
1500					number,
1501					justifications,
1502				});
1503				return Ok(());
1504			}
1505		}
1506
1507		Ok(())
1508	}
1509
1510	/// Returns the median seen block number.
1511	fn median_seen(&self) -> Option<NumberFor<B>> {
1512		let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1513
1514		if best_seens.is_empty() {
1515			None
1516		} else {
1517			let middle = best_seens.len() / 2;
1518
1519			// Not the "perfect median" when we have an even number of peers.
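			// E.g. with best numbers [1, 2, 3, 4]: middle == 2 and the result is 3, the upper of the
			// two central values.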
1520			Some(*best_seens.select_nth_unstable(middle).1)
1521		}
1522	}
1523
1524	fn required_block_attributes(&self) -> BlockAttributes {
1525		match self.mode {
1526			ChainSyncMode::Full =>
1527				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1528			ChainSyncMode::LightState { storage_chain_mode: false, .. } =>
1529				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1530			ChainSyncMode::LightState { storage_chain_mode: true, .. } =>
1531				BlockAttributes::HEADER |
1532					BlockAttributes::JUSTIFICATION |
1533					BlockAttributes::INDEXED_BODY,
1534		}
1535	}
1536
1537	fn skip_execution(&self) -> bool {
1538		match self.mode {
1539			ChainSyncMode::Full => false,
1540			ChainSyncMode::LightState { .. } => true,
1541		}
1542	}
1543
1544	fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1545		let orig_len = new_blocks.len();
1546		new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1547		if new_blocks.len() != orig_len {
1548			debug!(
1549				target: LOG_TARGET,
1550				"Ignoring {} blocks that are already queued",
1551				orig_len - new_blocks.len(),
1552			);
1553		}
1554
1555		let origin = if !gap && !self.status().state.is_major_syncing() {
1556			BlockOrigin::NetworkBroadcast
1557		} else {
1558			BlockOrigin::NetworkInitialSync
1559		};
1560
1561		if let Some((h, n)) = new_blocks
1562			.last()
1563			.and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1564		{
1565			trace!(
1566				target: LOG_TARGET,
1567				"Accepted {} blocks ({:?}) with origin {:?}",
1568				new_blocks.len(),
1569				h,
1570				origin,
1571			);
1572			self.on_block_queued(h, n)
1573		}
1574		self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1575		if let Some(metrics) = &self.metrics {
1576			metrics
1577				.queued_blocks
1578				.set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1579		}
1580
1581		self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1582	}
1583
1584	fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1585		if let Some(peer) = self.peers.get_mut(peer_id) {
1586			peer.update_common_number(new_common);
1587		}
1588	}
1589
1590	/// Called when a block has been queued for import.
1591	///
1592	/// Updates our internal state for best queued block and then goes
1593	/// through all peers to update our view of their state as well.
1594	fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1595		if self.fork_targets.remove(hash).is_some() {
1596			if let Some(metrics) = &self.metrics {
1597				metrics.fork_targets.dec();
1598			}
1599			trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1600		}
1601		if let Some(gap_sync) = &mut self.gap_sync {
1602			if number > gap_sync.best_queued_number && number <= gap_sync.target {
1603				gap_sync.best_queued_number = number;
1604			}
1605		}
1606		if number > self.best_queued_number {
1607			self.best_queued_number = number;
1608			self.best_queued_hash = *hash;
1609			// Update common blocks
1610			for (n, peer) in self.peers.iter_mut() {
1611				if let PeerSyncState::AncestorSearch { .. } = peer.state {
1612					// Wait for ancestry search to complete first.
1613					continue;
1614				}
1615				let new_common_number =
1616					if peer.best_number >= number { number } else { peer.best_number };
1617				trace!(
1618					target: LOG_TARGET,
1619					"Updating peer {} info, ours={}, common={}->{}, their best={}",
1620					n,
1621					number,
1622					peer.common_number,
1623					new_common_number,
1624					peer.best_number,
1625				);
1626				peer.common_number = new_common_number;
1627			}
1628		}
1629		self.allowed_requests.set_all();
1630	}
1631
1632	/// Restart the sync process. This resets all pending block requests and re-registers the known
1633	/// peers, which may queue new block request actions. Peers that were downloading finality data
1634	/// (i.e. their state was `DownloadingJustification`) are unaffected and will stay in the same state.
1635	fn restart(&mut self) {
1636		self.blocks.clear();
1637		if let Err(e) = self.reset_sync_start_point() {
1638			warn!(target: LOG_TARGET, "💔  Unable to restart sync: {e}");
1639		}
1640		self.allowed_requests.set_all();
1641		debug!(
1642			target: LOG_TARGET,
1643			"Restarted with {} ({})",
1644			self.best_queued_number,
1645			self.best_queued_hash,
1646		);
1647		let old_peers = std::mem::take(&mut self.peers);
1648
1649		old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
1650			match peer_sync.state {
1651				PeerSyncState::Available => {
1652					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1653				},
1654				PeerSyncState::AncestorSearch { .. } |
1655				PeerSyncState::DownloadingNew(_) |
1656				PeerSyncState::DownloadingStale(_) |
1657				PeerSyncState::DownloadingGap(_) |
1658				PeerSyncState::DownloadingState => {
1659					// Cancel a request first, as `add_peer` may generate a new request.
1660					self.actions
1661						.push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
1662					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1663				},
1664				PeerSyncState::DownloadingJustification(_) => {
1665					// Peers that were downloading justifications
1666					// should be kept in that state.
1667					// We make sure our common number is at least something we have.
1668					trace!(
1669						target: LOG_TARGET,
1670						"Keeping peer {} after restart, updating common number from={} => to={} (our best).",
1671						peer_id,
1672						peer_sync.common_number,
1673						self.best_queued_number,
1674					);
1675					peer_sync.common_number = self.best_queued_number;
1676					self.peers.insert(peer_id, peer_sync);
1677				},
1678			}
1679		});
1680	}
1681
1682	/// Find a block to start sync from. If we sync with state, that's the latest block we have
1683	/// state for.
1684	fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
1685		let info = self.client.info();
1686		debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");
1687
1688		if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
1689			warn!(
1690				target: LOG_TARGET,
1691				"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
1692			);
1693			self.mode = ChainSyncMode::Full;
1694		}
1695
1696		self.import_existing = false;
1697		self.best_queued_hash = info.best_hash;
1698		self.best_queued_number = info.best_number;
1699
1700		if self.mode == ChainSyncMode::Full &&
1701			self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
1702		{
1703			self.import_existing = true;
1704			// Latest state is missing, start with the last finalized state or genesis instead.
1705			if let Some((hash, number)) = info.finalized_state {
1706				debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
1707				self.best_queued_hash = hash;
1708				self.best_queued_number = number;
1709			} else {
1710				debug!(target: LOG_TARGET, "Restarting from genesis");
1711				self.best_queued_hash = Default::default();
1712				self.best_queued_number = Zero::zero();
1713			}
1714		}
1715
1716		if let Some(BlockGap { start, end, .. }) = info.block_gap {
1717			let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
1718			debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
1719			self.gap_sync = Some(GapSync {
1720				best_queued_number: start - One::one(),
1721				target: end,
1722				blocks: BlockCollection::new(),
1723			});
1724		}
1725		trace!(
1726			target: LOG_TARGET,
1727			"Restarted sync at #{} ({:?})",
1728			self.best_queued_number,
1729			self.best_queued_hash,
1730		);
1731		Ok(())
1732	}
1733
1734	/// What is the status of the block corresponding to the given hash?
1735	fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1736		if self.queue_blocks.contains(hash) {
1737			return Ok(BlockStatus::Queued);
1738		}
1739		self.client.block_status(*hash)
1740	}
1741
1742	/// Is the block corresponding to the given hash known?
1743	fn is_known(&self, hash: &B::Hash) -> bool {
1744		self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1745	}
1746
1747	/// Is any peer downloading the given hash?
1748	fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1749		self.peers
1750			.iter()
1751			.any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1752	}
1753
1754	/// Get the set of downloaded blocks that are ready to be queued for import.
1755	fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1756		self.blocks
1757			.ready_blocks(self.best_queued_number + One::one())
1758			.into_iter()
1759			.map(|block_data| {
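				// Prefer the newer multi-justification field; fall back to the legacy single
				// justification for peers that still populate only the old field.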
1760				let justifications = block_data
1761					.block
1762					.justifications
1763					.or_else(|| legacy_justification_mapping(block_data.block.justification));
1764				IncomingBlock {
1765					hash: block_data.block.hash,
1766					header: block_data.block.header,
1767					body: block_data.block.body,
1768					indexed_body: block_data.block.indexed_body,
1769					justifications,
1770					origin: block_data.origin,
1771					allow_missing_state: true,
1772					import_existing: self.import_existing,
1773					skip_execution: self.skip_execution(),
1774					state: None,
1775				}
1776			})
1777			.collect()
1778	}
1779
1780	/// Get justification requests scheduled by sync to be sent out.
1781	fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1782		let peers = &mut self.peers;
1783		let mut matcher = self.extra_justifications.matcher();
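		// Pair each pending justification request with an idle peer that can serve it; the peer
		// is marked as `DownloadingJustification` below.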
1784		std::iter::from_fn(move || {
1785			if let Some((peer, request)) = matcher.next(peers) {
1786				peers
1787					.get_mut(&peer)
1788					.expect(
1789						"`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1790					)
1791					.state = PeerSyncState::DownloadingJustification(request.0);
1792				let req = BlockRequest::<B> {
1793					id: 0,
1794					fields: BlockAttributes::JUSTIFICATION,
1795					from: FromBlock::Hash(request.0),
1796					direction: Direction::Ascending,
1797					max: Some(1),
1798				};
1799				Some((peer, req))
1800			} else {
1801				None
1802			}
1803		})
1804		.collect()
1805	}
1806
1807	/// Get block requests scheduled by sync to be sent out.
1808	fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1809		if self.allowed_requests.is_empty() || self.state_sync.is_some() {
1810			return Vec::new();
1811		}
1812
1813		if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
1814			trace!(target: LOG_TARGET, "Too many blocks in the queue.");
1815			return Vec::new();
1816		}
1817		let is_major_syncing = self.status().state.is_major_syncing();
1818		let attrs = self.required_block_attributes();
1819		let blocks = &mut self.blocks;
1820		let fork_targets = &mut self.fork_targets;
1821		let last_finalized =
1822			std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
1823		let best_queued = self.best_queued_number;
1824		let client = &self.client;
1825		let queue_blocks = &self.queue_blocks;
1826		let allowed_requests = self.allowed_requests.clone();
1827		let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
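		// During a major sync there is no benefit in downloading the same block range from
		// several peers, so parallel downloads of a range are capped at one peer.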
1828		let max_blocks_per_request = self.max_blocks_per_request;
1829		let gap_sync = &mut self.gap_sync;
1830		let disconnected_peers = &mut self.disconnected_peers;
1831		let metrics = self.metrics.as_ref();
1832		let requests = self
1833			.peers
1834			.iter_mut()
1835			.filter_map(move |(&id, peer)| {
1836				if !peer.state.is_available() ||
1837					!allowed_requests.contains(&id) ||
1838					!disconnected_peers.is_peer_available(&id)
1839				{
1840					return None;
1841				}
1842
1843				// If our best queued block is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks ahead
1844				// of the common number, the peer's best number is higher than our best queued and
1845				// the common number is smaller than the last finalized block number, we should do an
1846				// ancestor search to find a better common block. If the import queue is full we wait
1847				// until all blocks are imported before doing so.
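				// e.g. with the defaults at the top of this file (`MAX_BLOCKS_TO_LOOK_BACKWARDS` =
				// 1024): a best queued block of #5_000 and a common number of #2_000 leave a gap of
				// 3_000 blocks, so a fresh ancestry search is started for this peer.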
1848				if best_queued.saturating_sub(peer.common_number) >
1849					MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
1850					best_queued < peer.best_number &&
1851					peer.common_number < last_finalized &&
1852					queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
1853				{
1854					trace!(
1855						target: LOG_TARGET,
1856						"Peer {:?} common block {} too far behind our best {}. Starting ancestry search.",
1857						id,
1858						peer.common_number,
1859						best_queued,
1860					);
1861					let current = std::cmp::min(peer.best_number, best_queued);
1862					peer.state = PeerSyncState::AncestorSearch {
1863						current,
1864						start: best_queued,
1865						state: AncestorSearchState::ExponentialBackoff(One::one()),
1866					};
1867					Some((id, ancestry_request::<B>(current)))
1868				} else if let Some((range, req)) = peer_block_request(
1869					&id,
1870					peer,
1871					blocks,
1872					attrs,
1873					max_parallel,
1874					max_blocks_per_request,
1875					last_finalized,
1876					best_queued,
1877				) {
1878					peer.state = PeerSyncState::DownloadingNew(range.start);
1879					trace!(
1880						target: LOG_TARGET,
1881						"New block request for {}, (best:{}, common:{}) {:?}",
1882						id,
1883						peer.best_number,
1884						peer.common_number,
1885						req,
1886					);
1887					Some((id, req))
1888				} else if let Some((hash, req)) = fork_sync_request(
1889					&id,
1890					fork_targets,
1891					best_queued,
1892					last_finalized,
1893					attrs,
1894					|hash| {
1895						if queue_blocks.contains(hash) {
1896							BlockStatus::Queued
1897						} else {
1898							client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
1899						}
1900					},
1901					max_blocks_per_request,
1902					metrics,
1903				) {
1904					trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
1905					peer.state = PeerSyncState::DownloadingStale(hash);
1906					Some((id, req))
1907				} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
1908					peer_gap_block_request(
1909						&id,
1910						peer,
1911						&mut sync.blocks,
1912						attrs,
1913						sync.target,
1914						sync.best_queued_number,
1915						max_blocks_per_request,
1916					)
1917				}) {
1918					peer.state = PeerSyncState::DownloadingGap(range.start);
1919					trace!(
1920						target: LOG_TARGET,
1921						"New gap block request for {}, (best:{}, common:{}) {:?}",
1922						id,
1923						peer.best_number,
1924						peer.common_number,
1925						req,
1926					);
1927					Some((id, req))
1928				} else {
1929					None
1930				}
1931			})
1932			.collect::<Vec<_>>();
1933
1934		// Clear the allowed_requests state when sending new block requests
1935		// to prevent multiple inflight block requests from being issued.
1936		if !requests.is_empty() {
1937			self.allowed_requests.take();
1938		}
1939
1940		requests
1941	}
1942
1943	/// Get a state request scheduled by sync to be sent out (if any).
1944	fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
1945		if self.allowed_requests.is_empty() {
1946			return None;
1947		}
1948		if self.state_sync.is_some() &&
1949			self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
1950		{
1951			// Only one pending state request is allowed.
1952			return None;
1953		}
1954		if let Some(sync) = &self.state_sync {
1955			if sync.is_complete() {
1956				return None;
1957			}
1958
1959			for (id, peer) in self.peers.iter_mut() {
1960				if peer.state.is_available() &&
1961					peer.common_number >= sync.target_number() &&
1962					self.disconnected_peers.is_peer_available(&id)
1963				{
1964					peer.state = PeerSyncState::DownloadingState;
1965					let request = sync.next_request();
1966					trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
1967					self.allowed_requests.clear();
1968					return Some((*id, request));
1969				}
1970			}
1971		}
1972		None
1973	}
1974
1975	#[must_use]
1976	fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
1977		let response = match StateResponse::decode(response) {
1978			Ok(response) => response,
1979			Err(error) => {
1980				debug!(
1981					target: LOG_TARGET,
1982					"Failed to decode state response from peer {peer_id:?}: {error:?}.",
1983				);
1984
1985				return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
1986			},
1987		};
1988
1989		if let Some(peer) = self.peers.get_mut(peer_id) {
1990			if let PeerSyncState::DownloadingState = peer.state {
1991				peer.state = PeerSyncState::Available;
1992				self.allowed_requests.set_all();
1993			}
1994		}
1995		let import_result = if let Some(sync) = &mut self.state_sync {
1996			debug!(
1997				target: LOG_TARGET,
1998				"Importing state data from {} with {} keys, {} proof nodes.",
1999				peer_id,
2000				response.entries.len(),
2001				response.proof.len(),
2002			);
2003			sync.import(response)
2004		} else {
2005			debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
2006			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2007		};
2008
2009		match import_result {
2010			ImportResult::Import(hash, header, state, body, justifications) => {
2011				let origin = BlockOrigin::NetworkInitialSync;
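				// The target header may already exist in the database (imported without state), so
				// `import_existing` below forces a re-import that attaches the downloaded state.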
2012				let block = IncomingBlock {
2013					hash,
2014					header: Some(header),
2015					body,
2016					indexed_body: None,
2017					justifications,
2018					origin: None,
2019					allow_missing_state: true,
2020					import_existing: true,
2021					skip_execution: self.skip_execution(),
2022					state: Some(state),
2023				};
2024				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
2025				self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
2026				Ok(())
2027			},
2028			ImportResult::Continue => Ok(()),
2029			ImportResult::BadResponse => {
2030				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
2031				Err(BadPeer(*peer_id, rep::BAD_BLOCK))
2032			},
2033		}
2034	}
2035
2036	fn attempt_state_sync(
2037		&mut self,
2038		finalized_hash: B::Hash,
2039		finalized_number: NumberFor<B>,
2040		skip_proofs: bool,
2041	) {
2042		let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
2043		heads.sort();
2044		let median = heads[heads.len() / 2];
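		// e.g. peers reporting best blocks #100, #103 and #110 give a median of #103; with
		// `STATE_SYNC_FINALITY_THRESHOLD` = 8, state sync is attempted once our finalized block
		// is #95 or newer.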
2045		if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
2046			if let Ok(Some(header)) = self.client.header(finalized_hash) {
2047				log::debug!(
2048					target: LOG_TARGET,
2049					"Starting state sync for #{finalized_number} ({finalized_hash})",
2050				);
2051				self.state_sync =
2052					Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
2053				self.allowed_requests.set_all();
2054			} else {
2055				log::error!(
2056					target: LOG_TARGET,
2057					"Failed to start state sync: header for finalized block \
2058					  #{finalized_number} ({finalized_hash}) is not available",
2059				);
2060				debug_assert!(false);
2061			}
2062		}
2063	}
2064
2065	/// A version of `actions()` that doesn't schedule extra requests. For testing only.
2066	#[cfg(test)]
2067	#[must_use]
2068	fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
2069		std::mem::take(&mut self.actions).into_iter()
2070	}
2071}
2072
2073// This is purely for a backwards-compatible transitional period and should be removed
2074// once we can assume all nodes can send and receive multiple justifications.
2075// The ID tag is hardcoded here to avoid depending on the GRANDPA crate.
2076// See: https://github.com/paritytech/substrate/issues/8172
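// A lone legacy justification is wrapped into the newer `Justifications` container, keyed by the
// GRANDPA consensus engine id `FRNK`.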
2077fn legacy_justification_mapping(
2078	justification: Option<EncodedJustification>,
2079) -> Option<Justifications> {
2080	justification.map(|just| (*b"FRNK", just).into())
2081}
2082
2083/// Request the ancestry for a block. Sends a request for header and justification for the given
2084/// block number. Used during ancestry search.
2085fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
2086	BlockRequest::<B> {
2087		id: 0,
2088		fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
2089		from: FromBlock::Number(block),
2090		direction: Direction::Ascending,
2091		max: Some(1),
2092	}
2093}
2094
2095/// The ancestor search state expresses which algorithm, and its stateful parameters, we are using
2096/// to try to find an ancestor block.
2097#[derive(Copy, Clone, Eq, PartialEq, Debug)]
2098pub(crate) enum AncestorSearchState<B: BlockT> {
2099	/// Use exponential backoff to find an ancestor, then switch to binary search.
2100	/// We keep track of the current step size, which doubles each time we step further back.
2101	ExponentialBackoff(NumberFor<B>),
2102	/// Using binary search to find the best ancestor.
2103	/// We keep track of left and right bounds.
2104	BinarySearch(NumberFor<B>, NumberFor<B>),
2105}
2106
2107/// This function handles the ancestor search strategy used. The goal is to find a common point
2108/// that both our chains agree on that is as close to the tip as possible.
2109/// We first use an exponential backoff strategy: starting near the tip, we step backwards,
2110/// doubling the step size each time, until we find a block hash on which both chains agree.
2111///
2112/// Once a match is found we fall back to a binary search between the two last probed points to
2113/// find the common block closest to the tip.
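///
/// For example (illustrative numbers only), starting at block #100 with a step size of 1:
/// mismatches at #100, #99 and #97 move the probe back to #93 while doubling the step to 8;
/// a match at #93 then switches the search to a binary search between #93 and #97 to pin down
/// the common ancestor.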
2114fn handle_ancestor_search_state<B: BlockT>(
2115	state: &AncestorSearchState<B>,
2116	curr_block_num: NumberFor<B>,
2117	block_hash_match: bool,
2118) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
2119	let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
2120	match state {
2121		AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
2122			let next_distance_to_tip = *next_distance_to_tip;
2123			if block_hash_match && next_distance_to_tip == One::one() {
2124				// We found the ancestor in the first step so there is no need to execute binary
2125				// search.
2126				return None;
2127			}
2128			if block_hash_match {
2129				let left = curr_block_num;
2130				let right = left + next_distance_to_tip / two;
2131				let middle = left + (right - left) / two;
2132				Some((AncestorSearchState::BinarySearch(left, right), middle))
2133			} else {
2134				let next_block_num =
2135					curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
2136				let next_distance_to_tip = next_distance_to_tip * two;
2137				Some((
2138					AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
2139					next_block_num,
2140				))
2141			}
2142		},
2143		AncestorSearchState::BinarySearch(mut left, mut right) => {
2144			if left >= curr_block_num {
2145				return None;
2146			}
2147			if block_hash_match {
2148				left = curr_block_num;
2149			} else {
2150				right = curr_block_num;
2151			}
2152			assert!(right >= left);
2153			let middle = left + (right - left) / two;
2154			if middle == curr_block_num {
2155				None
2156			} else {
2157				Some((AncestorSearchState::BinarySearch(left, right), middle))
2158			}
2159		},
2160	}
2161}
2162
2163/// Get a new block request for the peer if any.
2164fn peer_block_request<B: BlockT>(
2165	id: &PeerId,
2166	peer: &PeerSync<B>,
2167	blocks: &mut BlockCollection<B>,
2168	attrs: BlockAttributes,
2169	max_parallel_downloads: u32,
2170	max_blocks_per_request: u32,
2171	finalized: NumberFor<B>,
2172	best_num: NumberFor<B>,
2173) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2174	if best_num >= peer.best_number {
2175		// Will be downloaded as alternative fork instead.
2176		return None;
2177	} else if peer.common_number < finalized {
2178		trace!(
2179			target: LOG_TARGET,
2180			"Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2181			id, peer.common_number, finalized, peer.best_number, best_num,
2182		);
2183	}
2184	let range = blocks.needed_blocks(
2185		*id,
2186		max_blocks_per_request,
2187		peer.best_number,
2188		peer.common_number,
2189		max_parallel_downloads,
2190		MAX_DOWNLOAD_AHEAD,
2191	)?;
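	// e.g. if `needed_blocks` returns the range 100..132, the request built below asks the peer
	// for up to 32 blocks, starting at #131 and walking `Descending` back towards #100.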
2192
2193	// The end is not part of the range.
2194	let last = range.end.saturating_sub(One::one());
2195
2196	let from = if peer.best_number == last {
2197		FromBlock::Hash(peer.best_hash)
2198	} else {
2199		FromBlock::Number(last)
2200	};
2201
2202	let request = BlockRequest::<B> {
2203		id: 0,
2204		fields: attrs,
2205		from,
2206		direction: Direction::Descending,
2207		max: Some((range.end - range.start).saturated_into::<u32>()),
2208	};
2209
2210	Some((range, request))
2211}
2212
2213/// Get a new block request for the peer if any.
2214fn peer_gap_block_request<B: BlockT>(
2215	id: &PeerId,
2216	peer: &PeerSync<B>,
2217	blocks: &mut BlockCollection<B>,
2218	attrs: BlockAttributes,
2219	target: NumberFor<B>,
2220	common_number: NumberFor<B>,
2221	max_blocks_per_request: u32,
2222) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2223	let range = blocks.needed_blocks(
2224		*id,
2225		max_blocks_per_request,
2226		std::cmp::min(peer.best_number, target),
2227		common_number,
2228		1,
2229		MAX_DOWNLOAD_AHEAD,
2230	)?;
2231
2232	// The end is not part of the range.
2233	let last = range.end.saturating_sub(One::one());
2234	let from = FromBlock::Number(last);
2235
2236	let request = BlockRequest::<B> {
2237		id: 0,
2238		fields: attrs,
2239		from,
2240		direction: Direction::Descending,
2241		max: Some((range.end - range.start).saturated_into::<u32>()),
2242	};
2243	Some((range, request))
2244}
2245
2246/// Get pending fork sync targets for a peer.
2247fn fork_sync_request<B: BlockT>(
2248	id: &PeerId,
2249	fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
2250	best_num: NumberFor<B>,
2251	finalized: NumberFor<B>,
2252	attributes: BlockAttributes,
2253	check_block: impl Fn(&B::Hash) -> BlockStatus,
2254	max_blocks_per_request: u32,
2255	metrics: Option<&Metrics>,
2256) -> Option<(B::Hash, BlockRequest<B>)> {
2257	fork_targets.retain(|hash, r| {
2258		if r.number <= finalized {
2259			trace!(
2260				target: LOG_TARGET,
2261				"Removed expired fork sync request {:?} (#{})",
2262				hash,
2263				r.number,
2264			);
2265			return false;
2266		}
2267		if check_block(hash) != BlockStatus::Unknown {
2268			trace!(
2269				target: LOG_TARGET,
2270				"Removed obsolete fork sync request {:?} (#{})",
2271				hash,
2272				r.number,
2273			);
2274			return false;
2275		}
2276		true
2277	});
2278	if let Some(metrics) = metrics {
2279		metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
2280	}
2281	for (hash, r) in fork_targets {
2282		if !r.peers.contains(id) {
2283			continue;
2284		}
2285		// Download the fork only if it is behind, or not too far ahead of, our tip of the chain.
2286		// Otherwise it should be downloaded in full sync mode.
2287		if r.number <= best_num ||
2288			(r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
2289		{
2290			let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
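			// If the fork's parent is unknown we may be missing its whole ancestry, so ask for
			// everything back to the last finalized block; otherwise the announced block alone is
			// enough to connect the fork to our chain.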
2291			let count = if parent_status == BlockStatus::Unknown {
2292				(r.number - finalized).saturated_into::<u32>() // up to the last finalized block
2293			} else {
2294				// request only single block
2295				1
2296			};
2297			trace!(
2298				target: LOG_TARGET,
2299				"Downloading requested fork {hash:?} from {id}, {count} blocks",
2300			);
2301			return Some((
2302				*hash,
2303				BlockRequest::<B> {
2304					id: 0,
2305					fields: attributes,
2306					from: FromBlock::Hash(*hash),
2307					direction: Direction::Descending,
2308					max: Some(count),
2309				},
2310			));
2311		} else {
2312			trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
2313		}
2314	}
2315	None
2316}
2317
2318/// Returns `true` if the given `block` is a descendent of `base`.
2319fn is_descendent_of<Block, T>(
2320	client: &T,
2321	base: &Block::Hash,
2322	block: &Block::Hash,
2323) -> sp_blockchain::Result<bool>
2324where
2325	Block: BlockT,
2326	T: HeaderMetadata<Block, Error = sp_blockchain::Error> + ?Sized,
2327{
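	// A block is not considered a descendent of itself.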
2328	if base == block {
2329		return Ok(false);
2330	}
2331
2332	let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?;
2333
2334	Ok(ancestor.hash == *base)
2335}
2336
2337/// Validate that the given `blocks` are correct.
2338/// Returns the number of the first block in the sequence.
2339///
2340/// It is expected that `blocks` are in ascending order.
2341pub fn validate_blocks<Block: BlockT>(
2342	blocks: &Vec<BlockData<Block>>,
2343	peer_id: &PeerId,
2344	request: Option<BlockRequest<Block>>,
2345) -> Result<Option<NumberFor<Block>>, BadPeer> {
2346	if let Some(request) = request {
2347		if Some(blocks.len() as _) > request.max {
2348			debug!(
2349				target: LOG_TARGET,
2350				"Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
2351				peer_id,
2352				request.max,
2353				blocks.len(),
2354			);
2355
2356			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2357		}
2358
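		// `blocks` are expected to be in ascending order (see the doc comment above), so for a
		// `Descending` request the block matching `request.from` ends up last.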
2359		let block_header =
2360			if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
2361				.and_then(|b| b.header.as_ref());
2362
2363		let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
2364			FromBlock::Hash(hash) => h.hash() == hash,
2365			FromBlock::Number(n) => h.number() == &n,
2366		});
2367
2368		if !expected_block {
2369			debug!(
2370				target: LOG_TARGET,
2371				"Received block that was not requested. Requested {:?}, got {:?}.",
2372				request.from,
2373				block_header,
2374			);
2375
2376			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2377		}
2378
2379		if request.fields.contains(BlockAttributes::HEADER) &&
2380			blocks.iter().any(|b| b.header.is_none())
2381		{
2382			trace!(
2383				target: LOG_TARGET,
2384				"Missing requested header for a block in response from {peer_id}.",
2385			);
2386
2387			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2388		}
2389
2390		if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
2391		{
2392			trace!(
2393				target: LOG_TARGET,
2394				"Missing requested body for a block in response from {peer_id}.",
2395			);
2396
2397			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2398		}
2399	}
2400
2401	for b in blocks {
2402		if let Some(header) = &b.header {
2403			let hash = header.hash();
2404			if hash != b.hash {
2405				debug!(
2406					target: LOG_TARGET,
2407					"Bad header received from {}. Expected hash {:?}, got {:?}",
2408					peer_id,
2409					b.hash,
2410					hash,
2411				);
2412				return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2413			}
2414		}
2415	}
2416
2417	Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
2418}