sc_network_sync/strategy/chain_sync.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Contains the state of the chain synchronization process
//!
//! At any given point in time, a running node tries as much as possible to be at the head of the
//! chain. This module handles the logic of which blocks to request from remotes, and processing
//! responses. It yields blocks to check and potentially move to the database.
//!
//! # Usage
//!
//! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on
//! the network, or whenever a block has been successfully verified, call the appropriate method in
//! order to update it.

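//! A rough illustration of the expected call pattern (an illustrative sketch, not a compilable
//! doctest; `client`, `protocol_name`, `registry` and the surrounding networking/import-queue
//! plumbing are assumed to be provided by the caller):
//!
//! ```ignore
//! let mut sync = ChainSync::new(
//! 	ChainSyncMode::Full,
//! 	client,
//! 	5,  // max_parallel_downloads
//! 	64, // max_blocks_per_request
//! 	protocol_name,
//! 	Some(&registry),
//! 	std::iter::empty(),
//! )?;
//!
//! // Feed network events into the strategy as they happen...
//! sync.add_peer(peer_id, peer_best_hash, peer_best_number);
//! sync.on_block_response(peer_id, StrategyKey::ChainSync, request, blocks);
//!
//! // ...then periodically drain the resulting actions (block/state requests to send, blocks to
//! // import, peers to drop) and dispatch them to the networking and import-queue layers.
//! for action in sync.actions()? {
//! 	// handle `action`
//! }
//! ```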
31use crate::{
32	blocks::BlockCollection,
33	justification_requests::ExtraRequests,
34	schema::v1::StateResponse,
35	strategy::{
36		disconnected_peers::DisconnectedPeers,
37		state_sync::{ImportResult, StateSync, StateSyncProvider},
38		warp::{EncodedProof, WarpSyncPhase, WarpSyncProgress},
39		StrategyKey, SyncingAction, SyncingStrategy,
40	},
41	types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncState, SyncStatus},
42	LOG_TARGET,
43};
44
45use codec::Encode;
46use log::{debug, error, info, trace, warn};
47use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
48use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
49use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
50use sc_network::ProtocolName;
51use sc_network_common::sync::message::{
52	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
53};
54use sc_network_types::PeerId;
55use sp_arithmetic::traits::Saturating;
56use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
57use sp_consensus::{BlockOrigin, BlockStatus};
58use sp_runtime::{
59	traits::{
60		Block as BlockT, CheckedSub, Hash, HashingFor, Header as HeaderT, NumberFor, One,
61		SaturatedConversion, Zero,
62	},
63	EncodedJustification, Justifications,
64};
65
66use std::{
67	collections::{HashMap, HashSet},
68	ops::Range,
69	sync::Arc,
70};
71
72#[cfg(test)]
73mod test;
74
75/// Maximum blocks to store in the import queue.
76const MAX_IMPORTING_BLOCKS: usize = 2048;
77
78/// Maximum blocks to download ahead of any gap.
79const MAX_DOWNLOAD_AHEAD: u32 = 2048;
80
81/// Maximum blocks to look backwards. The gap is the difference between the highest block and the
82/// common block of a node.
83const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;
84
85/// Pick the state to sync as the latest finalized number minus this.
86const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
87
88/// We use a heuristic that with a high likelihood, by the time
89/// `MAJOR_SYNC_BLOCKS` have been imported we'll be on the same
90/// chain as (or at least closer to) the peer so we want to delay
91/// the ancestor search to not waste time doing that when we are
92/// so far behind.
93const MAJOR_SYNC_BLOCKS: u8 = 5;
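// Illustrative interplay of the constants above: with `MAX_DOWNLOAD_AHEAD = 2048`,
// `MAX_BLOCKS_TO_LOOK_BACKWARDS` works out to 1024, so a peer whose common number trails our
// best queued block by more than 1024 blocks while advertising a higher best block is, subject
// to a few more checks in `block_requests`, switched to a fresh ancestor search instead of a
// plain forward block request.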
94
95mod rep {
96	use sc_network::ReputationChange as Rep;
97	/// Reputation change when a peer sent us a message that led to a
98	/// database read error.
99	pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
100
101	/// Reputation change when a peer sent us a status message with a different
102	/// genesis than us.
103	pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");
104
105	/// Reputation change for peers which send us a block with an incomplete header.
106	pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
107
108	/// Reputation change for peers which send us a block which we fail to verify.
109	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
110
111	/// Reputation change for peers which send us a known bad block.
112	pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
113
114	/// Peer did not provide us with advertised block data.
115	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
116
117	/// Reputation change for peers which send us non-requested block data.
118	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
119
120	/// Reputation change for peers which send us a block with bad justifications.
121	pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
122
	/// Reputation change when a peer sent us an invalid ancestry result.
124	pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");
125
126	/// Peer response data does not have requested bits.
127	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
128}
129
130struct Metrics {
131	queued_blocks: Gauge<U64>,
132	fork_targets: Gauge<U64>,
133}
134
135impl Metrics {
136	fn register(r: &Registry) -> Result<Self, PrometheusError> {
137		Ok(Self {
138			queued_blocks: {
139				let g =
140					Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
141				register(g, r)?
142			},
143			fork_targets: {
144				let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
145				register(g, r)?
146			},
147		})
148	}
149}
150
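/// Tracks which peers we are currently allowed to send new requests to.
///
/// `Some(set)` restricts requests to the listed peers (e.g. peers that just connected or
/// produced new information), while `All` allows requesting from any peer, e.g. after a
/// restart or after blocks have been queued or imported.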
151#[derive(Debug, Clone)]
152enum AllowedRequests {
153	Some(HashSet<PeerId>),
154	All,
155}
156
157impl AllowedRequests {
158	fn add(&mut self, id: &PeerId) {
159		if let Self::Some(ref mut set) = self {
160			set.insert(*id);
161		}
162	}
163
164	fn take(&mut self) -> Self {
165		std::mem::take(self)
166	}
167
168	fn set_all(&mut self) {
169		*self = Self::All;
170	}
171
172	fn contains(&self, id: &PeerId) -> bool {
173		match self {
174			Self::Some(set) => set.contains(id),
175			Self::All => true,
176		}
177	}
178
179	fn is_empty(&self) -> bool {
180		match self {
181			Self::Some(set) => set.is_empty(),
182			Self::All => false,
183		}
184	}
185
186	fn clear(&mut self) {
187		std::mem::take(self);
188	}
189}
190
191impl Default for AllowedRequests {
192	fn default() -> Self {
193		Self::Some(HashSet::default())
194	}
195}
196
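/// State of the auxiliary "gap" download that back-fills a gap in the block database,
/// e.g. the historical blocks skipped over by warp sync.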
197struct GapSync<B: BlockT> {
198	blocks: BlockCollection<B>,
199	best_queued_number: NumberFor<B>,
200	target: NumberFor<B>,
201}
202
203/// Sync operation mode.
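///
/// E.g. `ChainSyncMode::LightState { skip_proofs: false, storage_chain_mode: false }` downloads
/// block data plus the latest state instead of fully executing every block, whereas
/// `ChainSyncMode::Full` downloads and verifies everything.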
204#[derive(Copy, Clone, Debug, Eq, PartialEq)]
205pub enum ChainSyncMode {
206	/// Full block download and verification.
207	Full,
208	/// Download blocks and the latest state.
209	LightState {
210		/// Skip state proof download and verification.
211		skip_proofs: bool,
212		/// Download indexed transactions for recent blocks.
213		storage_chain_mode: bool,
214	},
215}
216
217/// All the data we have about a Peer that we are trying to sync with
218#[derive(Debug, Clone)]
219pub(crate) struct PeerSync<B: BlockT> {
220	/// Peer id of this peer.
221	pub peer_id: PeerId,
222	/// The common number is the block number that is a common point of
223	/// ancestry for both our chains (as far as we know).
224	pub common_number: NumberFor<B>,
225	/// The hash of the best block that we've seen for this peer.
226	pub best_hash: B::Hash,
227	/// The number of the best block that we've seen for this peer.
228	pub best_number: NumberFor<B>,
	/// The state of syncing this peer is in for us, generally categorized
	/// into `Available` or "busy" with something as defined by `PeerSyncState`.
231	pub state: PeerSyncState<B>,
232}
233
234impl<B: BlockT> PeerSync<B> {
235	/// Update the `common_number` iff `new_common > common_number`.
236	fn update_common_number(&mut self, new_common: NumberFor<B>) {
237		if self.common_number < new_common {
238			trace!(
239				target: LOG_TARGET,
240				"Updating peer {} common number from={} => to={}.",
241				self.peer_id,
242				self.common_number,
243				new_common,
244			);
245			self.common_number = new_common;
246		}
247	}
248}
249
250struct ForkTarget<B: BlockT> {
251	number: NumberFor<B>,
252	parent_hash: Option<B::Hash>,
253	peers: HashSet<PeerId>,
254}
255
256/// The state of syncing between a Peer and ourselves.
257///
258/// Generally two categories, "busy" or `Available`. If busy, the enum
259/// defines what we are busy with.
260#[derive(Copy, Clone, Eq, PartialEq, Debug)]
261pub(crate) enum PeerSyncState<B: BlockT> {
262	/// Available for sync requests.
263	Available,
264	/// Searching for ancestors the Peer has in common with us.
265	AncestorSearch { start: NumberFor<B>, current: NumberFor<B>, state: AncestorSearchState<B> },
266	/// Actively downloading new blocks, starting from the given Number.
267	DownloadingNew(NumberFor<B>),
268	/// Downloading a stale block with given Hash. Stale means that it is a
269	/// block with a number that is lower than our best number. It might be
270	/// from a fork and not necessarily already imported.
271	DownloadingStale(B::Hash),
272	/// Downloading justification for given block hash.
273	DownloadingJustification(B::Hash),
274	/// Downloading state.
275	DownloadingState,
276	/// Actively downloading block history after warp sync.
277	DownloadingGap(NumberFor<B>),
278}
279
280impl<B: BlockT> PeerSyncState<B> {
281	pub fn is_available(&self) -> bool {
282		matches!(self, Self::Available)
283	}
284}
285
/// The main data structure which contains all the state for a chain's
/// active syncing strategy.
288pub struct ChainSync<B: BlockT, Client> {
289	/// Chain client.
290	client: Arc<Client>,
291	/// The active peers that we are using to sync and their PeerSync status
292	peers: HashMap<PeerId, PeerSync<B>>,
293	disconnected_peers: DisconnectedPeers,
294	/// A `BlockCollection` of blocks that are being downloaded from peers
295	blocks: BlockCollection<B>,
296	/// The best block number in our queue of blocks to import
297	best_queued_number: NumberFor<B>,
298	/// The best block hash in our queue of blocks to import
299	best_queued_hash: B::Hash,
300	/// Current mode (full/light)
301	mode: ChainSyncMode,
302	/// Any extra justification requests.
303	extra_justifications: ExtraRequests<B>,
304	/// A set of hashes of blocks that are being downloaded or have been
305	/// downloaded and are queued for import.
306	queue_blocks: HashSet<B::Hash>,
307	/// A pending attempt to start the state sync.
308	///
309	/// The initiation of state sync may be deferred in cases where other conditions
310	/// are not yet met when the finalized block notification is received, such as
311	/// when `queue_blocks` is not empty or there are no peers. This field holds the
312	/// necessary information to attempt the state sync at a later point when
313	/// conditions are satisfied.
314	pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
315	/// Fork sync targets.
316	fork_targets: HashMap<B::Hash, ForkTarget<B>>,
317	/// A set of peers for which there might be potential block requests
318	allowed_requests: AllowedRequests,
319	/// Maximum number of peers to ask the same blocks in parallel.
320	max_parallel_downloads: u32,
321	/// Maximum blocks per request.
322	max_blocks_per_request: u32,
323	/// Protocol name used to send out state requests
324	state_request_protocol_name: ProtocolName,
325	/// Total number of downloaded blocks.
326	downloaded_blocks: usize,
327	/// State sync in progress, if any.
328	state_sync: Option<StateSync<B, Client>>,
	/// Enable importing existing blocks. This is used after the state download to
	/// catch up to the latest state while re-importing blocks.
331	import_existing: bool,
332	/// Gap download process.
333	gap_sync: Option<GapSync<B>>,
334	/// Pending actions.
335	actions: Vec<SyncingAction<B>>,
336	/// Prometheus metrics.
337	metrics: Option<Metrics>,
338}
339
340impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
341where
342	B: BlockT,
343	Client: HeaderBackend<B>
344		+ BlockBackend<B>
345		+ HeaderMetadata<B, Error = sp_blockchain::Error>
346		+ ProofProvider<B>
347		+ Send
348		+ Sync
349		+ 'static,
350{
351	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
352		match self.add_peer_inner(peer_id, best_hash, best_number) {
353			Ok(Some(request)) => self.actions.push(SyncingAction::SendBlockRequest {
354				peer_id,
355				key: StrategyKey::ChainSync,
356				request,
357			}),
358			Ok(None) => {},
359			Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
360		}
361	}
362
363	fn remove_peer(&mut self, peer_id: &PeerId) {
364		self.blocks.clear_peer_download(peer_id);
365		if let Some(gap_sync) = &mut self.gap_sync {
366			gap_sync.blocks.clear_peer_download(peer_id)
367		}
368
369		if let Some(state) = self.peers.remove(peer_id) {
370			if !state.state.is_available() {
371				if let Some(bad_peer) =
372					self.disconnected_peers.on_disconnect_during_request(*peer_id)
373				{
374					self.actions.push(SyncingAction::DropPeer(bad_peer));
375				}
376			}
377		}
378
379		self.extra_justifications.peer_disconnected(peer_id);
380		self.allowed_requests.set_all();
381		self.fork_targets.retain(|_, target| {
382			target.peers.remove(peer_id);
383			!target.peers.is_empty()
384		});
385		if let Some(metrics) = &self.metrics {
386			metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
387		}
388
389		let blocks = self.ready_blocks();
390
391		if !blocks.is_empty() {
392			self.validate_and_queue_blocks(blocks, false);
393		}
394	}
395
396	fn on_validated_block_announce(
397		&mut self,
398		is_best: bool,
399		peer_id: PeerId,
400		announce: &BlockAnnounce<B::Header>,
401	) -> Option<(B::Hash, NumberFor<B>)> {
402		let number = *announce.header.number();
403		let hash = announce.header.hash();
404		let parent_status =
405			self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
406		let known_parent = parent_status != BlockStatus::Unknown;
407		let ancient_parent = parent_status == BlockStatus::InChainPruned;
408
409		let known = self.is_known(&hash);
410		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
411			peer
412		} else {
413			error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
414			return Some((hash, number))
415		};
416
417		if let PeerSyncState::AncestorSearch { .. } = peer.state {
418			trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
419			return None
420		}
421
422		let peer_info = is_best.then(|| {
423			// update their best block
424			peer.best_number = number;
425			peer.best_hash = hash;
426
427			(hash, number)
428		});
429
430		// If the announced block is the best they have and is not ahead of us, our common number
431		// is either one further ahead or it's the one they just announced, if we know about it.
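		// E.g. if the peer announces #100 as its best, and we already know block #100 and have
		// queued at least up to it, the common number becomes 100; if instead #100's parent is
		// our best queued block, the common number becomes 99.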
432		if is_best {
433			if known && self.best_queued_number >= number {
434				self.update_peer_common_number(&peer_id, number);
435			} else if announce.header.parent_hash() == &self.best_queued_hash ||
436				known_parent && self.best_queued_number >= number
437			{
438				self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
439			}
440		}
441		self.allowed_requests.add(&peer_id);
442
443		// known block case
444		if known || self.is_already_downloading(&hash) {
445			trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
446			if let Some(target) = self.fork_targets.get_mut(&hash) {
447				target.peers.insert(peer_id);
448			}
449			return peer_info
450		}
451
452		if ancient_parent {
453			trace!(
454				target: LOG_TARGET,
455				"Ignored ancient block announced from {}: {} {:?}",
456				peer_id,
457				hash,
458				announce.header,
459			);
460			return peer_info
461		}
462
463		if self.status().state == SyncState::Idle {
464			trace!(
465				target: LOG_TARGET,
466				"Added sync target for block announced from {}: {} {:?}",
467				peer_id,
468				hash,
469				announce.summary(),
470			);
471			self.fork_targets
472				.entry(hash)
473				.or_insert_with(|| {
474					if let Some(metrics) = &self.metrics {
475						metrics.fork_targets.inc();
476					}
477
478					ForkTarget {
479						number,
480						parent_hash: Some(*announce.header.parent_hash()),
481						peers: Default::default(),
482					}
483				})
484				.peers
485				.insert(peer_id);
486		}
487
488		peer_info
489	}
490
491	// The implementation is similar to `on_validated_block_announce` with unknown parent hash.
492	fn set_sync_fork_request(
493		&mut self,
494		mut peers: Vec<PeerId>,
495		hash: &B::Hash,
496		number: NumberFor<B>,
497	) {
498		if peers.is_empty() {
499			peers = self
500				.peers
501				.iter()
502				// Only request blocks from peers who are ahead or on a par.
503				.filter(|(_, peer)| peer.best_number >= number)
504				.map(|(id, _)| *id)
505				.collect();
506
507			debug!(
508				target: LOG_TARGET,
509				"Explicit sync request for block {hash:?} with no peers specified. \
510				Syncing from these peers {peers:?} instead.",
511			);
512		} else {
513			debug!(
514				target: LOG_TARGET,
515				"Explicit sync request for block {hash:?} with {peers:?}",
516			);
517		}
518
519		if self.is_known(hash) {
520			debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
521			return
522		}
523
524		trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
525		for peer_id in &peers {
526			if let Some(peer) = self.peers.get_mut(peer_id) {
527				if let PeerSyncState::AncestorSearch { .. } = peer.state {
528					continue
529				}
530
531				if number > peer.best_number {
532					peer.best_number = number;
533					peer.best_hash = *hash;
534				}
535				self.allowed_requests.add(peer_id);
536			}
537		}
538
539		self.fork_targets
540			.entry(*hash)
541			.or_insert_with(|| {
542				if let Some(metrics) = &self.metrics {
543					metrics.fork_targets.inc();
544				}
545
546				ForkTarget { number, peers: Default::default(), parent_hash: None }
547			})
548			.peers
549			.extend(peers);
550	}
551
552	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
553		let client = &self.client;
554		self.extra_justifications
555			.schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
556	}
557
558	fn clear_justification_requests(&mut self) {
559		self.extra_justifications.reset();
560	}
561
562	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
563		let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
564		self.extra_justifications
565			.try_finalize_root((hash, number), finalization_result, true);
566		self.allowed_requests.set_all();
567	}
568
569	fn on_block_response(
570		&mut self,
571		peer_id: PeerId,
572		key: StrategyKey,
573		request: BlockRequest<B>,
574		blocks: Vec<BlockData<B>>,
575	) {
576		if key != StrategyKey::ChainSync {
577			error!(
578				target: LOG_TARGET,
579				"`on_block_response()` called with unexpected key {key:?} for chain sync",
580			);
581			debug_assert!(false);
582		}
583		let block_response = BlockResponse::<B> { id: request.id, blocks };
584
585		let blocks_range = || match (
586			block_response
587				.blocks
588				.first()
589				.and_then(|b| b.header.as_ref().map(|h| h.number())),
590			block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
591		) {
592			(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
593			(Some(first), Some(_)) => format!(" ({})", first),
594			_ => Default::default(),
595		};
596		trace!(
597			target: LOG_TARGET,
598			"BlockResponse {} from {} with {} blocks {}",
599			block_response.id,
600			peer_id,
601			block_response.blocks.len(),
602			blocks_range(),
603		);
604
605		let res = if request.fields == BlockAttributes::JUSTIFICATION {
606			self.on_block_justification(peer_id, block_response)
607		} else {
608			self.on_block_data(&peer_id, Some(request), block_response)
609		};
610
611		if let Err(bad_peer) = res {
612			self.actions.push(SyncingAction::DropPeer(bad_peer));
613		}
614	}
615
616	fn on_state_response(
617		&mut self,
618		peer_id: PeerId,
619		key: StrategyKey,
620		response: OpaqueStateResponse,
621	) {
622		if key != StrategyKey::ChainSync {
623			error!(
624				target: LOG_TARGET,
625				"`on_state_response()` called with unexpected key {key:?} for chain sync",
626			);
627			debug_assert!(false);
628		}
629		if let Err(bad_peer) = self.on_state_data(&peer_id, response) {
630			self.actions.push(SyncingAction::DropPeer(bad_peer));
631		}
632	}
633
634	fn on_warp_proof_response(
635		&mut self,
636		_peer_id: &PeerId,
637		_key: StrategyKey,
638		_response: EncodedProof,
639	) {
640		error!(
641			target: LOG_TARGET,
642			"`on_warp_proof_response()` called for chain sync strategy",
643		);
644		debug_assert!(false);
645	}
646
647	fn on_blocks_processed(
648		&mut self,
649		imported: usize,
650		count: usize,
651		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
652	) {
653		trace!(target: LOG_TARGET, "Imported {imported} of {count}");
654
655		let mut has_error = false;
656		for (_, hash) in &results {
657			if self.queue_blocks.remove(hash) {
658				if let Some(metrics) = &self.metrics {
659					metrics.queued_blocks.dec();
660				}
661			}
662			self.blocks.clear_queued(hash);
663			if let Some(gap_sync) = &mut self.gap_sync {
664				gap_sync.blocks.clear_queued(hash);
665			}
666		}
667		for (result, hash) in results {
668			if has_error {
669				break
670			}
671
672			has_error |= result.is_err();
673
674			match result {
675				Ok(BlockImportStatus::ImportedKnown(number, peer_id)) =>
676					if let Some(peer) = peer_id {
677						self.update_peer_common_number(&peer, number);
678					},
679				Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
680					if aux.clear_justification_requests {
681						trace!(
682							target: LOG_TARGET,
683							"Block imported clears all pending justification requests {number}: {hash:?}",
684						);
685						self.clear_justification_requests();
686					}
687
688					if aux.needs_justification {
689						trace!(
690							target: LOG_TARGET,
691							"Block imported but requires justification {number}: {hash:?}",
692						);
693						self.request_justification(&hash, number);
694					}
695
696					if aux.bad_justification {
697						if let Some(ref peer) = peer_id {
698							warn!("💔 Sent block with bad justification to import");
699							self.actions.push(SyncingAction::DropPeer(BadPeer(
700								*peer,
701								rep::BAD_JUSTIFICATION,
702							)));
703						}
704					}
705
706					if let Some(peer) = peer_id {
707						self.update_peer_common_number(&peer, number);
708					}
709					let state_sync_complete =
710						self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
711					if state_sync_complete {
712						info!(
713							target: LOG_TARGET,
714							"State sync is complete ({} MiB), restarting block sync.",
715							self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
716						);
717						self.state_sync = None;
718						self.mode = ChainSyncMode::Full;
719						self.restart();
720					}
721					let gap_sync_complete =
722						self.gap_sync.as_ref().map_or(false, |s| s.target == number);
723					if gap_sync_complete {
724						info!(
725							target: LOG_TARGET,
726							"Block history download is complete."
727						);
728						self.gap_sync = None;
729					}
730				},
731				Err(BlockImportError::IncompleteHeader(peer_id)) =>
732					if let Some(peer) = peer_id {
733						warn!(
734							target: LOG_TARGET,
735							"💔 Peer sent block with incomplete header to import",
736						);
737						self.actions
738							.push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
739						self.restart();
740					},
741				Err(BlockImportError::VerificationFailed(peer_id, e)) => {
742					let extra_message = peer_id
743						.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
744
745					warn!(
746						target: LOG_TARGET,
747						"💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
748					);
749
750					if let Some(peer) = peer_id {
751						self.actions
752							.push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
753					}
754
755					self.restart();
756				},
757				Err(BlockImportError::BadBlock(peer_id)) =>
758					if let Some(peer) = peer_id {
759						warn!(
760							target: LOG_TARGET,
761							"💔 Block {hash:?} received from peer {peer} has been blacklisted",
762						);
763						self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
764					},
765				Err(BlockImportError::MissingState) => {
					// This may happen if the chain we were requesting blocks from has been discarded
					// in the meantime because another chain has been finalized.
768					// Don't mark it as bad as it still may be synced if explicitly requested.
769					trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
770				},
771				e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
772					warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
773					self.state_sync = None;
774					self.restart();
775				},
776				Err(BlockImportError::Cancelled) => {},
777			};
778		}
779
780		self.allowed_requests.set_all();
781	}
782
783	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
784		let client = &self.client;
785		let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
786			is_descendent_of(&**client, base, block)
787		});
788
789		if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
790			if self.state_sync.is_none() {
791				if !self.peers.is_empty() && self.queue_blocks.is_empty() {
792					self.attempt_state_sync(*hash, number, *skip_proofs);
793				} else {
794					self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
795				}
796			}
797		}
798
799		if let Err(err) = r {
800			warn!(
801				target: LOG_TARGET,
802				"💔 Error cleaning up pending extra justification data requests: {err}",
803			);
804		}
805	}
806
807	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
808		self.on_block_queued(best_hash, best_number);
809	}
810
811	fn is_major_syncing(&self) -> bool {
812		self.status().state.is_major_syncing()
813	}
814
815	fn num_peers(&self) -> usize {
816		self.peers.len()
817	}
818
819	fn status(&self) -> SyncStatus<B> {
820		let median_seen = self.median_seen();
821		let best_seen_block =
822			median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
823		let sync_state = if let Some(target) = median_seen {
			// The chain is classified as `Downloading` if the target (the median best block
			// advertised by our peers) is more than `MAJOR_SYNC_BLOCKS` ahead of our best
			// imported block and not yet queued, and as `Importing` if it is already queued.
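			// Worked example (illustrative numbers): with `MAJOR_SYNC_BLOCKS = 5`, a median
			// target of 1_000 while our best imported block is 900 reports `Downloading`
			// (or `Importing` once block 1_000 is already queued); a target of 903 reports
			// `Idle`.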
827			let best_block = self.client.info().best_number;
828			if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
829				// If target is not queued, we're downloading, otherwise importing.
830				if target > self.best_queued_number {
831					SyncState::Downloading { target }
832				} else {
833					SyncState::Importing { target }
834				}
835			} else {
836				SyncState::Idle
837			}
838		} else {
839			SyncState::Idle
840		};
841
842		let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
843			phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
844			total_bytes: 0,
845		});
846
847		SyncStatus {
848			state: sync_state,
849			best_seen_block,
850			num_peers: self.peers.len() as u32,
851			queued_blocks: self.queue_blocks.len() as u32,
852			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
853			warp_sync: warp_sync_progress,
854		}
855	}
856
857	fn num_downloaded_blocks(&self) -> usize {
858		self.downloaded_blocks
859	}
860
861	fn num_sync_requests(&self) -> usize {
862		self.fork_targets
863			.values()
864			.filter(|f| f.number <= self.best_queued_number)
865			.count()
866	}
867
868	fn actions(&mut self) -> Result<Vec<SyncingAction<B>>, ClientError> {
869		if !self.peers.is_empty() && self.queue_blocks.is_empty() {
870			if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
871				self.attempt_state_sync(hash, number, skip_proofs);
872			}
873		}
874
875		let block_requests = self.block_requests().into_iter().map(|(peer_id, request)| {
876			SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request }
877		});
878		self.actions.extend(block_requests);
879
880		let justification_requests =
881			self.justification_requests().into_iter().map(|(peer_id, request)| {
882				SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request }
883			});
884		self.actions.extend(justification_requests);
885
886		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
887			SyncingAction::SendStateRequest {
888				peer_id,
889				key: StrategyKey::ChainSync,
890				protocol_name: self.state_request_protocol_name.clone(),
891				request,
892			}
893		});
894		self.actions.extend(state_request);
895
896		Ok(std::mem::take(&mut self.actions))
897	}
898}
899
900impl<B, Client> ChainSync<B, Client>
901where
902	B: BlockT,
903	Client: HeaderBackend<B>
904		+ BlockBackend<B>
905		+ HeaderMetadata<B, Error = sp_blockchain::Error>
906		+ ProofProvider<B>
907		+ Send
908		+ Sync
909		+ 'static,
910{
911	/// Create a new instance.
912	pub fn new(
913		mode: ChainSyncMode,
914		client: Arc<Client>,
915		max_parallel_downloads: u32,
916		max_blocks_per_request: u32,
917		state_request_protocol_name: ProtocolName,
918		metrics_registry: Option<&Registry>,
919		initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
920	) -> Result<Self, ClientError> {
921		let mut sync = Self {
922			client,
923			peers: HashMap::new(),
924			disconnected_peers: DisconnectedPeers::new(),
925			blocks: BlockCollection::new(),
926			best_queued_hash: Default::default(),
927			best_queued_number: Zero::zero(),
928			extra_justifications: ExtraRequests::new("justification", metrics_registry),
929			mode,
930			queue_blocks: Default::default(),
931			pending_state_sync_attempt: None,
932			fork_targets: Default::default(),
933			allowed_requests: Default::default(),
934			max_parallel_downloads,
935			max_blocks_per_request,
936			state_request_protocol_name,
937			downloaded_blocks: 0,
938			state_sync: None,
939			import_existing: false,
940			gap_sync: None,
941			actions: Vec::new(),
942			metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
943				Ok(metrics) => Some(metrics),
944				Err(err) => {
945					log::error!(
946						target: LOG_TARGET,
947						"Failed to register `ChainSync` metrics {err:?}",
948					);
949					None
950				},
951			}),
952		};
953
954		sync.reset_sync_start_point()?;
955		initial_peers.for_each(|(peer_id, best_hash, best_number)| {
956			sync.add_peer(peer_id, best_hash, best_number);
957		});
958
959		Ok(sync)
960	}
961
962	#[must_use]
963	fn add_peer_inner(
964		&mut self,
965		peer_id: PeerId,
966		best_hash: B::Hash,
967		best_number: NumberFor<B>,
968	) -> Result<Option<BlockRequest<B>>, BadPeer> {
		// There is nothing sync can get from a node that has no blockchain data.
970		match self.block_status(&best_hash) {
971			Err(e) => {
972				debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
973				Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
974			},
975			Ok(BlockStatus::KnownBad) => {
976				info!(
977					"💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
978				);
979				Err(BadPeer(peer_id, rep::BAD_BLOCK))
980			},
981			Ok(BlockStatus::Unknown) => {
982				if best_number.is_zero() {
983					info!(
984						"💔 New peer {} with unknown genesis hash {} ({}).",
985						peer_id, best_hash, best_number,
986					);
987					return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
988				}
989
				// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
				// enough to do in the import queue that it's not worth kicking off
				// an ancestor search, which is what we would otherwise do further below.
993				if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() {
994					debug!(
995						target: LOG_TARGET,
996						"New peer {} with unknown best hash {} ({}), assuming common block.",
997						peer_id,
998						self.best_queued_hash,
999						self.best_queued_number
1000					);
1001					self.peers.insert(
1002						peer_id,
1003						PeerSync {
1004							peer_id,
1005							common_number: self.best_queued_number,
1006							best_hash,
1007							best_number,
1008							state: PeerSyncState::Available,
1009						},
1010					);
1011					return Ok(None);
1012				}
1013
1014				// If we are at genesis, just start downloading.
1015				let (state, req) = if self.best_queued_number.is_zero() {
1016					debug!(
1017						target: LOG_TARGET,
1018						"New peer {peer_id} with best hash {best_hash} ({best_number}).",
1019					);
1020
1021					(PeerSyncState::Available, None)
1022				} else {
1023					let common_best = std::cmp::min(self.best_queued_number, best_number);
1024
1025					debug!(
1026						target: LOG_TARGET,
1027						"New peer {} with unknown best hash {} ({}), searching for common ancestor.",
1028						peer_id,
1029						best_hash,
1030						best_number
1031					);
1032
1033					(
1034						PeerSyncState::AncestorSearch {
1035							current: common_best,
1036							start: self.best_queued_number,
1037							state: AncestorSearchState::ExponentialBackoff(One::one()),
1038						},
1039						Some(ancestry_request::<B>(common_best)),
1040					)
1041				};
1042
1043				self.allowed_requests.add(&peer_id);
1044				self.peers.insert(
1045					peer_id,
1046					PeerSync {
1047						peer_id,
1048						common_number: Zero::zero(),
1049						best_hash,
1050						best_number,
1051						state,
1052					},
1053				);
1054
1055				Ok(req)
1056			},
1057			Ok(BlockStatus::Queued) |
1058			Ok(BlockStatus::InChainWithState) |
1059			Ok(BlockStatus::InChainPruned) => {
1060				debug!(
1061					target: LOG_TARGET,
1062					"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
1063				);
1064				self.peers.insert(
1065					peer_id,
1066					PeerSync {
1067						peer_id,
1068						common_number: std::cmp::min(self.best_queued_number, best_number),
1069						best_hash,
1070						best_number,
1071						state: PeerSyncState::Available,
1072					},
1073				);
1074				self.allowed_requests.add(&peer_id);
1075				Ok(None)
1076			},
1077		}
1078	}
1079
1080	/// Submit a block response for processing.
1081	#[must_use]
1082	fn on_block_data(
1083		&mut self,
1084		peer_id: &PeerId,
1085		request: Option<BlockRequest<B>>,
1086		response: BlockResponse<B>,
1087	) -> Result<(), BadPeer> {
1088		self.downloaded_blocks += response.blocks.len();
1089		let mut gap = false;
1090		let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
1091			let mut blocks = response.blocks;
1092			if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
1093				trace!(target: LOG_TARGET, "Reversing incoming block list");
1094				blocks.reverse()
1095			}
1096			self.allowed_requests.add(peer_id);
1097			if let Some(request) = request {
1098				match &mut peer.state {
1099					PeerSyncState::DownloadingNew(_) => {
1100						self.blocks.clear_peer_download(peer_id);
1101						peer.state = PeerSyncState::Available;
1102						if let Some(start_block) =
1103							validate_blocks::<B>(&blocks, peer_id, Some(request))?
1104						{
1105							self.blocks.insert(start_block, blocks, *peer_id);
1106						}
1107						self.ready_blocks()
1108					},
1109					PeerSyncState::DownloadingGap(_) => {
1110						peer.state = PeerSyncState::Available;
1111						if let Some(gap_sync) = &mut self.gap_sync {
1112							gap_sync.blocks.clear_peer_download(peer_id);
1113							if let Some(start_block) =
1114								validate_blocks::<B>(&blocks, peer_id, Some(request))?
1115							{
1116								gap_sync.blocks.insert(start_block, blocks, *peer_id);
1117							}
1118							gap = true;
1119							let blocks: Vec<_> = gap_sync
1120								.blocks
1121								.ready_blocks(gap_sync.best_queued_number + One::one())
1122								.into_iter()
1123								.map(|block_data| {
1124									let justifications =
1125										block_data.block.justifications.or_else(|| {
1126											legacy_justification_mapping(
1127												block_data.block.justification,
1128											)
1129										});
1130									IncomingBlock {
1131										hash: block_data.block.hash,
1132										header: block_data.block.header,
1133										body: block_data.block.body,
1134										indexed_body: block_data.block.indexed_body,
1135										justifications,
1136										origin: block_data.origin,
1137										allow_missing_state: true,
1138										import_existing: self.import_existing,
1139										skip_execution: true,
1140										state: None,
1141									}
1142								})
1143								.collect();
1144							debug!(
1145								target: LOG_TARGET,
1146								"Drained {} gap blocks from {}",
1147								blocks.len(),
1148								gap_sync.best_queued_number,
1149							);
1150							blocks
1151						} else {
1152							debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
1153							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1154						}
1155					},
1156					PeerSyncState::DownloadingStale(_) => {
1157						peer.state = PeerSyncState::Available;
1158						if blocks.is_empty() {
1159							debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
1160							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1161						}
1162						validate_blocks::<B>(&blocks, peer_id, Some(request))?;
1163						blocks
1164							.into_iter()
1165							.map(|b| {
1166								let justifications = b
1167									.justifications
1168									.or_else(|| legacy_justification_mapping(b.justification));
1169								IncomingBlock {
1170									hash: b.hash,
1171									header: b.header,
1172									body: b.body,
1173									indexed_body: None,
1174									justifications,
1175									origin: Some(*peer_id),
1176									allow_missing_state: true,
1177									import_existing: self.import_existing,
1178									skip_execution: self.skip_execution(),
1179									state: None,
1180								}
1181							})
1182							.collect()
1183					},
1184					PeerSyncState::AncestorSearch { current, start, state } => {
1185						let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
1186							(Some(block), Ok(maybe_our_block_hash)) => {
1187								trace!(
1188									target: LOG_TARGET,
1189									"Got ancestry block #{} ({}) from peer {}",
1190									current,
1191									block.hash,
1192									peer_id,
1193								);
1194								maybe_our_block_hash.filter(|x| x == &block.hash)
1195							},
1196							(None, _) => {
1197								debug!(
1198									target: LOG_TARGET,
1199									"Invalid response when searching for ancestor from {peer_id}",
1200								);
1201								return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
1202							},
1203							(_, Err(e)) => {
1204								info!(
1205									target: LOG_TARGET,
1206									"❌ Error answering legitimate blockchain query: {e}",
1207								);
1208								return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
1209							},
1210						};
1211						if matching_hash.is_some() {
1212							if *start < self.best_queued_number &&
1213								self.best_queued_number <= peer.best_number
1214							{
1215								// We've made progress on this chain since the search was started.
1216								// Opportunistically set common number to updated number
1217								// instead of the one that started the search.
1218								trace!(
1219									target: LOG_TARGET,
1220									"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
1221									*peer_id,
1222									peer.common_number,
1223									self.best_queued_number,
1224								);
1225								peer.common_number = self.best_queued_number;
1226							} else if peer.common_number < *current {
1227								trace!(
1228									target: LOG_TARGET,
1229									"Ancestry search: updating peer {} common number from={} => to={}.",
1230									*peer_id,
1231									peer.common_number,
1232									*current,
1233								);
1234								peer.common_number = *current;
1235							}
1236						}
1237						if matching_hash.is_none() && current.is_zero() {
1238							trace!(
1239								target: LOG_TARGET,
1240								"Ancestry search: genesis mismatch for peer {peer_id}",
1241							);
1242							return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
1243						}
1244						if let Some((next_state, next_num)) =
1245							handle_ancestor_search_state(state, *current, matching_hash.is_some())
1246						{
1247							peer.state = PeerSyncState::AncestorSearch {
1248								current: next_num,
1249								start: *start,
1250								state: next_state,
1251							};
1252							let request = ancestry_request::<B>(next_num);
1253							self.actions.push(SyncingAction::SendBlockRequest {
1254								peer_id: *peer_id,
1255								key: StrategyKey::ChainSync,
1256								request,
1257							});
1258							return Ok(());
1259						} else {
1260							// Ancestry search is complete. Check if peer is on a stale fork unknown
1261							// to us and add it to sync targets if necessary.
1262							trace!(
1263								target: LOG_TARGET,
1264								"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
1265								self.best_queued_hash,
1266								self.best_queued_number,
1267								peer.best_hash,
1268								peer.best_number,
1269								matching_hash,
1270								peer.common_number,
1271							);
1272							if peer.common_number < peer.best_number &&
1273								peer.best_number < self.best_queued_number
1274							{
1275								trace!(
1276									target: LOG_TARGET,
1277									"Added fork target {} for {}",
1278									peer.best_hash,
1279									peer_id,
1280								);
1281								self.fork_targets
1282									.entry(peer.best_hash)
1283									.or_insert_with(|| {
1284										if let Some(metrics) = &self.metrics {
1285											metrics.fork_targets.inc();
1286										}
1287
1288										ForkTarget {
1289											number: peer.best_number,
1290											parent_hash: None,
1291											peers: Default::default(),
1292										}
1293									})
1294									.peers
1295									.insert(*peer_id);
1296							}
1297							peer.state = PeerSyncState::Available;
1298							return Ok(());
1299						}
1300					},
1301					PeerSyncState::Available |
1302					PeerSyncState::DownloadingJustification(..) |
1303					PeerSyncState::DownloadingState => Vec::new(),
1304				}
1305			} else {
1306				// When request.is_none() this is a block announcement. Just accept blocks.
1307				validate_blocks::<B>(&blocks, peer_id, None)?;
1308				blocks
1309					.into_iter()
1310					.map(|b| {
1311						let justifications = b
1312							.justifications
1313							.or_else(|| legacy_justification_mapping(b.justification));
1314						IncomingBlock {
1315							hash: b.hash,
1316							header: b.header,
1317							body: b.body,
1318							indexed_body: None,
1319							justifications,
1320							origin: Some(*peer_id),
1321							allow_missing_state: true,
1322							import_existing: false,
1323							skip_execution: true,
1324							state: None,
1325						}
1326					})
1327					.collect()
1328			}
1329		} else {
1330			// We don't know of this peer, so we also did not request anything from it.
1331			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1332		};
1333
1334		self.validate_and_queue_blocks(new_blocks, gap);
1335
1336		Ok(())
1337	}
1338
1339	/// Submit a justification response for processing.
1340	#[must_use]
1341	fn on_block_justification(
1342		&mut self,
1343		peer_id: PeerId,
1344		response: BlockResponse<B>,
1345	) -> Result<(), BadPeer> {
1346		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1347			peer
1348		} else {
1349			error!(
1350				target: LOG_TARGET,
1351				"💔 Called on_block_justification with a peer ID of an unknown peer",
1352			);
1353			return Ok(());
1354		};
1355
1356		self.allowed_requests.add(&peer_id);
1357		if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1358			peer.state = PeerSyncState::Available;
1359
1360			// We only request one justification at a time
1361			let justification = if let Some(block) = response.blocks.into_iter().next() {
1362				if hash != block.hash {
1363					warn!(
1364						target: LOG_TARGET,
1365						"💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
1366						peer_id,
1367						hash,
1368						block.hash,
1369					);
1370					return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1371				}
1372
1373				block
1374					.justifications
1375					.or_else(|| legacy_justification_mapping(block.justification))
1376			} else {
1377				// we might have asked the peer for a justification on a block that we assumed it
1378				// had but didn't (regardless of whether it had a justification for it or not).
1379				trace!(
1380					target: LOG_TARGET,
1381					"Peer {peer_id:?} provided empty response for justification request {hash:?}",
1382				);
1383
1384				None
1385			};
1386
1387			if let Some((peer_id, hash, number, justifications)) =
1388				self.extra_justifications.on_response(peer_id, justification)
1389			{
1390				self.actions.push(SyncingAction::ImportJustifications {
1391					peer_id,
1392					hash,
1393					number,
1394					justifications,
1395				});
1396				return Ok(());
1397			}
1398		}
1399
1400		Ok(())
1401	}
1402
1403	/// Returns the median seen block number.
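	/// For an even number of peers this yields the upper of the two middle values, e.g. for
	/// advertised best numbers `[1, 2, 3, 4]` it returns `3`.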
1404	fn median_seen(&self) -> Option<NumberFor<B>> {
1405		let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1406
1407		if best_seens.is_empty() {
1408			None
1409		} else {
1410			let middle = best_seens.len() / 2;
1411
1412			// Not the "perfect median" when we have an even number of peers.
1413			Some(*best_seens.select_nth_unstable(middle).1)
1414		}
1415	}
1416
1417	fn required_block_attributes(&self) -> BlockAttributes {
1418		match self.mode {
1419			ChainSyncMode::Full =>
1420				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1421			ChainSyncMode::LightState { storage_chain_mode: false, .. } =>
1422				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1423			ChainSyncMode::LightState { storage_chain_mode: true, .. } =>
1424				BlockAttributes::HEADER |
1425					BlockAttributes::JUSTIFICATION |
1426					BlockAttributes::INDEXED_BODY,
1427		}
1428	}
1429
1430	fn skip_execution(&self) -> bool {
1431		match self.mode {
1432			ChainSyncMode::Full => false,
1433			ChainSyncMode::LightState { .. } => true,
1434		}
1435	}
1436
1437	fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1438		let orig_len = new_blocks.len();
1439		new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1440		if new_blocks.len() != orig_len {
1441			debug!(
1442				target: LOG_TARGET,
1443				"Ignoring {} blocks that are already queued",
1444				orig_len - new_blocks.len(),
1445			);
1446		}
1447
1448		let origin = if !gap && !self.status().state.is_major_syncing() {
1449			BlockOrigin::NetworkBroadcast
1450		} else {
1451			BlockOrigin::NetworkInitialSync
1452		};
1453
1454		if let Some((h, n)) = new_blocks
1455			.last()
1456			.and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1457		{
1458			trace!(
1459				target: LOG_TARGET,
1460				"Accepted {} blocks ({:?}) with origin {:?}",
1461				new_blocks.len(),
1462				h,
1463				origin,
1464			);
1465			self.on_block_queued(h, n)
1466		}
1467		self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1468		if let Some(metrics) = &self.metrics {
1469			metrics
1470				.queued_blocks
1471				.set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1472		}
1473
1474		self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1475	}
1476
1477	fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1478		if let Some(peer) = self.peers.get_mut(peer_id) {
1479			peer.update_common_number(new_common);
1480		}
1481	}
1482
1483	/// Called when a block has been queued for import.
1484	///
1485	/// Updates our internal state for best queued block and then goes
1486	/// through all peers to update our view of their state as well.
1487	fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1488		if self.fork_targets.remove(hash).is_some() {
1489			if let Some(metrics) = &self.metrics {
1490				metrics.fork_targets.dec();
1491			}
1492			trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1493		}
1494		if let Some(gap_sync) = &mut self.gap_sync {
1495			if number > gap_sync.best_queued_number && number <= gap_sync.target {
1496				gap_sync.best_queued_number = number;
1497			}
1498		}
1499		if number > self.best_queued_number {
1500			self.best_queued_number = number;
1501			self.best_queued_hash = *hash;
1502			// Update common blocks
1503			for (n, peer) in self.peers.iter_mut() {
1504				if let PeerSyncState::AncestorSearch { .. } = peer.state {
1505					// Wait for ancestry search to complete first.
1506					continue;
1507				}
1508				let new_common_number =
1509					if peer.best_number >= number { number } else { peer.best_number };
1510				trace!(
1511					target: LOG_TARGET,
1512					"Updating peer {} info, ours={}, common={}->{}, their best={}",
1513					n,
1514					number,
1515					peer.common_number,
1516					new_common_number,
1517					peer.best_number,
1518				);
1519				peer.common_number = new_common_number;
1520			}
1521		}
1522		self.allowed_requests.set_all();
1523	}
1524
	/// Restart the sync process. This will reset all pending block requests and queue new block
	/// requests to peers via `self.actions`. Peers that were downloading finality data (i.e.
	/// their state was `DownloadingJustification`) are unaffected and will stay in the same state.
1528	fn restart(&mut self) {
1529		self.blocks.clear();
1530		if let Err(e) = self.reset_sync_start_point() {
1531			warn!(target: LOG_TARGET, "💔  Unable to restart sync: {e}");
1532		}
1533		self.allowed_requests.set_all();
1534		debug!(
1535			target: LOG_TARGET,
1536			"Restarted with {} ({})",
1537			self.best_queued_number,
1538			self.best_queued_hash,
1539		);
1540		let old_peers = std::mem::take(&mut self.peers);
1541
1542		old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
1543			match peer_sync.state {
1544				PeerSyncState::Available => {
1545					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1546				},
1547				PeerSyncState::AncestorSearch { .. } |
1548				PeerSyncState::DownloadingNew(_) |
1549				PeerSyncState::DownloadingStale(_) |
1550				PeerSyncState::DownloadingGap(_) |
1551				PeerSyncState::DownloadingState => {
1552					// Cancel a request first, as `add_peer` may generate a new request.
1553					self.actions.push(SyncingAction::CancelRequest {
1554						peer_id,
1555						key: StrategyKey::ChainSync,
1556					});
1557					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1558				},
1559				PeerSyncState::DownloadingJustification(_) => {
1560					// Peers that were downloading justifications
1561					// should be kept in that state.
1562					// We make sure our common number is at least something we have.
1563					trace!(
1564						target: LOG_TARGET,
1565						"Keeping peer {} after restart, updating common number from={} => to={} (our best).",
1566						peer_id,
1567						peer_sync.common_number,
1568						self.best_queued_number,
1569					);
1570					peer_sync.common_number = self.best_queued_number;
1571					self.peers.insert(peer_id, peer_sync);
1572				},
1573			}
1574		});
1575	}
1576
1577	/// Find a block to start sync from. If we sync with state, that's the latest block we have
1578	/// state for.
1579	fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
1580		let info = self.client.info();
1581		if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
1582			warn!(
1583				target: LOG_TARGET,
1584				"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
1585			);
1586			self.mode = ChainSyncMode::Full;
1587		}
1588
1589		self.import_existing = false;
1590		self.best_queued_hash = info.best_hash;
1591		self.best_queued_number = info.best_number;
1592
1593		if self.mode == ChainSyncMode::Full &&
1594			self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
1595		{
1596			self.import_existing = true;
1597			// Latest state is missing, start with the last finalized state or genesis instead.
1598			if let Some((hash, number)) = info.finalized_state {
1599				debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
1600				self.best_queued_hash = hash;
1601				self.best_queued_number = number;
1602			} else {
1603				debug!(target: LOG_TARGET, "Restarting from genesis");
1604				self.best_queued_hash = Default::default();
1605				self.best_queued_number = Zero::zero();
1606			}
1607		}
1608
1609		if let Some(BlockGap { start, end, .. }) = info.block_gap {
1610			debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end}");
1611			self.gap_sync = Some(GapSync {
1612				best_queued_number: start - One::one(),
1613				target: end,
1614				blocks: BlockCollection::new(),
1615			});
1616		}
1617		trace!(
1618			target: LOG_TARGET,
1619			"Restarted sync at #{} ({:?})",
1620			self.best_queued_number,
1621			self.best_queued_hash,
1622		);
1623		Ok(())
1624	}
1625
1626	/// What is the status of the block corresponding to the given hash?
1627	fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1628		if self.queue_blocks.contains(hash) {
1629			return Ok(BlockStatus::Queued);
1630		}
1631		self.client.block_status(*hash)
1632	}
1633
1634	/// Is the block corresponding to the given hash known?
1635	fn is_known(&self, hash: &B::Hash) -> bool {
1636		self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1637	}
1638
1639	/// Is any peer downloading the given hash?
1640	fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1641		self.peers
1642			.iter()
1643			.any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1644	}
1645
1646	/// Get the set of downloaded blocks that are ready to be queued for import.
1647	fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1648		self.blocks
1649			.ready_blocks(self.best_queued_number + One::one())
1650			.into_iter()
1651			.map(|block_data| {
1652				let justifications = block_data
1653					.block
1654					.justifications
1655					.or_else(|| legacy_justification_mapping(block_data.block.justification));
1656				IncomingBlock {
1657					hash: block_data.block.hash,
1658					header: block_data.block.header,
1659					body: block_data.block.body,
1660					indexed_body: block_data.block.indexed_body,
1661					justifications,
1662					origin: block_data.origin,
1663					allow_missing_state: true,
1664					import_existing: self.import_existing,
1665					skip_execution: self.skip_execution(),
1666					state: None,
1667				}
1668			})
1669			.collect()
1670	}
1671
1672	/// Get justification requests scheduled by sync to be sent out.
1673	fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1674		let peers = &mut self.peers;
1675		let mut matcher = self.extra_justifications.matcher();
1676		std::iter::from_fn(move || {
1677			if let Some((peer, request)) = matcher.next(peers) {
1678				peers
1679					.get_mut(&peer)
1680					.expect(
1681						"`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1682					)
1683					.state = PeerSyncState::DownloadingJustification(request.0);
1684				let req = BlockRequest::<B> {
1685					id: 0,
1686					fields: BlockAttributes::JUSTIFICATION,
1687					from: FromBlock::Hash(request.0),
1688					direction: Direction::Ascending,
1689					max: Some(1),
1690				};
1691				Some((peer, req))
1692			} else {
1693				None
1694			}
1695		})
1696		.collect()
1697	}
1698
1699	/// Get block requests scheduled by sync to be sent out.
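	///
	/// For each peer that is available and allowed to be asked, we schedule, in order of
	/// preference: an ancestor search when the common block is too far behind our best queued
	/// block, a request for new blocks, a fork target request, or a gap sync request.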
1700	fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1701		if self.allowed_requests.is_empty() || self.state_sync.is_some() {
1702			return Vec::new();
1703		}
1704
1705		if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
1706			trace!(target: LOG_TARGET, "Too many blocks in the queue.");
1707			return Vec::new();
1708		}
1709		let is_major_syncing = self.status().state.is_major_syncing();
1710		let attrs = self.required_block_attributes();
1711		let blocks = &mut self.blocks;
1712		let fork_targets = &mut self.fork_targets;
1713		let last_finalized =
1714			std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
1715		let best_queued = self.best_queued_number;
1716		let client = &self.client;
1717		let queue_blocks = &self.queue_blocks;
1718		let allowed_requests = self.allowed_requests.clone();
1719		let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
1720		let max_blocks_per_request = self.max_blocks_per_request;
1721		let gap_sync = &mut self.gap_sync;
1722		let disconnected_peers = &mut self.disconnected_peers;
1723		let metrics = self.metrics.as_ref();
1724		let requests = self
1725			.peers
1726			.iter_mut()
1727			.filter_map(move |(&id, peer)| {
1728				if !peer.state.is_available() ||
1729					!allowed_requests.contains(&id) ||
1730					!disconnected_peers.is_peer_available(&id)
1731				{
1732					return None;
1733				}
1734
1735				// If our best queued block is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks ahead
1736				// of the common number, the peer's best number is higher than our best queued and
1737				// the common number is smaller than the last finalized block number, we should do
1738				// an ancestor search to find a better common block. If too many blocks are already
1739				// queued for import, we wait until they have been imported before searching.
1740				if best_queued.saturating_sub(peer.common_number) >
1741					MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
1742					best_queued < peer.best_number &&
1743					peer.common_number < last_finalized &&
1744					queue_blocks.len() <= MAJOR_SYNC_BLOCKS.into()
1745				{
1746					trace!(
1747						target: LOG_TARGET,
1748						"Peer {:?} common block {} too far behind our best {}. Starting ancestry search.",
1749						id,
1750						peer.common_number,
1751						best_queued,
1752					);
1753					let current = std::cmp::min(peer.best_number, best_queued);
1754					peer.state = PeerSyncState::AncestorSearch {
1755						current,
1756						start: best_queued,
1757						state: AncestorSearchState::ExponentialBackoff(One::one()),
1758					};
1759					Some((id, ancestry_request::<B>(current)))
1760				} else if let Some((range, req)) = peer_block_request(
1761					&id,
1762					peer,
1763					blocks,
1764					attrs,
1765					max_parallel,
1766					max_blocks_per_request,
1767					last_finalized,
1768					best_queued,
1769				) {
1770					peer.state = PeerSyncState::DownloadingNew(range.start);
1771					trace!(
1772						target: LOG_TARGET,
1773						"New block request for {}, (best:{}, common:{}) {:?}",
1774						id,
1775						peer.best_number,
1776						peer.common_number,
1777						req,
1778					);
1779					Some((id, req))
1780				} else if let Some((hash, req)) = fork_sync_request(
1781					&id,
1782					fork_targets,
1783					best_queued,
1784					last_finalized,
1785					attrs,
1786					|hash| {
1787						if queue_blocks.contains(hash) {
1788							BlockStatus::Queued
1789						} else {
1790							client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
1791						}
1792					},
1793					max_blocks_per_request,
1794					metrics,
1795				) {
1796					trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
1797					peer.state = PeerSyncState::DownloadingStale(hash);
1798					Some((id, req))
1799				} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
1800					peer_gap_block_request(
1801						&id,
1802						peer,
1803						&mut sync.blocks,
1804						attrs,
1805						sync.target,
1806						sync.best_queued_number,
1807						max_blocks_per_request,
1808					)
1809				}) {
1810					peer.state = PeerSyncState::DownloadingGap(range.start);
1811					trace!(
1812						target: LOG_TARGET,
1813						"New gap block request for {}, (best:{}, common:{}) {:?}",
1814						id,
1815						peer.best_number,
1816						peer.common_number,
1817						req,
1818					);
1819					Some((id, req))
1820				} else {
1821					None
1822				}
1823			})
1824			.collect::<Vec<_>>();
1825
1826		// Clear the allowed_requests state when sending new block requests
1827		// to prevent multiple inflight block requests from being issued.
1828		if !requests.is_empty() {
1829			self.allowed_requests.take();
1830		}
1831
1832		requests
1833	}
1834
1835	/// Get a state request scheduled by sync to be sent out (if any).
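	///
	/// At most one state request is allowed in flight at a time; it is sent to the first
	/// available peer whose common number has reached the state sync target.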
1836	fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
1837		if self.allowed_requests.is_empty() {
1838			return None;
1839		}
1840		if self.state_sync.is_some() &&
1841			self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
1842		{
1843			// Only one pending state request is allowed.
1844			return None;
1845		}
1846		if let Some(sync) = &self.state_sync {
1847			if sync.is_complete() {
1848				return None;
1849			}
1850
1851			for (id, peer) in self.peers.iter_mut() {
1852				if peer.state.is_available() &&
1853					peer.common_number >= sync.target_number() &&
1854					self.disconnected_peers.is_peer_available(&id)
1855				{
1856					peer.state = PeerSyncState::DownloadingState;
1857					let request = sync.next_request();
1858					trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
1859					self.allowed_requests.clear();
1860					return Some((*id, OpaqueStateRequest(Box::new(request))));
1861				}
1862			}
1863		}
1864		None
1865	}
1866
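	/// Process a state response received from `peer_id` as part of an ongoing state sync.
	///
	/// The response is fed into the state sync machinery; once the target state has been fully
	/// downloaded, the reconstructed block is queued for import. Obsolete or malformed responses
	/// are reported as a `BadPeer`.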
1867	#[must_use]
1868	fn on_state_data(
1869		&mut self,
1870		peer_id: &PeerId,
1871		response: OpaqueStateResponse,
1872	) -> Result<(), BadPeer> {
1873		let response: Box<StateResponse> = response.0.downcast().map_err(|_error| {
1874			error!(
1875				target: LOG_TARGET,
1876				"Failed to downcast opaque state response, this is an implementation bug."
1877			);
1878
1879			BadPeer(*peer_id, rep::BAD_RESPONSE)
1880		})?;
1881
1882		if let Some(peer) = self.peers.get_mut(peer_id) {
1883			if let PeerSyncState::DownloadingState = peer.state {
1884				peer.state = PeerSyncState::Available;
1885				self.allowed_requests.set_all();
1886			}
1887		}
1888		let import_result = if let Some(sync) = &mut self.state_sync {
1889			debug!(
1890				target: LOG_TARGET,
1891				"Importing state data from {} with {} keys, {} proof nodes.",
1892				peer_id,
1893				response.entries.len(),
1894				response.proof.len(),
1895			);
1896			sync.import(*response)
1897		} else {
1898			debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
1899			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1900		};
1901
1902		match import_result {
1903			ImportResult::Import(hash, header, state, body, justifications) => {
1904				let origin = BlockOrigin::NetworkInitialSync;
1905				let block = IncomingBlock {
1906					hash,
1907					header: Some(header),
1908					body,
1909					indexed_body: None,
1910					justifications,
1911					origin: None,
1912					allow_missing_state: true,
1913					import_existing: true,
1914					skip_execution: self.skip_execution(),
1915					state: Some(state),
1916				};
1917				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
1918				self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
1919				Ok(())
1920			},
1921			ImportResult::Continue => Ok(()),
1922			ImportResult::BadResponse => {
1923				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
1924				Err(BadPeer(*peer_id, rep::BAD_BLOCK))
1925			},
1926		}
1927	}
1928
1929	fn attempt_state_sync(
1930		&mut self,
1931		finalized_hash: B::Hash,
1932		finalized_number: NumberFor<B>,
1933		skip_proofs: bool,
1934	) {
1935		let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
1936		heads.sort();
1937		let median = heads[heads.len() / 2];
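		// Heuristic: only start state sync if our finalized block is no more than
		// `STATE_SYNC_FINALITY_THRESHOLD` blocks behind the median best block reported by peers.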
1938		if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
1939			if let Ok(Some(header)) = self.client.header(finalized_hash) {
1940				log::debug!(
1941					target: LOG_TARGET,
1942					"Starting state sync for #{finalized_number} ({finalized_hash})",
1943				);
1944				self.state_sync =
1945					Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
1946				self.allowed_requests.set_all();
1947			} else {
1948				log::error!(
1949					target: LOG_TARGET,
1950					"Failed to start state sync: header for finalized block \
1951					  #{finalized_number} ({finalized_hash}) is not available",
1952				);
1953				debug_assert!(false);
1954			}
1955		}
1956	}
1957
1958	/// A version of `actions()` that doesn't schedule extra requests. For testing only.
1959	#[cfg(test)]
1960	#[must_use]
1961	fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
1962		std::mem::take(&mut self.actions).into_iter()
1963	}
1964}
1965
1966// This exists purely for a backwards-compatible transition period and should be removed
1967// once we can assume all nodes can send and receive multiple `Justifications`.
1968// The ID tag is hardcoded here to avoid depending on the GRANDPA crate.
1969// See: https://github.com/paritytech/substrate/issues/8172
1970fn legacy_justification_mapping(
1971	justification: Option<EncodedJustification>,
1972) -> Option<Justifications> {
1973	justification.map(|just| (*b"FRNK", just).into())
1974}
1975
1976/// Produce the block request used during ancestry search: it asks for the header and justification
1977/// of the given block number.
1978fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
1979	BlockRequest::<B> {
1980		id: 0,
1981		fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
1982		from: FromBlock::Number(block),
1983		direction: Direction::Ascending,
1984		max: Some(1),
1985	}
1986}
1987
1988/// The ancestor search state expresses which algorithm, together with its stateful parameters, we
1989/// are using to try to find a common ancestor block.
1990#[derive(Copy, Clone, Eq, PartialEq, Debug)]
1991pub(crate) enum AncestorSearchState<B: BlockT> {
1992	/// Use exponential backoff to find an ancestor, then switch to binary search.
1993	/// We keep track of the exponent.
1994	ExponentialBackoff(NumberFor<B>),
1995	/// Using binary search to find the best ancestor.
1996	/// We keep track of left and right bounds.
1997	BinarySearch(NumberFor<B>, NumberFor<B>),
1998}
1999
2000/// This function handles the ancestor search strategy used. The goal is to find a common point
2001/// that both our chains agree on that is as close to the tip as possible.
2002/// We start with an exponential backoff strategy: probing backwards from the tip towards genesis,
2003/// we double the step size after every block hash mismatch.
2004///
2005/// Once we find a block hash that matches, we fall back to a binary search between the two last
2006/// probed points to find the common block closest to the tip.
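///
/// # Illustrative trace
///
/// A hypothetical run (block numbers are made up): our best block is #100 and the peer's chain
/// agrees with ours up to #92. Starting from `ExponentialBackoff(1)` at probe #100, successive
/// calls return:
///
/// ```text
/// probe #100  mismatch -> ExponentialBackoff(2),  next probe #99
/// probe #99   mismatch -> ExponentialBackoff(4),  next probe #97
/// probe #97   mismatch -> ExponentialBackoff(8),  next probe #93
/// probe #93   mismatch -> ExponentialBackoff(16), next probe #85
/// probe #85   match    -> BinarySearch(85, 93),   next probe #89
/// probe #89   match    -> BinarySearch(89, 93),   next probe #91
/// probe #91   match    -> BinarySearch(91, 93),   next probe #92
/// probe #92   match    -> None (no further probes; #92 was the last matching block)
/// ```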
2007fn handle_ancestor_search_state<B: BlockT>(
2008	state: &AncestorSearchState<B>,
2009	curr_block_num: NumberFor<B>,
2010	block_hash_match: bool,
2011) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
2012	let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
2013	match state {
2014		AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
2015			let next_distance_to_tip = *next_distance_to_tip;
2016			if block_hash_match && next_distance_to_tip == One::one() {
2017				// We found the ancestor in the first step so there is no need to execute binary
2018				// search.
2019				return None;
2020			}
2021			if block_hash_match {
2022				let left = curr_block_num;
2023				let right = left + next_distance_to_tip / two;
2024				let middle = left + (right - left) / two;
2025				Some((AncestorSearchState::BinarySearch(left, right), middle))
2026			} else {
2027				let next_block_num =
2028					curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
2029				let next_distance_to_tip = next_distance_to_tip * two;
2030				Some((
2031					AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
2032					next_block_num,
2033				))
2034			}
2035		},
2036		AncestorSearchState::BinarySearch(mut left, mut right) => {
2037			if left >= curr_block_num {
2038				return None;
2039			}
2040			if block_hash_match {
2041				left = curr_block_num;
2042			} else {
2043				right = curr_block_num;
2044			}
2045			assert!(right >= left);
2046			let middle = left + (right - left) / two;
2047			if middle == curr_block_num {
2048				None
2049			} else {
2050				Some((AncestorSearchState::BinarySearch(left, right), middle))
2051			}
2052		},
2053	}
2054}
2055
2056/// Get a new block request for the peer if any.
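///
/// Note that the range returned by `BlockCollection::needed_blocks` is half-open, while the
/// request itself is built in descending order starting from the last block of the range. As a
/// purely illustrative example, a needed range of `100..108` yields a request with `from` #107,
/// direction `Descending` and `max` 8; if #107 happens to be the peer's best block, it is
/// addressed by hash rather than by number.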
2057fn peer_block_request<B: BlockT>(
2058	id: &PeerId,
2059	peer: &PeerSync<B>,
2060	blocks: &mut BlockCollection<B>,
2061	attrs: BlockAttributes,
2062	max_parallel_downloads: u32,
2063	max_blocks_per_request: u32,
2064	finalized: NumberFor<B>,
2065	best_num: NumberFor<B>,
2066) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2067	if best_num >= peer.best_number {
2068		// Will be downloaded as an alternative fork instead.
2069		return None;
2070	} else if peer.common_number < finalized {
2071		trace!(
2072			target: LOG_TARGET,
2073			"Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2074			id, peer.common_number, finalized, peer.best_number, best_num,
2075		);
2076	}
2077	let range = blocks.needed_blocks(
2078		*id,
2079		max_blocks_per_request,
2080		peer.best_number,
2081		peer.common_number,
2082		max_parallel_downloads,
2083		MAX_DOWNLOAD_AHEAD,
2084	)?;
2085
2086	// The end is not part of the range.
2087	let last = range.end.saturating_sub(One::one());
2088
2089	let from = if peer.best_number == last {
2090		FromBlock::Hash(peer.best_hash)
2091	} else {
2092		FromBlock::Number(last)
2093	};
2094
2095	let request = BlockRequest::<B> {
2096		id: 0,
2097		fields: attrs,
2098		from,
2099		direction: Direction::Descending,
2100		max: Some((range.end - range.start).saturated_into::<u32>()),
2101	};
2102
2103	Some((range, request))
2104}
2105
2106/// Get a new gap sync block request for the peer, if any.
2107fn peer_gap_block_request<B: BlockT>(
2108	id: &PeerId,
2109	peer: &PeerSync<B>,
2110	blocks: &mut BlockCollection<B>,
2111	attrs: BlockAttributes,
2112	target: NumberFor<B>,
2113	common_number: NumberFor<B>,
2114	max_blocks_per_request: u32,
2115) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2116	let range = blocks.needed_blocks(
2117		*id,
2118		max_blocks_per_request,
2119		std::cmp::min(peer.best_number, target),
2120		common_number,
2121		1,
2122		MAX_DOWNLOAD_AHEAD,
2123	)?;
2124
2125	// The end is not part of the range.
2126	let last = range.end.saturating_sub(One::one());
2127	let from = FromBlock::Number(last);
2128
2129	let request = BlockRequest::<B> {
2130		id: 0,
2131		fields: attrs,
2132		from,
2133		direction: Direction::Descending,
2134		max: Some((range.end - range.start).saturated_into::<u32>()),
2135	};
2136	Some((range, request))
2137}
2138
2139/// Get a pending fork sync request for a peer, if any.
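///
/// Fork targets at or below the last finalized block, and targets whose block is already known,
/// are pruned first. As a purely illustrative example, with our best block at #100, the last
/// finalized block at #95 and a fork target at #105 announced by this peer: assuming the
/// per-request limit allows it, the fork is downloaded here, and since its parent hash is
/// unknown the request asks for up to `105 - 95 = 10` blocks, descending from the fork hash.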
2140fn fork_sync_request<B: BlockT>(
2141	id: &PeerId,
2142	fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
2143	best_num: NumberFor<B>,
2144	finalized: NumberFor<B>,
2145	attributes: BlockAttributes,
2146	check_block: impl Fn(&B::Hash) -> BlockStatus,
2147	max_blocks_per_request: u32,
2148	metrics: Option<&Metrics>,
2149) -> Option<(B::Hash, BlockRequest<B>)> {
2150	fork_targets.retain(|hash, r| {
2151		if r.number <= finalized {
2152			trace!(
2153				target: LOG_TARGET,
2154				"Removed expired fork sync request {:?} (#{})",
2155				hash,
2156				r.number,
2157			);
2158			return false;
2159		}
2160		if check_block(hash) != BlockStatus::Unknown {
2161			trace!(
2162				target: LOG_TARGET,
2163				"Removed obsolete fork sync request {:?} (#{})",
2164				hash,
2165				r.number,
2166			);
2167			return false;
2168		}
2169		true
2170	});
2171	if let Some(metrics) = metrics {
2172		metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
2173	}
2174	for (hash, r) in fork_targets {
2175		if !r.peers.contains(&id) {
2176			continue;
2177		}
2178		// Download the fork only if it is behind or not too far ahead of our tip of the chain.
2179		// Otherwise it should be downloaded in full sync mode.
2180		if r.number <= best_num ||
2181			(r.number - best_num).saturated_into::<u32>() < max_blocks_per_request
2182		{
2183			let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
2184			let count = if parent_status == BlockStatus::Unknown {
2185				(r.number - finalized).saturated_into::<u32>() // up to the last finalized block
2186			} else {
2187				// request only single block
2188				1
2189			};
2190			trace!(
2191				target: LOG_TARGET,
2192				"Downloading requested fork {hash:?} from {id}, {count} blocks",
2193			);
2194			return Some((
2195				*hash,
2196				BlockRequest::<B> {
2197					id: 0,
2198					fields: attributes,
2199					from: FromBlock::Hash(*hash),
2200					direction: Direction::Descending,
2201					max: Some(count),
2202				},
2203			));
2204		} else {
2205			trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
2206		}
2207	}
2208	None
2209}
2210
2211/// Returns `true` if the given `block` is a descendent of `base`.
2212fn is_descendent_of<Block, T>(
2213	client: &T,
2214	base: &Block::Hash,
2215	block: &Block::Hash,
2216) -> sp_blockchain::Result<bool>
2217where
2218	Block: BlockT,
2219	T: HeaderMetadata<Block, Error = sp_blockchain::Error> + ?Sized,
2220{
2221	if base == block {
2222		return Ok(false);
2223	}
2224
2225	let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?;
2226
2227	Ok(ancestor.hash == *base)
2228}
2229
2230/// Validate that the given `blocks` are correct.
2231/// Returns the number of the first block in the sequence.
2232///
2233/// It is expected that `blocks` are in ascending order.
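///
/// When the originating `request` is supplied, the response is also checked against it: the
/// number of blocks must not exceed `request.max`, the boundary block must correspond to
/// `request.from`, and every block must carry a header and/or body if those attributes were
/// requested. Independently of the request, each provided header must hash to the advertised
/// block hash and, when both header and body are present, the body must hash to the header's
/// extrinsics root. Any violation results in a `BadPeer` error.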
2234pub fn validate_blocks<Block: BlockT>(
2235	blocks: &Vec<BlockData<Block>>,
2236	peer_id: &PeerId,
2237	request: Option<BlockRequest<Block>>,
2238) -> Result<Option<NumberFor<Block>>, BadPeer> {
2239	if let Some(request) = request {
2240		if Some(blocks.len() as _) > request.max {
2241			debug!(
2242				target: LOG_TARGET,
2243				"Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
2244				peer_id,
2245				request.max,
2246				blocks.len(),
2247			);
2248
2249			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2250		}
2251
2252		let block_header =
2253			if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
2254				.and_then(|b| b.header.as_ref());
2255
2256		let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
2257			FromBlock::Hash(hash) => h.hash() == hash,
2258			FromBlock::Number(n) => h.number() == &n,
2259		});
2260
2261		if !expected_block {
2262			debug!(
2263				target: LOG_TARGET,
2264				"Received block that was not requested. Requested {:?}, got {:?}.",
2265				request.from,
2266				block_header,
2267			);
2268
2269			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2270		}
2271
2272		if request.fields.contains(BlockAttributes::HEADER) &&
2273			blocks.iter().any(|b| b.header.is_none())
2274		{
2275			trace!(
2276				target: LOG_TARGET,
2277				"Missing requested header for a block in response from {peer_id}.",
2278			);
2279
2280			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2281		}
2282
2283		if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
2284		{
2285			trace!(
2286				target: LOG_TARGET,
2287				"Missing requested body for a block in response from {peer_id}.",
2288			);
2289
2290			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2291		}
2292	}
2293
2294	for b in blocks {
2295		if let Some(header) = &b.header {
2296			let hash = header.hash();
2297			if hash != b.hash {
2298				debug!(
2299					target: LOG_TARGET,
2300					"Bad header received from {}. Expected hash {:?}, got {:?}",
2301					peer_id,
2302					b.hash,
2303					hash,
2304				);
2305				return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2306			}
2307		}
2308		if let (Some(header), Some(body)) = (&b.header, &b.body) {
2309			let expected = *header.extrinsics_root();
2310			let got = HashingFor::<Block>::ordered_trie_root(
2311				body.iter().map(Encode::encode).collect(),
2312				sp_runtime::StateVersion::V0,
2313			);
2314			if expected != got {
2315				debug!(
2316					target: LOG_TARGET,
2317					"Bad extrinsic root for a block {} received from {}. Expected {:?}, got {:?}",
2318					b.hash,
2319					peer_id,
2320					expected,
2321					got,
2322				);
2323				return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2324			}
2325		}
2326	}
2327
2328	Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
2329}