referrerpolicy=no-referrer-when-downgrade

sc_network_sync/strategy/
warp.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Warp syncing strategy. Bootstraps chain by downloading warp proofs and state.
20
21use sc_consensus::IncomingBlock;
22use sp_consensus::BlockOrigin;
23pub use sp_consensus_grandpa::{AuthorityList, SetId};
24
25use crate::{
26	block_relay_protocol::{BlockDownloader, BlockResponseError},
27	service::network::NetworkServiceHandle,
28	strategy::{
29		chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
30		SyncingAction,
31	},
32	types::{BadPeer, SyncState, SyncStatus},
33	LOG_TARGET,
34};
35use codec::{Decode, Encode};
36use futures::{channel::oneshot, FutureExt};
37use log::{debug, error, trace, warn};
38use sc_network::{IfDisconnected, ProtocolName};
39use sc_network_common::sync::message::{
40	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
41};
42use sc_network_types::PeerId;
43use sp_blockchain::HeaderBackend;
44use sp_runtime::{
45	traits::{Block as BlockT, Header, NumberFor, Zero},
46	Justifications, SaturatedConversion,
47};
48use std::{any::Any, collections::HashMap, fmt, sync::Arc};
49
/// Default threshold of connected peers below which warp sync stays idle.
///
/// Applied by `WarpSync::new` whenever the caller does not pass its own minimum.
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
52
/// A warp sync proof in its SCALE-encoded form, exactly as carried in a proof response.
pub struct EncodedProof(pub Vec<u8>);
55
56/// Warp sync request
57#[derive(Encode, Decode, Debug, Clone)]
58pub struct WarpProofRequest<B: BlockT> {
59	/// Start collecting proofs from this block.
60	pub begin: B::Hash,
61}
62
63/// Proof verification result.
64pub enum VerificationResult<Block: BlockT> {
65	/// Proof is valid, but the target was not reached.
66	Partial(SetId, AuthorityList, Block::Hash, Vec<(Block::Header, Justifications)>),
67	/// Target finality is proved.
68	Complete(SetId, AuthorityList, Block::Header, Vec<(Block::Header, Justifications)>),
69}
70
71/// Warp sync backend. Handles retrieving and verifying warp sync proofs.
72pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
73	/// Generate proof starting at given block hash. The proof is accumulated until maximum proof
74	/// size is reached.
75	fn generate(
76		&self,
77		start: Block::Hash,
78	) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
79	/// Verify warp proof against current set of authorities.
80	fn verify(
81		&self,
82		proof: &EncodedProof,
83		set_id: SetId,
84		authorities: AuthorityList,
85	) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
86	/// Get current list of authorities. This is supposed to be genesis authorities when starting
87	/// sync.
88	fn current_authorities(&self) -> AuthorityList;
89}
90
91mod rep {
92	use sc_network::ReputationChange as Rep;
93
94	/// Unexpected response received form a peer
95	pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");
96
97	/// Peer provided invalid warp proof data
98	pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");
99
100	/// Peer did not provide us with advertised block data.
101	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
102
103	/// Reputation change for peers which send us non-requested block data.
104	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
105
106	/// Reputation change for peers which send us a block which we fail to verify.
107	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
108
109	/// We received a message that failed to decode.
110	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
111}
112
113/// Reported warp sync phase.
114#[derive(Clone, Eq, PartialEq, Debug)]
115pub enum WarpSyncPhase<Block: BlockT> {
116	/// Waiting for peers to connect.
117	AwaitingPeers { required_peers: usize },
118	/// Downloading and verifying grandpa warp proofs.
119	DownloadingWarpProofs,
120	/// Downloading target block.
121	DownloadingTargetBlock,
122	/// Downloading state data.
123	DownloadingState,
124	/// Importing state.
125	ImportingState,
126	/// Downloading block history.
127	DownloadingBlocks(NumberFor<Block>),
128	/// Warp sync is complete.
129	Complete,
130}
131
132impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
133	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
134		match self {
135			Self::AwaitingPeers { required_peers } =>
136				write!(f, "Waiting for {required_peers} peers to be connected"),
137			Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
138			Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
139			Self::DownloadingState => write!(f, "Downloading state"),
140			Self::ImportingState => write!(f, "Importing state"),
141			Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
142			Self::Complete => write!(f, "Warp sync is complete"),
143		}
144	}
145}
146
147/// Reported warp sync progress.
148#[derive(Clone, Eq, PartialEq, Debug)]
149pub struct WarpSyncProgress<Block: BlockT> {
150	/// Estimated download percentage.
151	pub phase: WarpSyncPhase<Block>,
152	/// Total bytes downloaded so far.
153	pub total_bytes: u64,
154}
155
156/// Warp sync configuration as accepted by [`WarpSync`].
157pub enum WarpSyncConfig<Block: BlockT> {
158	/// Standard warp sync for the chain.
159	WithProvider(Arc<dyn WarpSyncProvider<Block>>),
160	/// Skip downloading proofs and use provided header of the state that should be downloaded.
161	///
162	/// It is expected that the header provider ensures that the header is trusted.
163	WithTarget(<Block as BlockT>::Header),
164}
165
166/// Warp sync phase used by warp sync state machine.
167enum Phase<B: BlockT> {
168	/// Waiting for enough peers to connect.
169	WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
170	/// Downloading warp proofs.
171	WarpProof {
172		set_id: SetId,
173		authorities: AuthorityList,
174		last_hash: B::Hash,
175		warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
176	},
177	/// Downloading target block.
178	TargetBlock(B::Header),
179	/// Warp sync is complete.
180	Complete,
181}
182
/// What a connected peer is currently being used for.
enum PeerState {
	/// No request to the peer is in flight.
	Available,
	/// A warp proof request to the peer is in flight.
	DownloadingProofs,
	/// A target block request to the peer is in flight.
	DownloadingTargetBlock,
}

impl PeerState {
	/// Whether the peer can be picked for a new request.
	fn is_available(&self) -> bool {
		match self {
			PeerState::Available => true,
			PeerState::DownloadingProofs | PeerState::DownloadingTargetBlock => false,
		}
	}
}
194
195struct Peer<B: BlockT> {
196	best_number: NumberFor<B>,
197	state: PeerState,
198}
199
200pub struct WarpSyncResult<B: BlockT> {
201	pub target_header: B::Header,
202	pub target_body: Option<Vec<B::Extrinsic>>,
203	pub target_justifications: Option<Justifications>,
204}
205
206/// Warp sync state machine. Accumulates warp proofs and state.
207pub struct WarpSync<B: BlockT, Client> {
208	phase: Phase<B>,
209	client: Arc<Client>,
210	total_proof_bytes: u64,
211	total_state_bytes: u64,
212	peers: HashMap<PeerId, Peer<B>>,
213	disconnected_peers: DisconnectedPeers,
214	protocol_name: Option<ProtocolName>,
215	block_downloader: Arc<dyn BlockDownloader<B>>,
216	actions: Vec<SyncingAction<B>>,
217	result: Option<WarpSyncResult<B>>,
218	/// Number of peers that need to be connected before warp sync is started.
219	min_peers_to_start_warp_sync: usize,
220}
221
222impl<B, Client> WarpSync<B, Client>
223where
224	B: BlockT,
225	Client: HeaderBackend<B> + 'static,
226{
227	/// Strategy key used by warp sync.
228	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
229
230	/// Create a new instance. When passing a warp sync provider we will be checking for proof and
231	/// authorities. Alternatively we can pass a target block when we want to skip downloading
232	/// proofs, in this case we will continue polling until the target block is known.
233	pub fn new(
234		client: Arc<Client>,
235		warp_sync_config: WarpSyncConfig<B>,
236		protocol_name: Option<ProtocolName>,
237		block_downloader: Arc<dyn BlockDownloader<B>>,
238		min_peers_to_start_warp_sync: Option<usize>,
239	) -> Self {
240		let min_peers_to_start_warp_sync =
241			min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
242		if client.info().finalized_state.is_some() {
243			error!(
244				target: LOG_TARGET,
245				"Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
246			);
247			return Self {
248				client,
249				phase: Phase::Complete,
250				total_proof_bytes: 0,
251				total_state_bytes: 0,
252				peers: HashMap::new(),
253				disconnected_peers: DisconnectedPeers::new(),
254				protocol_name,
255				block_downloader,
256				actions: vec![SyncingAction::Finished],
257				result: None,
258				min_peers_to_start_warp_sync,
259			}
260		}
261
262		let phase = match warp_sync_config {
263			WarpSyncConfig::WithProvider(warp_sync_provider) =>
264				Phase::WaitingForPeers { warp_sync_provider },
265			WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
266		};
267
268		Self {
269			client,
270			phase,
271			total_proof_bytes: 0,
272			total_state_bytes: 0,
273			peers: HashMap::new(),
274			disconnected_peers: DisconnectedPeers::new(),
275			protocol_name,
276			block_downloader,
277			actions: Vec::new(),
278			result: None,
279			min_peers_to_start_warp_sync,
280		}
281	}
282
283	/// Notify that a new peer has connected.
284	pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
285		self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
286
287		self.try_to_start_warp_sync();
288	}
289
290	/// Notify that a peer has disconnected.
291	pub fn remove_peer(&mut self, peer_id: &PeerId) {
292		if let Some(state) = self.peers.remove(peer_id) {
293			if !state.state.is_available() {
294				if let Some(bad_peer) =
295					self.disconnected_peers.on_disconnect_during_request(*peer_id)
296				{
297					self.actions.push(SyncingAction::DropPeer(bad_peer));
298				}
299			}
300		}
301	}
302
303	/// Submit a validated block announcement.
304	///
305	/// Returns new best hash & best number of the peer if they are updated.
306	#[must_use]
307	pub fn on_validated_block_announce(
308		&mut self,
309		is_best: bool,
310		peer_id: PeerId,
311		announce: &BlockAnnounce<B::Header>,
312	) -> Option<(B::Hash, NumberFor<B>)> {
313		is_best.then(|| {
314			let best_number = *announce.header.number();
315			let best_hash = announce.header.hash();
316			if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
317				peer.best_number = best_number;
318			}
319			// Let `SyncingEngine` know that we should update the peer info.
320			(best_hash, best_number)
321		})
322	}
323
324	/// Start warp sync as soon as we have enough peers.
325	fn try_to_start_warp_sync(&mut self) {
326		let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
327
328		if self.peers.len() < self.min_peers_to_start_warp_sync {
329			return
330		}
331
332		self.phase = Phase::WarpProof {
333			set_id: 0,
334			authorities: warp_sync_provider.current_authorities(),
335			last_hash: self.client.info().genesis_hash,
336			warp_sync_provider: Arc::clone(warp_sync_provider),
337		};
338		trace!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
339	}
340
341	pub fn on_generic_response(
342		&mut self,
343		peer_id: &PeerId,
344		protocol_name: ProtocolName,
345		response: Box<dyn Any + Send>,
346	) {
347		if &protocol_name == self.block_downloader.protocol_name() {
348			let Ok(response) = response
349				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
350			else {
351				warn!(target: LOG_TARGET, "Failed to downcast block response");
352				debug_assert!(false);
353				return;
354			};
355
356			let (request, response) = *response;
357			let blocks = match response {
358				Ok(blocks) => blocks,
359				Err(BlockResponseError::DecodeFailed(e)) => {
360					debug!(
361						target: LOG_TARGET,
362						"Failed to decode block response from peer {:?}: {:?}.",
363						peer_id,
364						e
365					);
366					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
367					return;
368				},
369				Err(BlockResponseError::ExtractionFailed(e)) => {
370					debug!(
371						target: LOG_TARGET,
372						"Failed to extract blocks from peer response {:?}: {:?}.",
373						peer_id,
374						e
375					);
376					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
377					return;
378				},
379			};
380
381			self.on_block_response(*peer_id, request, blocks);
382		} else {
383			let Ok(response) = response.downcast::<Vec<u8>>() else {
384				warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
385				debug_assert!(false);
386				return;
387			};
388
389			self.on_warp_proof_response(peer_id, EncodedProof(*response));
390		}
391	}
392
393	/// Process warp proof response.
394	pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
395		if let Some(peer) = self.peers.get_mut(peer_id) {
396			peer.state = PeerState::Available;
397		}
398
399		let Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } =
400			&mut self.phase
401		else {
402			debug!(target: LOG_TARGET, "Unexpected warp proof response");
403			self.actions
404				.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
405			return
406		};
407
408		let proof_to_incoming_block =
409			|(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
410				IncomingBlock {
411					hash: header.hash(),
412					header: Some(header),
413					body: None,
414					indexed_body: None,
415					justifications: Some(justifications),
416					origin: Some(*peer_id),
417					// We are still in warp sync, so we don't have the state. This means
418					// we also can't execute the block.
419					allow_missing_state: true,
420					skip_execution: true,
421					// Shouldn't already exist in the database.
422					import_existing: false,
423					state: None,
424				}
425			};
426
427		match warp_sync_provider.verify(&response, *set_id, authorities.clone()) {
428			Err(e) => {
429				debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
430				self.actions
431					.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
432			},
433			Ok(VerificationResult::Partial(new_set_id, new_authorities, new_last_hash, proofs)) => {
434				log::debug!(target: LOG_TARGET, "Verified partial proof, set_id={:?}", new_set_id);
435				*set_id = new_set_id;
436				*authorities = new_authorities;
437				*last_hash = new_last_hash;
438				self.total_proof_bytes += response.0.len() as u64;
439				self.actions.push(SyncingAction::ImportBlocks {
440					origin: BlockOrigin::NetworkInitialSync,
441					blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
442				});
443			},
444			Ok(VerificationResult::Complete(new_set_id, _, header, proofs)) => {
445				log::debug!(
446					target: LOG_TARGET,
447					"Verified complete proof, set_id={:?}. Continuing with target block download: {} ({}).",
448					new_set_id,
449					header.hash(),
450					header.number(),
451				);
452				self.total_proof_bytes += response.0.len() as u64;
453				self.phase = Phase::TargetBlock(header);
454				self.actions.push(SyncingAction::ImportBlocks {
455					origin: BlockOrigin::NetworkInitialSync,
456					blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
457				});
458			},
459		}
460	}
461
462	/// Process (target) block response.
463	pub fn on_block_response(
464		&mut self,
465		peer_id: PeerId,
466		request: BlockRequest<B>,
467		blocks: Vec<BlockData<B>>,
468	) {
469		if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
470			self.actions.push(SyncingAction::DropPeer(bad_peer));
471		}
472	}
473
474	fn on_block_response_inner(
475		&mut self,
476		peer_id: PeerId,
477		request: BlockRequest<B>,
478		mut blocks: Vec<BlockData<B>>,
479	) -> Result<(), BadPeer> {
480		if let Some(peer) = self.peers.get_mut(&peer_id) {
481			peer.state = PeerState::Available;
482		}
483
484		let Phase::TargetBlock(header) = &mut self.phase else {
485			debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
486			return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE))
487		};
488
489		if blocks.is_empty() {
490			debug!(
491				target: LOG_TARGET,
492				"Downloading target block failed: empty block response from {peer_id}",
493			);
494			return Err(BadPeer(peer_id, rep::NO_BLOCK))
495		}
496
497		if blocks.len() > 1 {
498			debug!(
499				target: LOG_TARGET,
500				"Too many blocks ({}) in warp target block response from {peer_id}",
501				blocks.len(),
502			);
503			return Err(BadPeer(peer_id, rep::NOT_REQUESTED))
504		}
505
506		validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
507
508		let block = blocks.pop().expect("`blocks` len checked above; qed");
509
510		let Some(block_header) = &block.header else {
511			debug!(
512				target: LOG_TARGET,
513				"Downloading target block failed: missing header in response from {peer_id}.",
514			);
515			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
516		};
517
518		if block_header != header {
519			debug!(
520				target: LOG_TARGET,
521				"Downloading target block failed: different header in response from {peer_id}.",
522			);
523			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
524		}
525
526		if block.body.is_none() {
527			debug!(
528				target: LOG_TARGET,
529				"Downloading target block failed: missing body in response from {peer_id}.",
530			);
531			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
532		}
533
534		self.result = Some(WarpSyncResult {
535			target_header: header.clone(),
536			target_body: block.body,
537			target_justifications: block.justifications,
538		});
539		self.phase = Phase::Complete;
540		self.actions.push(SyncingAction::Finished);
541		Ok(())
542	}
543
544	/// Reserve a peer for a request assigning `new_state`.
545	fn schedule_next_peer(
546		&mut self,
547		new_state: PeerState,
548		min_best_number: Option<NumberFor<B>>,
549	) -> Option<PeerId> {
550		let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
551		if targets.is_empty() {
552			return None
553		}
554		targets.sort();
555		let median = targets[targets.len() / 2];
556		let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
557		// Find a random peer that is synced as much as peer majority and is above
558		// `min_best_number`.
559		for (peer_id, peer) in self.peers.iter_mut() {
560			if peer.state.is_available() &&
561				peer.best_number >= threshold &&
562				self.disconnected_peers.is_peer_available(peer_id)
563			{
564				peer.state = new_state;
565				return Some(*peer_id)
566			}
567		}
568		None
569	}
570
571	/// Produce warp proof request.
572	fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
573		let Phase::WarpProof { last_hash, .. } = &self.phase else { return None };
574
575		// Copy `last_hash` early to cut the borrowing tie.
576		let begin = *last_hash;
577
578		if self
579			.peers
580			.values()
581			.any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
582		{
583			// Only one warp proof request at a time is possible.
584			return None
585		}
586
587		let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
588		trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
589
590		let request = WarpProofRequest { begin };
591
592		let Some(protocol_name) = self.protocol_name.clone() else {
593			warn!(
594				target: LOG_TARGET,
595				"Trying to send warp sync request when no protocol is configured {request:?}",
596			);
597			return None;
598		};
599
600		Some((peer_id, protocol_name, request))
601	}
602
603	/// Produce target block request.
604	fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
605		let Phase::TargetBlock(target_header) = &self.phase else { return None };
606
607		if self
608			.peers
609			.values()
610			.any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
611		{
612			// Only one target block request at a time is possible.
613			return None
614		}
615
616		// Cut the borrowing tie.
617		let target_hash = target_header.hash();
618		let target_number = *target_header.number();
619
620		let peer_id =
621			self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
622
623		trace!(
624			target: LOG_TARGET,
625			"New target block request to {peer_id}, target: {} ({}).",
626			target_hash,
627			target_number,
628		);
629
630		Some((
631			peer_id,
632			BlockRequest::<B> {
633				id: 0,
634				fields: BlockAttributes::HEADER |
635					BlockAttributes::BODY |
636					BlockAttributes::JUSTIFICATION,
637				from: FromBlock::Hash(target_hash),
638				direction: Direction::Ascending,
639				max: Some(1),
640			},
641		))
642	}
643
644	/// Returns warp sync estimated progress (stage, bytes received).
645	pub fn progress(&self) -> WarpSyncProgress<B> {
646		match &self.phase {
647			Phase::WaitingForPeers { .. } => WarpSyncProgress {
648				phase: WarpSyncPhase::AwaitingPeers {
649					required_peers: self.min_peers_to_start_warp_sync,
650				},
651				total_bytes: self.total_proof_bytes,
652			},
653			Phase::WarpProof { .. } => WarpSyncProgress {
654				phase: WarpSyncPhase::DownloadingWarpProofs,
655				total_bytes: self.total_proof_bytes,
656			},
657			Phase::TargetBlock(_) => WarpSyncProgress {
658				phase: WarpSyncPhase::DownloadingTargetBlock,
659				total_bytes: self.total_proof_bytes,
660			},
661			Phase::Complete => WarpSyncProgress {
662				phase: WarpSyncPhase::Complete,
663				total_bytes: self.total_proof_bytes + self.total_state_bytes,
664			},
665		}
666	}
667
668	/// Get the number of peers known to warp sync.
669	pub fn num_peers(&self) -> usize {
670		self.peers.len()
671	}
672
673	/// Returns the current sync status.
674	pub fn status(&self) -> SyncStatus<B> {
675		SyncStatus {
676			state: match &self.phase {
677				Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
678				Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
679				Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
680				Phase::Complete => SyncState::Idle,
681			},
682			best_seen_block: match &self.phase {
683				Phase::WaitingForPeers { .. } => None,
684				Phase::WarpProof { .. } => None,
685				Phase::TargetBlock(header) => Some(*header.number()),
686				Phase::Complete => None,
687			},
688			num_peers: self.peers.len().saturated_into(),
689			queued_blocks: 0,
690			state_sync: None,
691			warp_sync: Some(self.progress()),
692		}
693	}
694
695	/// Get actions that should be performed by the owner on [`WarpSync`]'s behalf
696	#[must_use]
697	pub fn actions(
698		&mut self,
699		network_service: &NetworkServiceHandle,
700	) -> impl Iterator<Item = SyncingAction<B>> {
701		let warp_proof_request =
702			self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
703				trace!(
704					target: LOG_TARGET,
705					"Created `WarpProofRequest` to {}, request: {:?}.",
706					peer_id,
707					request,
708				);
709
710				let (tx, rx) = oneshot::channel();
711
712				network_service.start_request(
713					peer_id,
714					protocol_name,
715					request.encode(),
716					tx,
717					IfDisconnected::ImmediateError,
718				);
719
720				SyncingAction::StartRequest {
721					peer_id,
722					key: Self::STRATEGY_KEY,
723					request: async move {
724						Ok(rx.await?.and_then(|(response, protocol_name)| {
725							Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
726						}))
727					}
728					.boxed(),
729					remove_obsolete: false,
730				}
731			});
732		self.actions.extend(warp_proof_request);
733
734		let target_block_request =
735			self.target_block_request().into_iter().map(|(peer_id, request)| {
736				let downloader = self.block_downloader.clone();
737
738				SyncingAction::StartRequest {
739					peer_id,
740					key: Self::STRATEGY_KEY,
741					request: async move {
742						Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
743							|(response, protocol_name)| {
744								let decoded_response =
745									downloader.block_response_into_blocks(&request, response);
746								let result =
747									Box::new((request, decoded_response)) as Box<dyn Any + Send>;
748								Ok((result, protocol_name))
749							},
750						))
751					}
752					.boxed(),
753					// Sending block request implies dropping obsolete pending response as we are
754					// not interested in it anymore.
755					remove_obsolete: true,
756				}
757			});
758		self.actions.extend(target_block_request);
759
760		std::mem::take(&mut self.actions).into_iter()
761	}
762
763	/// Take the result of finished warp sync, returning `None` if the sync was unsuccessful.
764	#[must_use]
765	pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
766		self.result.take()
767	}
768}
769
770#[cfg(test)]
771mod test {
772	use super::*;
773	use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
774	use sc_block_builder::BlockBuilderBuilder;
775	use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
776	use sp_consensus_grandpa::{AuthorityList, SetId, GRANDPA_ENGINE_ID};
777	use sp_core::H256;
778	use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
779	use std::{io::ErrorKind, sync::Arc};
780	use substrate_test_runtime_client::{
781		runtime::{Block, Hash},
782		BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
783	};
784
785	mockall::mock! {
786		pub Client<B: BlockT> {}
787
788		impl<B: BlockT> HeaderBackend<B> for Client<B> {
789			fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
790			fn info(&self) -> Info<B>;
791			fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
792			fn number(
793				&self,
794				hash: B::Hash,
795			) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
796			fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
797		}
798	}
799
800	mockall::mock! {
801		pub WarpSyncProvider<B: BlockT> {}
802
803		impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
804			fn generate(
805				&self,
806				start: B::Hash,
807			) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
808			fn verify(
809				&self,
810				proof: &EncodedProof,
811				set_id: SetId,
812				authorities: AuthorityList,
813			) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
814			fn current_authorities(&self) -> AuthorityList;
815		}
816	}
817
818	fn mock_client_with_state() -> MockClient<Block> {
819		let mut client = MockClient::<Block>::new();
820		let genesis_hash = Hash::random();
821		client.expect_info().return_once(move || Info {
822			best_hash: genesis_hash,
823			best_number: 0,
824			genesis_hash,
825			finalized_hash: genesis_hash,
826			finalized_number: 0,
827			// We need some finalized state to render warp sync impossible.
828			finalized_state: Some((genesis_hash, 0)),
829			number_leaves: 0,
830			block_gap: None,
831		});
832
833		client
834	}
835
836	fn mock_client_without_state() -> MockClient<Block> {
837		let mut client = MockClient::<Block>::new();
838		let genesis_hash = Hash::random();
839		client.expect_info().returning(move || Info {
840			best_hash: genesis_hash,
841			best_number: 0,
842			genesis_hash,
843			finalized_hash: genesis_hash,
844			finalized_number: 0,
845			finalized_state: None,
846			number_leaves: 0,
847			block_gap: None,
848		});
849
850		client
851	}
852
853	#[test]
854	fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
855		let client = mock_client_with_state();
856		let provider = MockWarpSyncProvider::<Block>::new();
857		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
858		let mut warp_sync = WarpSync::new(
859			Arc::new(client),
860			config,
861			None,
862			Arc::new(MockBlockDownloader::new()),
863			None,
864		);
865
866		let network_provider = NetworkServiceProvider::new();
867		let network_handle = network_provider.handle();
868
869		// Warp sync instantly finishes
870		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
871		assert_eq!(actions.len(), 1);
872		assert!(matches!(actions[0], SyncingAction::Finished));
873
874		// ... with no result.
875		assert!(warp_sync.take_result().is_none());
876	}
877
878	#[test]
879	fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
880		let client = mock_client_with_state();
881		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
882			1,
883			Default::default(),
884			Default::default(),
885			Default::default(),
886			Default::default(),
887		));
888		let mut warp_sync = WarpSync::new(
889			Arc::new(client),
890			config,
891			None,
892			Arc::new(MockBlockDownloader::new()),
893			None,
894		);
895
896		let network_provider = NetworkServiceProvider::new();
897		let network_handle = network_provider.handle();
898
899		// Warp sync instantly finishes
900		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
901		assert_eq!(actions.len(), 1);
902		assert!(matches!(actions[0], SyncingAction::Finished));
903
904		// ... with no result.
905		assert!(warp_sync.take_result().is_none());
906	}
907
908	#[test]
909	fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
910		let client = mock_client_without_state();
911		let provider = MockWarpSyncProvider::<Block>::new();
912		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
913		let mut warp_sync = WarpSync::new(
914			Arc::new(client),
915			config,
916			None,
917			Arc::new(MockBlockDownloader::new()),
918			None,
919		);
920
921		let network_provider = NetworkServiceProvider::new();
922		let network_handle = network_provider.handle();
923
924		// No actions are emitted.
925		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
926	}
927
928	#[test]
929	fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
930		let client = mock_client_without_state();
931		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
932			1,
933			Default::default(),
934			Default::default(),
935			Default::default(),
936			Default::default(),
937		));
938		let mut warp_sync = WarpSync::new(
939			Arc::new(client),
940			config,
941			None,
942			Arc::new(MockBlockDownloader::new()),
943			None,
944		);
945
946		let network_provider = NetworkServiceProvider::new();
947		let network_handle = network_provider.handle();
948
949		// No actions are emitted.
950		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
951	}
952
953	#[test]
954	fn warp_sync_is_started_only_when_there_is_enough_peers() {
955		let client = mock_client_without_state();
956		let mut provider = MockWarpSyncProvider::<Block>::new();
957		provider
958			.expect_current_authorities()
959			.once()
960			.return_const(AuthorityList::default());
961		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
962		let mut warp_sync = WarpSync::new(
963			Arc::new(client),
964			config,
965			None,
966			Arc::new(MockBlockDownloader::new()),
967			None,
968		);
969
970		// Warp sync is not started when there is not enough peers.
971		for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
972			warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
973			assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
974		}
975
976		// Now we have enough peers and warp sync is started.
977		warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
978		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
979	}
980
981	#[test]
982	fn no_peer_is_scheduled_if_no_peers_connected() {
983		let client = mock_client_without_state();
984		let provider = MockWarpSyncProvider::<Block>::new();
985		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
986		let mut warp_sync = WarpSync::new(
987			Arc::new(client),
988			config,
989			None,
990			Arc::new(MockBlockDownloader::new()),
991			None,
992		);
993
994		assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
995	}
996
997	#[test]
998	fn enough_peers_are_used_in_tests() {
999		// Tests below use 10 peers. Fail early if it's less than a threshold for warp sync.
1000		assert!(
1001			10 >= MIN_PEERS_TO_START_WARP_SYNC,
1002			"Tests must be updated to use that many initial peers.",
1003		);
1004	}
1005
1006	#[test]
1007	fn at_least_median_synced_peer_is_scheduled() {
1008		for _ in 0..100 {
1009			let client = mock_client_without_state();
1010			let mut provider = MockWarpSyncProvider::<Block>::new();
1011			provider
1012				.expect_current_authorities()
1013				.once()
1014				.return_const(AuthorityList::default());
1015			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1016			let mut warp_sync = WarpSync::new(
1017				Arc::new(client),
1018				config,
1019				None,
1020				Arc::new(MockBlockDownloader::new()),
1021				None,
1022			);
1023
1024			for best_number in 1..11 {
1025				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1026			}
1027
1028			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
1029			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
1030		}
1031	}
1032
1033	#[test]
1034	fn min_best_number_peer_is_scheduled() {
1035		for _ in 0..10 {
1036			let client = mock_client_without_state();
1037			let mut provider = MockWarpSyncProvider::<Block>::new();
1038			provider
1039				.expect_current_authorities()
1040				.once()
1041				.return_const(AuthorityList::default());
1042			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1043			let mut warp_sync = WarpSync::new(
1044				Arc::new(client),
1045				config,
1046				None,
1047				Arc::new(MockBlockDownloader::new()),
1048				None,
1049			);
1050
1051			for best_number in 1..11 {
1052				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1053			}
1054
1055			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1056			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
1057		}
1058	}
1059
1060	#[test]
1061	fn backedoff_number_peer_is_not_scheduled() {
1062		let client = mock_client_without_state();
1063		let mut provider = MockWarpSyncProvider::<Block>::new();
1064		provider
1065			.expect_current_authorities()
1066			.once()
1067			.return_const(AuthorityList::default());
1068		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1069		let mut warp_sync = WarpSync::new(
1070			Arc::new(client),
1071			config,
1072			None,
1073			Arc::new(MockBlockDownloader::new()),
1074			None,
1075		);
1076
1077		for best_number in 1..11 {
1078			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1079		}
1080
1081		let ninth_peer =
1082			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
1083		let tenth_peer =
1084			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
1085
1086		// Disconnecting a peer without an inflight request has no effect on persistent states.
1087		warp_sync.remove_peer(&tenth_peer);
1088		assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1089
1090		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1091		let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1092		assert_eq!(tenth_peer, peer_id.unwrap());
1093		warp_sync.remove_peer(&tenth_peer);
1094
1095		// Peer is backed off.
1096		assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1097
1098		// No peer available for 10'th best block because of the backoff.
1099		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1100		let peer_id: Option<PeerId> =
1101			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1102		assert!(peer_id.is_none());
1103
1104		// Other requests can still happen.
1105		let peer_id: Option<PeerId> =
1106			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
1107		assert_eq!(ninth_peer, peer_id.unwrap());
1108	}
1109
1110	#[test]
1111	fn no_warp_proof_request_in_another_phase() {
1112		let client = mock_client_without_state();
1113		let mut provider = MockWarpSyncProvider::<Block>::new();
1114		provider
1115			.expect_current_authorities()
1116			.once()
1117			.return_const(AuthorityList::default());
1118		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1119		let mut warp_sync = WarpSync::new(
1120			Arc::new(client),
1121			config,
1122			Some(ProtocolName::Static("")),
1123			Arc::new(MockBlockDownloader::new()),
1124			None,
1125		);
1126
1127		// Make sure we have enough peers to make a request.
1128		for best_number in 1..11 {
1129			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1130		}
1131
1132		// Manually set to another phase.
1133		warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1134			1,
1135			Default::default(),
1136			Default::default(),
1137			Default::default(),
1138			Default::default(),
1139		));
1140
1141		// No request is made.
1142		assert!(warp_sync.warp_proof_request().is_none());
1143	}
1144
1145	#[test]
1146	fn warp_proof_request_starts_at_last_hash() {
1147		let client = mock_client_without_state();
1148		let mut provider = MockWarpSyncProvider::<Block>::new();
1149		provider
1150			.expect_current_authorities()
1151			.once()
1152			.return_const(AuthorityList::default());
1153		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1154		let mut warp_sync = WarpSync::new(
1155			Arc::new(client),
1156			config,
1157			Some(ProtocolName::Static("")),
1158			Arc::new(MockBlockDownloader::new()),
1159			None,
1160		);
1161
1162		// Make sure we have enough peers to make a request.
1163		for best_number in 1..11 {
1164			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1165		}
1166		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1167
1168		let known_last_hash = Hash::random();
1169
1170		// Manually set last hash to known value.
1171		match &mut warp_sync.phase {
1172			Phase::WarpProof { last_hash, .. } => {
1173				*last_hash = known_last_hash;
1174			},
1175			_ => panic!("Invalid phase."),
1176		}
1177
1178		let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
1179		assert_eq!(request.begin, known_last_hash);
1180	}
1181
1182	#[test]
1183	fn no_parallel_warp_proof_requests() {
1184		let client = mock_client_without_state();
1185		let mut provider = MockWarpSyncProvider::<Block>::new();
1186		provider
1187			.expect_current_authorities()
1188			.once()
1189			.return_const(AuthorityList::default());
1190		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1191		let mut warp_sync = WarpSync::new(
1192			Arc::new(client),
1193			config,
1194			Some(ProtocolName::Static("")),
1195			Arc::new(MockBlockDownloader::new()),
1196			None,
1197		);
1198
1199		// Make sure we have enough peers to make requests.
1200		for best_number in 1..11 {
1201			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1202		}
1203		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1204
1205		// First request is made.
1206		assert!(warp_sync.warp_proof_request().is_some());
1207		// Second request is not made.
1208		assert!(warp_sync.warp_proof_request().is_none());
1209	}
1210
1211	#[test]
1212	fn bad_warp_proof_response_drops_peer() {
1213		let client = mock_client_without_state();
1214		let mut provider = MockWarpSyncProvider::<Block>::new();
1215		provider
1216			.expect_current_authorities()
1217			.once()
1218			.return_const(AuthorityList::default());
1219		// Warp proof verification fails.
1220		provider.expect_verify().return_once(|_proof, _set_id, _authorities| {
1221			Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
1222		});
1223		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1224		let mut warp_sync = WarpSync::new(
1225			Arc::new(client),
1226			config,
1227			Some(ProtocolName::Static("")),
1228			Arc::new(MockBlockDownloader::new()),
1229			None,
1230		);
1231
1232		// Make sure we have enough peers to make a request.
1233		for best_number in 1..11 {
1234			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1235		}
1236		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1237
1238		let network_provider = NetworkServiceProvider::new();
1239		let network_handle = network_provider.handle();
1240
1241		// Consume `SendWarpProofRequest` action.
1242		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1243		assert_eq!(actions.len(), 1);
1244		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1245			panic!("Invalid action");
1246		};
1247
1248		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1249
1250		// We only interested in already generated actions, not new requests.
1251		let actions = std::mem::take(&mut warp_sync.actions);
1252		assert_eq!(actions.len(), 1);
1253		assert!(matches!(
1254			actions[0],
1255			SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
1256		));
1257		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1258	}
1259
1260	#[test]
1261	fn partial_warp_proof_doesnt_advance_phase() {
1262		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1263		let mut provider = MockWarpSyncProvider::<Block>::new();
1264		provider
1265			.expect_current_authorities()
1266			.once()
1267			.return_const(AuthorityList::default());
1268		let target_block = BlockBuilderBuilder::new(&*client)
1269			.on_parent_block(client.chain_info().best_hash)
1270			.with_parent_block_number(client.chain_info().best_number)
1271			.build()
1272			.unwrap()
1273			.build()
1274			.unwrap()
1275			.block;
1276		let target_header = target_block.header().clone();
1277		let justifications = Justifications::new(vec![(GRANDPA_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1278		// Warp proof is partial.
1279		{
1280			let target_header = target_header.clone();
1281			let justifications = justifications.clone();
1282			provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1283				Ok(VerificationResult::Partial(
1284					set_id,
1285					authorities,
1286					target_header.hash(),
1287					vec![(target_header, justifications)],
1288				))
1289			});
1290		}
1291		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1292		let mut warp_sync = WarpSync::new(
1293			client,
1294			config,
1295			Some(ProtocolName::Static("")),
1296			Arc::new(MockBlockDownloader::new()),
1297			None,
1298		);
1299
1300		// Make sure we have enough peers to make a request.
1301		for best_number in 1..11 {
1302			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1303		}
1304		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1305
1306		let network_provider = NetworkServiceProvider::new();
1307		let network_handle = network_provider.handle();
1308
1309		// Consume `SendWarpProofRequest` action.
1310		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1311		assert_eq!(actions.len(), 1);
1312		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1313			panic!("Invalid action");
1314		};
1315
1316		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1317
1318		assert_eq!(warp_sync.actions.len(), 1);
1319		let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1320		else {
1321			panic!("Expected `ImportBlocks` action.");
1322		};
1323		assert_eq!(origin, BlockOrigin::NetworkInitialSync);
1324		assert_eq!(blocks.len(), 1);
1325		let import_block = blocks.pop().unwrap();
1326		assert_eq!(
1327			import_block,
1328			IncomingBlock {
1329				hash: target_header.hash(),
1330				header: Some(target_header),
1331				body: None,
1332				indexed_body: None,
1333				justifications: Some(justifications),
1334				origin: Some(request_peer_id),
1335				allow_missing_state: true,
1336				skip_execution: true,
1337				import_existing: false,
1338				state: None,
1339			}
1340		);
1341		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1342	}
1343
1344	#[test]
1345	fn complete_warp_proof_advances_phase() {
1346		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1347		let mut provider = MockWarpSyncProvider::<Block>::new();
1348		provider
1349			.expect_current_authorities()
1350			.once()
1351			.return_const(AuthorityList::default());
1352		let target_block = BlockBuilderBuilder::new(&*client)
1353			.on_parent_block(client.chain_info().best_hash)
1354			.with_parent_block_number(client.chain_info().best_number)
1355			.build()
1356			.unwrap()
1357			.build()
1358			.unwrap()
1359			.block;
1360		let target_header = target_block.header().clone();
1361		let justifications = Justifications::new(vec![(GRANDPA_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1362		// Warp proof is complete.
1363		{
1364			let target_header = target_header.clone();
1365			let justifications = justifications.clone();
1366			provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1367				Ok(VerificationResult::Complete(
1368					set_id,
1369					authorities,
1370					target_header.clone(),
1371					vec![(target_header, justifications)],
1372				))
1373			});
1374		}
1375		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1376		let mut warp_sync = WarpSync::new(
1377			client,
1378			config,
1379			Some(ProtocolName::Static("")),
1380			Arc::new(MockBlockDownloader::new()),
1381			None,
1382		);
1383
1384		// Make sure we have enough peers to make a request.
1385		for best_number in 1..11 {
1386			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1387		}
1388		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1389
1390		let network_provider = NetworkServiceProvider::new();
1391		let network_handle = network_provider.handle();
1392
1393		// Consume `SendWarpProofRequest` action.
1394		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1395		assert_eq!(actions.len(), 1);
1396		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1397			panic!("Invalid action.");
1398		};
1399
1400		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1401
1402		assert_eq!(warp_sync.actions.len(), 1);
1403		let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1404		else {
1405			panic!("Expected `ImportBlocks` action.");
1406		};
1407		assert_eq!(origin, BlockOrigin::NetworkInitialSync);
1408		assert_eq!(blocks.len(), 1);
1409		let import_block = blocks.pop().unwrap();
1410		assert_eq!(
1411			import_block,
1412			IncomingBlock {
1413				hash: target_header.hash(),
1414				header: Some(target_header),
1415				body: None,
1416				indexed_body: None,
1417				justifications: Some(justifications),
1418				origin: Some(request_peer_id),
1419				allow_missing_state: true,
1420				skip_execution: true,
1421				import_existing: false,
1422				state: None,
1423			}
1424		);
1425		assert!(
1426			matches!(warp_sync.phase, Phase::TargetBlock(header) if header == *target_block.header())
1427		);
1428	}
1429
1430	#[test]
1431	fn no_target_block_requests_in_another_phase() {
1432		let client = mock_client_without_state();
1433		let mut provider = MockWarpSyncProvider::<Block>::new();
1434		provider
1435			.expect_current_authorities()
1436			.once()
1437			.return_const(AuthorityList::default());
1438		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1439		let mut warp_sync = WarpSync::new(
1440			Arc::new(client),
1441			config,
1442			None,
1443			Arc::new(MockBlockDownloader::new()),
1444			None,
1445		);
1446
1447		// Make sure we have enough peers to make a request.
1448		for best_number in 1..11 {
1449			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1450		}
1451		// We are not in `Phase::TargetBlock`
1452		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1453
1454		// No request is made.
1455		assert!(warp_sync.target_block_request().is_none());
1456	}
1457
1458	#[test]
1459	fn target_block_request_is_correct() {
1460		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1461		let mut provider = MockWarpSyncProvider::<Block>::new();
1462		provider
1463			.expect_current_authorities()
1464			.once()
1465			.return_const(AuthorityList::default());
1466		let target_block = BlockBuilderBuilder::new(&*client)
1467			.on_parent_block(client.chain_info().best_hash)
1468			.with_parent_block_number(client.chain_info().best_number)
1469			.build()
1470			.unwrap()
1471			.build()
1472			.unwrap()
1473			.block;
1474		let target_header = target_block.header().clone();
1475		// Warp proof is complete.
1476		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1477			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1478		});
1479		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1480		let mut warp_sync =
1481			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1482
1483		// Make sure we have enough peers to make a request.
1484		for best_number in 1..11 {
1485			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1486		}
1487
1488		// Manually set `TargetBlock` phase.
1489		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1490
1491		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1492		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1493		assert_eq!(
1494			request.fields,
1495			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1496		);
1497		assert_eq!(request.max, Some(1));
1498	}
1499
1500	#[test]
1501	fn externally_set_target_block_is_requested() {
1502		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1503		let target_block = BlockBuilderBuilder::new(&*client)
1504			.on_parent_block(client.chain_info().best_hash)
1505			.with_parent_block_number(client.chain_info().best_number)
1506			.build()
1507			.unwrap()
1508			.build()
1509			.unwrap()
1510			.block;
1511		let target_header = target_block.header().clone();
1512		let config = WarpSyncConfig::WithTarget(target_header);
1513		let mut warp_sync =
1514			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1515
1516		// Make sure we have enough peers to make a request.
1517		for best_number in 1..11 {
1518			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1519		}
1520
1521		assert!(matches!(warp_sync.phase, Phase::TargetBlock(_)));
1522
1523		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1524		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1525		assert_eq!(
1526			request.fields,
1527			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1528		);
1529		assert_eq!(request.max, Some(1));
1530	}
1531
1532	#[test]
1533	fn no_parallel_target_block_requests() {
1534		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1535		let mut provider = MockWarpSyncProvider::<Block>::new();
1536		provider
1537			.expect_current_authorities()
1538			.once()
1539			.return_const(AuthorityList::default());
1540		let target_block = BlockBuilderBuilder::new(&*client)
1541			.on_parent_block(client.chain_info().best_hash)
1542			.with_parent_block_number(client.chain_info().best_number)
1543			.build()
1544			.unwrap()
1545			.build()
1546			.unwrap()
1547			.block;
1548		let target_header = target_block.header().clone();
1549		// Warp proof is complete.
1550		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1551			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1552		});
1553		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1554		let mut warp_sync =
1555			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1556
1557		// Make sure we have enough peers to make a request.
1558		for best_number in 1..11 {
1559			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1560		}
1561
1562		// Manually set `TargetBlock` phase.
1563		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1564
1565		// First target block request is made.
1566		assert!(warp_sync.target_block_request().is_some());
1567		// No parallel request is made.
1568		assert!(warp_sync.target_block_request().is_none());
1569	}
1570
1571	#[test]
1572	fn target_block_response_with_no_blocks_drops_peer() {
1573		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1574		let mut provider = MockWarpSyncProvider::<Block>::new();
1575		provider
1576			.expect_current_authorities()
1577			.once()
1578			.return_const(AuthorityList::default());
1579		let target_block = BlockBuilderBuilder::new(&*client)
1580			.on_parent_block(client.chain_info().best_hash)
1581			.with_parent_block_number(client.chain_info().best_number)
1582			.build()
1583			.unwrap()
1584			.build()
1585			.unwrap()
1586			.block;
1587		let target_header = target_block.header().clone();
1588		// Warp proof is complete.
1589		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1590			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1591		});
1592		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1593		let mut warp_sync =
1594			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1595
1596		// Make sure we have enough peers to make a request.
1597		for best_number in 1..11 {
1598			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1599		}
1600
1601		// Manually set `TargetBlock` phase.
1602		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1603
1604		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1605
1606		// Empty block response received.
1607		let response = Vec::new();
1608		// Peer is dropped.
1609		assert!(matches!(
1610			warp_sync.on_block_response_inner(peer_id, request, response),
1611			Err(BadPeer(id, _rep)) if id == peer_id,
1612		));
1613	}
1614
1615	#[test]
1616	fn target_block_response_with_extra_blocks_drops_peer() {
1617		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1618		let mut provider = MockWarpSyncProvider::<Block>::new();
1619		provider
1620			.expect_current_authorities()
1621			.once()
1622			.return_const(AuthorityList::default());
1623		let target_block = BlockBuilderBuilder::new(&*client)
1624			.on_parent_block(client.chain_info().best_hash)
1625			.with_parent_block_number(client.chain_info().best_number)
1626			.build()
1627			.unwrap()
1628			.build()
1629			.unwrap()
1630			.block;
1631
1632		let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
1633			.on_parent_block(client.chain_info().best_hash)
1634			.with_parent_block_number(client.chain_info().best_number)
1635			.build()
1636			.unwrap();
1637		extra_block_builder
1638			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1639			.unwrap();
1640		let extra_block = extra_block_builder.build().unwrap().block;
1641
1642		let target_header = target_block.header().clone();
1643		// Warp proof is complete.
1644		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1645			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1646		});
1647		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1648		let mut warp_sync =
1649			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1650
1651		// Make sure we have enough peers to make a request.
1652		for best_number in 1..11 {
1653			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1654		}
1655
1656		// Manually set `TargetBlock` phase.
1657		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1658
1659		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1660
1661		// Block response with extra blocks received.
1662		let response = vec![
1663			BlockData::<Block> {
1664				hash: target_block.header().hash(),
1665				header: Some(target_block.header().clone()),
1666				body: Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1667				indexed_body: None,
1668				receipt: None,
1669				message_queue: None,
1670				justification: None,
1671				justifications: None,
1672			},
1673			BlockData::<Block> {
1674				hash: extra_block.header().hash(),
1675				header: Some(extra_block.header().clone()),
1676				body: Some(extra_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1677				indexed_body: None,
1678				receipt: None,
1679				message_queue: None,
1680				justification: None,
1681				justifications: None,
1682			},
1683		];
1684		// Peer is dropped.
1685		assert!(matches!(
1686			warp_sync.on_block_response_inner(peer_id, request, response),
1687			Err(BadPeer(id, _rep)) if id == peer_id,
1688		));
1689	}
1690
1691	#[test]
1692	fn target_block_response_with_wrong_block_drops_peer() {
1693		sp_tracing::try_init_simple();
1694
1695		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1696		let mut provider = MockWarpSyncProvider::<Block>::new();
1697		provider
1698			.expect_current_authorities()
1699			.once()
1700			.return_const(AuthorityList::default());
1701		let target_block = BlockBuilderBuilder::new(&*client)
1702			.on_parent_block(client.chain_info().best_hash)
1703			.with_parent_block_number(client.chain_info().best_number)
1704			.build()
1705			.unwrap()
1706			.build()
1707			.unwrap()
1708			.block;
1709
1710		let mut wrong_block_builder = BlockBuilderBuilder::new(&*client)
1711			.on_parent_block(client.chain_info().best_hash)
1712			.with_parent_block_number(client.chain_info().best_number)
1713			.build()
1714			.unwrap();
1715		wrong_block_builder
1716			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1717			.unwrap();
1718		let wrong_block = wrong_block_builder.build().unwrap().block;
1719
1720		let target_header = target_block.header().clone();
1721		// Warp proof is complete.
1722		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1723			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1724		});
1725		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1726		let mut warp_sync =
1727			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1728
1729		// Make sure we have enough peers to make a request.
1730		for best_number in 1..11 {
1731			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1732		}
1733
1734		// Manually set `TargetBlock` phase.
1735		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1736
1737		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1738
1739		// Wrong block received.
1740		let response = vec![BlockData::<Block> {
1741			hash: wrong_block.header().hash(),
1742			header: Some(wrong_block.header().clone()),
1743			body: Some(wrong_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1744			indexed_body: None,
1745			receipt: None,
1746			message_queue: None,
1747			justification: None,
1748			justifications: None,
1749		}];
1750		// Peer is dropped.
1751		assert!(matches!(
1752			warp_sync.on_block_response_inner(peer_id, request, response),
1753			Err(BadPeer(id, _rep)) if id == peer_id,
1754		));
1755	}
1756
1757	#[test]
1758	fn correct_target_block_response_sets_strategy_result() {
1759		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1760		let mut provider = MockWarpSyncProvider::<Block>::new();
1761		provider
1762			.expect_current_authorities()
1763			.once()
1764			.return_const(AuthorityList::default());
1765		let mut target_block_builder = BlockBuilderBuilder::new(&*client)
1766			.on_parent_block(client.chain_info().best_hash)
1767			.with_parent_block_number(client.chain_info().best_number)
1768			.build()
1769			.unwrap();
1770		target_block_builder
1771			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1772			.unwrap();
1773		let target_block = target_block_builder.build().unwrap().block;
1774		let target_header = target_block.header().clone();
1775		// Warp proof is complete.
1776		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1777			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1778		});
1779		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1780		let mut warp_sync =
1781			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1782
1783		// Make sure we have enough peers to make a request.
1784		for best_number in 1..11 {
1785			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1786		}
1787
1788		// Manually set `TargetBlock` phase.
1789		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1790
1791		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1792
1793		// Correct block received.
1794		let body = Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>());
1795		let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
1796		let response = vec![BlockData::<Block> {
1797			hash: target_block.header().hash(),
1798			header: Some(target_block.header().clone()),
1799			body: body.clone(),
1800			indexed_body: None,
1801			receipt: None,
1802			message_queue: None,
1803			justification: None,
1804			justifications: justifications.clone(),
1805		}];
1806
1807		assert!(warp_sync.on_block_response_inner(peer_id, request, response).is_ok());
1808
1809		let network_provider = NetworkServiceProvider::new();
1810		let network_handle = network_provider.handle();
1811
1812		// Strategy finishes.
1813		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1814		assert_eq!(actions.len(), 1);
1815		assert!(matches!(actions[0], SyncingAction::Finished));
1816
1817		// With correct result.
1818		let result = warp_sync.take_result().unwrap();
1819		assert_eq!(result.target_header, *target_block.header());
1820		assert_eq!(result.target_body, body);
1821		assert_eq!(result.target_justifications, justifications);
1822	}
1823}