sc_network_sync/strategy/
warp.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Warp syncing strategy. Bootstraps chain by downloading warp proofs and state.
20
21use sc_consensus::IncomingBlock;
22use sp_consensus::BlockOrigin;
23
24use crate::{
25	block_relay_protocol::{BlockDownloader, BlockResponseError},
26	service::network::NetworkServiceHandle,
27	strategy::{
28		chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
29		SyncingAction,
30	},
31	types::{BadPeer, SyncState, SyncStatus},
32	LOG_TARGET,
33};
34use codec::{Decode, Encode};
35use futures::{channel::oneshot, FutureExt};
36use log::{debug, error, trace, warn};
37use sc_network::{IfDisconnected, ProtocolName};
38use sc_network_common::sync::message::{
39	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
40};
41use sc_network_types::PeerId;
42use sp_blockchain::HeaderBackend;
43use sp_runtime::{
44	traits::{Block as BlockT, Header, NumberFor, Zero},
45	Justifications, SaturatedConversion,
46};
47use std::{any::Any, collections::HashMap, fmt, sync::Arc};
48
/// Number of peers that need to be connected before warp sync is started.
///
/// This is the default; [`WarpSync::new`] uses it when no explicit minimum is passed.
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
51
/// Scale-encoded warp sync proof response.
///
/// Wraps the raw proof bytes. Within this module the bytes are never decoded directly; they
/// are handed to a [`Verifier`] and their length is counted towards the download progress.
pub struct EncodedProof(pub Vec<u8>);
54
/// Warp sync request.
///
/// Sent SCALE-encoded to a peer; `begin` is taken from [`Verifier::next_proof_context`].
#[derive(Encode, Decode, Debug, Clone)]
pub struct WarpProofRequest<B: BlockT> {
	/// Start collecting proofs from this block.
	pub begin: B::Hash,
}
61
/// Verifier for warp sync proofs. Each verifier operates in a specific context.
///
/// A verifier is obtained from [`WarpSyncProvider::create_verifier`] when warp sync starts and
/// is then given every proof response. `verify` takes `&mut self`, so an implementation may
/// carry state from one proof to the next.
pub trait Verifier<Block: BlockT>: Send + Sync {
	/// Verify a warp sync proof.
	///
	/// On success returns the verified headers with their justifications, see
	/// [`VerificationResult`]. An `Err` makes warp sync report the sending peer.
	fn verify(
		&mut self,
		proof: &EncodedProof,
	) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
	/// Hash to be used as the starting point for the next proof request.
	fn next_proof_context(&self) -> Block::Hash;
	/// Get status text for progress reporting.
	fn status(&self) -> Option<String>;
}
74
/// Proof verification result.
///
/// In both variants the vector holds the proven headers together with their justifications;
/// warp sync queues them for import as header-only blocks.
pub enum VerificationResult<Block: BlockT> {
	/// Proof is valid, but the target was not reached.
	Partial(Vec<(Block::Header, Justifications)>),
	/// Target finality is proved. The first field is the header of the warp sync target block.
	Complete(Block::Header, Vec<(Block::Header, Justifications)>),
}
82
/// Warp sync backend. Handles retrieving and verifying warp sync proofs.
pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
	/// Generate proof starting at given block hash. The proof is accumulated until maximum proof
	/// size is reached.
	fn generate(
		&self,
		start: Block::Hash,
	) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
	/// Create a verifier for warp sync proofs.
	///
	/// [`WarpSync`] calls this once, when enough peers are connected to start downloading
	/// proofs.
	fn create_verifier(&self) -> Box<dyn Verifier<Block>>;
}
94
/// Reputation changes applied to peers that misbehave during warp sync.
mod rep {
	use sc_network::ReputationChange as Rep;

	/// Unexpected response received from a peer.
	pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");

	/// Peer provided invalid warp proof data.
	pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");

	/// Peer did not provide us with advertised block data.
	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

	/// Reputation change for peers which send us non-requested block data.
	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

	/// Reputation change for peers which send us a block which we fail to verify.
	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

	/// We received a message that failed to decode.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
116
/// Reported warp sync phase.
///
/// This is the externally visible phase carried by [`WarpSyncProgress`]; it is distinct from
/// the private phase of the [`WarpSync`] state machine.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<Block: BlockT> {
	/// Waiting for peers to connect.
	AwaitingPeers {
		/// Number of connected peers required before warp sync starts.
		required_peers: usize,
	},
	/// Downloading and verifying warp proofs.
	DownloadingWarpProofs,
	/// Downloading target block.
	DownloadingTargetBlock,
	/// Downloading state data.
	DownloadingState,
	/// Importing state.
	ImportingState,
	/// Downloading block history. The number is printed as `#<n>` by the `Display` impl.
	DownloadingBlocks(NumberFor<Block>),
	/// Warp sync is complete.
	Complete,
}
135
136impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
137	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
138		match self {
139			Self::AwaitingPeers { required_peers } =>
140				write!(f, "Waiting for {required_peers} peers to be connected"),
141			Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
142			Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
143			Self::DownloadingState => write!(f, "Downloading state"),
144			Self::ImportingState => write!(f, "Importing state"),
145			Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
146			Self::Complete => write!(f, "Warp sync is complete"),
147		}
148	}
149}
150
/// Reported warp sync progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<Block: BlockT> {
	/// Phase warp sync is currently in.
	pub phase: WarpSyncPhase<Block>,
	/// Total bytes downloaded so far.
	pub total_bytes: u64,
	/// Optional status text from the verifier.
	pub status: Option<String>,
}
161
/// Warp sync configuration as accepted by [`WarpSync`].
///
/// The variant decides the initial phase: with a provider, warp sync first waits for peers and
/// downloads proofs; with a target header it goes straight to downloading the target block.
pub enum WarpSyncConfig<Block: BlockT> {
	/// Standard warp sync for the chain.
	WithProvider(Arc<dyn WarpSyncProvider<Block>>),
	/// Skip downloading proofs and use provided header of the state that should be downloaded.
	///
	/// It is expected that the header provider ensures that the header is trusted.
	WithTarget(<Block as BlockT>::Header),
}
171
/// Warp sync phase used by warp sync state machine.
///
/// Transitions visible in this module: `WaitingForPeers` -> `WarpProof` -> `TargetBlock` ->
/// `Complete`. A [`WarpSyncConfig::WithTarget`] configuration starts directly in `TargetBlock`.
enum Phase<B: BlockT> {
	/// Waiting for enough peers to connect. Holds the provider used to create the verifier.
	WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
	/// Downloading warp proofs, each of which is checked by `verifier`.
	WarpProof { verifier: Box<dyn Verifier<B>> },
	/// Downloading target block. Holds the header the downloaded block must match.
	TargetBlock(B::Header),
	/// Warp sync is complete.
	Complete,
}
183
/// What a connected peer is currently being used for.
enum PeerState {
	/// No request in flight; the peer can be picked for a new request.
	Available,
	/// A warp proof request to this peer is in flight.
	DownloadingProofs,
	/// A target block request to this peer is in flight.
	DownloadingTargetBlock,
}
189
190impl PeerState {
191	fn is_available(&self) -> bool {
192		matches!(self, PeerState::Available)
193	}
194}
195
/// Per-peer bookkeeping kept by [`WarpSync`].
struct Peer<B: BlockT> {
	/// Best block number last reported for the peer (on connect or by a best-block announce).
	best_number: NumberFor<B>,
	/// Whether the peer is free or busy serving one of our requests.
	state: PeerState,
}
200
/// Outcome of a successful warp sync, retrieved with [`WarpSync::take_result`].
pub struct WarpSyncResult<B: BlockT> {
	/// Header of the warp sync target block.
	pub target_header: B::Header,
	/// Body of the target block as downloaded from a peer.
	pub target_body: Option<Vec<B::Extrinsic>>,
	/// Justifications of the target block, if the peer supplied any.
	pub target_justifications: Option<Justifications>,
}
206
/// Warp sync state machine. Accumulates warp proofs and state.
pub struct WarpSync<B: BlockT> {
	/// Current phase of the state machine.
	phase: Phase<B>,
	/// Total size of all successfully verified warp proof responses, in bytes.
	total_proof_bytes: u64,
	/// Only added to the reported total once the phase is `Complete`.
	// NOTE(review): nothing in this file ever increments this counter — confirm whether it is
	// still needed.
	total_state_bytes: u64,
	/// Connected peers and what each of them is currently used for.
	peers: HashMap<PeerId, Peer<B>>,
	/// Tracks peers that disconnected while a request to them was in flight.
	disconnected_peers: DisconnectedPeers,
	/// Protocol used for warp proof requests; without it no warp proof request is produced.
	protocol_name: Option<ProtocolName>,
	/// Used to download the target block and to decode the block response.
	block_downloader: Arc<dyn BlockDownloader<B>>,
	/// Actions accumulated since the last call to [`WarpSync::actions`].
	actions: Vec<SyncingAction<B>>,
	/// Set once the target block was downloaded and matched the expected header.
	result: Option<WarpSyncResult<B>>,
	/// Number of peers that need to be connected before warp sync is started.
	min_peers_to_start_warp_sync: usize,
}
221
222impl<B> WarpSync<B>
223where
224	B: BlockT,
225{
	/// Strategy key used by warp sync.
	///
	/// Attached to every request started by this strategy.
	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
228
229	/// Create a new instance. When passing a warp sync provider we will be checking for proof and
230	/// authorities. Alternatively we can pass a target block when we want to skip downloading
231	/// proofs, in this case we will continue polling until the target block is known.
232	pub fn new<Client>(
233		client: Arc<Client>,
234		warp_sync_config: WarpSyncConfig<B>,
235		protocol_name: Option<ProtocolName>,
236		block_downloader: Arc<dyn BlockDownloader<B>>,
237		min_peers_to_start_warp_sync: Option<usize>,
238	) -> Self
239	where
240		Client: HeaderBackend<B> + 'static,
241	{
242		let min_peers_to_start_warp_sync =
243			min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
244		if client.info().finalized_state.is_some() {
245			error!(
246				target: LOG_TARGET,
247				"Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
248			);
249			return Self {
250				phase: Phase::Complete,
251				total_proof_bytes: 0,
252				total_state_bytes: 0,
253				peers: HashMap::new(),
254				disconnected_peers: DisconnectedPeers::new(),
255				protocol_name,
256				block_downloader,
257				actions: vec![SyncingAction::Finished],
258				result: None,
259				min_peers_to_start_warp_sync,
260			}
261		}
262
263		let phase = match warp_sync_config {
264			WarpSyncConfig::WithProvider(warp_sync_provider) =>
265				Phase::WaitingForPeers { warp_sync_provider },
266			WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
267		};
268
269		Self {
270			phase,
271			total_proof_bytes: 0,
272			total_state_bytes: 0,
273			peers: HashMap::new(),
274			disconnected_peers: DisconnectedPeers::new(),
275			protocol_name,
276			block_downloader,
277			actions: Vec::new(),
278			result: None,
279			min_peers_to_start_warp_sync,
280		}
281	}
282
283	/// Notify that a new peer has connected.
284	pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
285		self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
286
287		self.try_to_start_warp_sync();
288	}
289
290	/// Notify that a peer has disconnected.
291	pub fn remove_peer(&mut self, peer_id: &PeerId) {
292		if let Some(state) = self.peers.remove(peer_id) {
293			if !state.state.is_available() {
294				if let Some(bad_peer) =
295					self.disconnected_peers.on_disconnect_during_request(*peer_id)
296				{
297					self.actions.push(SyncingAction::DropPeer(bad_peer));
298				}
299			}
300		}
301	}
302
303	/// Submit a validated block announcement.
304	///
305	/// Returns new best hash & best number of the peer if they are updated.
306	#[must_use]
307	pub fn on_validated_block_announce(
308		&mut self,
309		is_best: bool,
310		peer_id: PeerId,
311		announce: &BlockAnnounce<B::Header>,
312	) -> Option<(B::Hash, NumberFor<B>)> {
313		is_best.then(|| {
314			let best_number = *announce.header.number();
315			let best_hash = announce.header.hash();
316			if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
317				peer.best_number = best_number;
318			}
319			// Let `SyncingEngine` know that we should update the peer info.
320			(best_hash, best_number)
321		})
322	}
323
324	/// Start warp sync as soon as we have enough peers.
325	fn try_to_start_warp_sync(&mut self) {
326		let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
327
328		if self.peers.len() < self.min_peers_to_start_warp_sync {
329			return
330		}
331
332		let verifier = warp_sync_provider.create_verifier();
333		self.phase = Phase::WarpProof { verifier };
334		trace!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
335	}
336
337	pub fn on_generic_response(
338		&mut self,
339		peer_id: &PeerId,
340		protocol_name: ProtocolName,
341		response: Box<dyn Any + Send>,
342	) {
343		if &protocol_name == self.block_downloader.protocol_name() {
344			let Ok(response) = response
345				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
346			else {
347				warn!(target: LOG_TARGET, "Failed to downcast block response");
348				debug_assert!(false);
349				return;
350			};
351
352			let (request, response) = *response;
353			let blocks = match response {
354				Ok(blocks) => blocks,
355				Err(BlockResponseError::DecodeFailed(e)) => {
356					debug!(
357						target: LOG_TARGET,
358						"Failed to decode block response from peer {:?}: {:?}.",
359						peer_id,
360						e
361					);
362					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
363					return;
364				},
365				Err(BlockResponseError::ExtractionFailed(e)) => {
366					debug!(
367						target: LOG_TARGET,
368						"Failed to extract blocks from peer response {:?}: {:?}.",
369						peer_id,
370						e
371					);
372					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
373					return;
374				},
375			};
376
377			self.on_block_response(*peer_id, request, blocks);
378		} else {
379			let Ok(response) = response.downcast::<Vec<u8>>() else {
380				warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
381				debug_assert!(false);
382				return;
383			};
384
385			self.on_warp_proof_response(peer_id, EncodedProof(*response));
386		}
387	}
388
389	/// Process warp proof response.
390	pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
391		if let Some(peer) = self.peers.get_mut(peer_id) {
392			peer.state = PeerState::Available;
393		}
394
395		let Phase::WarpProof { verifier } = &mut self.phase else {
396			debug!(target: LOG_TARGET, "Unexpected warp proof response");
397			self.actions
398				.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
399			return
400		};
401
402		let proof_to_incoming_block =
403			|(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
404				IncomingBlock {
405					hash: header.hash(),
406					header: Some(header),
407					body: None,
408					indexed_body: None,
409					justifications: Some(justifications),
410					origin: Some(*peer_id),
411					// We are still in warp sync, so we don't have the state. This means
412					// we also can't execute the block.
413					allow_missing_state: true,
414					skip_execution: true,
415					// Shouldn't already exist in the database.
416					import_existing: false,
417					state: None,
418				}
419			};
420
421		match verifier.verify(&response) {
422			Err(e) => {
423				debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
424				self.actions
425					.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
426			},
427			Ok(VerificationResult::Partial(proofs)) => {
428				debug!(target: LOG_TARGET, "Verified partial proof");
429				self.total_proof_bytes += response.0.len() as u64;
430				self.actions.push(SyncingAction::ImportBlocks {
431					origin: BlockOrigin::NetworkInitialSync,
432					blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
433				});
434			},
435			Ok(VerificationResult::Complete(header, proofs)) => {
436				debug!(
437					target: LOG_TARGET,
438					"Verified complete proof. Continuing with target block download: {} ({}).",
439					header.hash(),
440					header.number(),
441				);
442				self.total_proof_bytes += response.0.len() as u64;
443				self.phase = Phase::TargetBlock(header);
444				self.actions.push(SyncingAction::ImportBlocks {
445					origin: BlockOrigin::NetworkInitialSync,
446					blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
447				});
448			},
449		}
450	}
451
452	/// Process (target) block response.
453	pub fn on_block_response(
454		&mut self,
455		peer_id: PeerId,
456		request: BlockRequest<B>,
457		blocks: Vec<BlockData<B>>,
458	) {
459		if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
460			self.actions.push(SyncingAction::DropPeer(bad_peer));
461		}
462	}
463
464	fn on_block_response_inner(
465		&mut self,
466		peer_id: PeerId,
467		request: BlockRequest<B>,
468		mut blocks: Vec<BlockData<B>>,
469	) -> Result<(), BadPeer> {
470		if let Some(peer) = self.peers.get_mut(&peer_id) {
471			peer.state = PeerState::Available;
472		}
473
474		let Phase::TargetBlock(header) = &mut self.phase else {
475			debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
476			return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE))
477		};
478
479		if blocks.is_empty() {
480			debug!(
481				target: LOG_TARGET,
482				"Downloading target block failed: empty block response from {peer_id}",
483			);
484			return Err(BadPeer(peer_id, rep::NO_BLOCK))
485		}
486
487		if blocks.len() > 1 {
488			debug!(
489				target: LOG_TARGET,
490				"Too many blocks ({}) in warp target block response from {peer_id}",
491				blocks.len(),
492			);
493			return Err(BadPeer(peer_id, rep::NOT_REQUESTED))
494		}
495
496		validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
497
498		let block = blocks.pop().expect("`blocks` len checked above; qed");
499
500		let Some(block_header) = &block.header else {
501			debug!(
502				target: LOG_TARGET,
503				"Downloading target block failed: missing header in response from {peer_id}.",
504			);
505			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
506		};
507
508		if block_header != header {
509			debug!(
510				target: LOG_TARGET,
511				"Downloading target block failed: different header in response from {peer_id}.",
512			);
513			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
514		}
515
516		if block.body.is_none() {
517			debug!(
518				target: LOG_TARGET,
519				"Downloading target block failed: missing body in response from {peer_id}.",
520			);
521			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
522		}
523
524		self.result = Some(WarpSyncResult {
525			target_header: header.clone(),
526			target_body: block.body,
527			target_justifications: block.justifications,
528		});
529		self.phase = Phase::Complete;
530		self.actions.push(SyncingAction::Finished);
531		Ok(())
532	}
533
534	/// Reserve a peer for a request assigning `new_state`.
535	fn schedule_next_peer(
536		&mut self,
537		new_state: PeerState,
538		min_best_number: Option<NumberFor<B>>,
539	) -> Option<PeerId> {
540		let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
541		if targets.is_empty() {
542			return None
543		}
544		targets.sort();
545		let median = targets[targets.len() / 2];
546		let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
547		// Find a random peer that is synced as much as peer majority and is above
548		// `min_best_number`.
549		for (peer_id, peer) in self.peers.iter_mut() {
550			if peer.state.is_available() &&
551				peer.best_number >= threshold &&
552				self.disconnected_peers.is_peer_available(peer_id)
553			{
554				peer.state = new_state;
555				return Some(*peer_id)
556			}
557		}
558		None
559	}
560
561	/// Produce warp proof request.
562	fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
563		let Phase::WarpProof { verifier } = &self.phase else { return None };
564
565		// Copy verifier context early to cut the borrowing tie.
566		let begin = verifier.next_proof_context();
567
568		if self
569			.peers
570			.values()
571			.any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
572		{
573			// Only one warp proof request at a time is possible.
574			return None
575		}
576
577		let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
578		trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
579
580		let request = WarpProofRequest { begin };
581
582		let Some(protocol_name) = self.protocol_name.clone() else {
583			warn!(
584				target: LOG_TARGET,
585				"Trying to send warp sync request when no protocol is configured {request:?}",
586			);
587			return None;
588		};
589
590		Some((peer_id, protocol_name, request))
591	}
592
593	/// Produce target block request.
594	fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
595		let Phase::TargetBlock(target_header) = &self.phase else { return None };
596
597		if self
598			.peers
599			.values()
600			.any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
601		{
602			// Only one target block request at a time is possible.
603			return None
604		}
605
606		// Cut the borrowing tie.
607		let target_hash = target_header.hash();
608		let target_number = *target_header.number();
609
610		let peer_id =
611			self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
612
613		trace!(
614			target: LOG_TARGET,
615			"New target block request to {peer_id}, target: {} ({}).",
616			target_hash,
617			target_number,
618		);
619
620		Some((
621			peer_id,
622			BlockRequest::<B> {
623				id: 0,
624				fields: BlockAttributes::HEADER |
625					BlockAttributes::BODY |
626					BlockAttributes::JUSTIFICATION,
627				from: FromBlock::Hash(target_hash),
628				direction: Direction::Ascending,
629				max: Some(1),
630			},
631		))
632	}
633
634	/// Returns warp sync estimated progress (stage, bytes received).
635	pub fn progress(&self) -> WarpSyncProgress<B> {
636		match &self.phase {
637			Phase::WaitingForPeers { .. } => WarpSyncProgress {
638				phase: WarpSyncPhase::AwaitingPeers {
639					required_peers: self.min_peers_to_start_warp_sync,
640				},
641				total_bytes: self.total_proof_bytes,
642				status: None,
643			},
644			Phase::WarpProof { verifier } => WarpSyncProgress {
645				phase: WarpSyncPhase::DownloadingWarpProofs,
646				total_bytes: self.total_proof_bytes,
647				status: verifier.status(),
648			},
649			Phase::TargetBlock(_) => WarpSyncProgress {
650				phase: WarpSyncPhase::DownloadingTargetBlock,
651				total_bytes: self.total_proof_bytes,
652				status: None,
653			},
654			Phase::Complete => WarpSyncProgress {
655				phase: WarpSyncPhase::Complete,
656				total_bytes: self.total_proof_bytes + self.total_state_bytes,
657				status: None,
658			},
659		}
660	}
661
	/// Get the number of peers known to warp sync.
	///
	/// Counts every connected peer, whether idle or busy with a request.
	pub fn num_peers(&self) -> usize {
		self.peers.len()
	}
666
667	/// Returns the current sync status.
668	pub fn status(&self) -> SyncStatus<B> {
669		SyncStatus {
670			state: match &self.phase {
671				Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
672				Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
673				Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
674				Phase::Complete => SyncState::Idle,
675			},
676			best_seen_block: match &self.phase {
677				Phase::WaitingForPeers { .. } => None,
678				Phase::WarpProof { .. } => None,
679				Phase::TargetBlock(header) => Some(*header.number()),
680				Phase::Complete => None,
681			},
682			num_peers: self.peers.len().saturated_into(),
683			queued_blocks: 0,
684			state_sync: None,
685			warp_sync: Some(self.progress()),
686		}
687	}
688
	/// Get actions that should be performed by the owner on [`WarpSync`]'s behalf
	///
	/// Before draining, this tries to schedule at most one warp proof request and at most one
	/// target block request (whichever the current phase allows) and appends the resulting
	/// `StartRequest` actions to the queue. The queue is left empty afterwards.
	#[must_use]
	pub fn actions(
		&mut self,
		network_service: &NetworkServiceHandle,
	) -> impl Iterator<Item = SyncingAction<B>> {
		// The `extend` below consumes this iterator right away, so the network request is
		// started during this call rather than when the returned action is processed.
		let warp_proof_request =
			self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
				trace!(
					target: LOG_TARGET,
					"Created `WarpProofRequest` to {}, request: {:?}.",
					peer_id,
					request,
				);

				// The response is delivered through `rx`, awaited by the action's future.
				let (tx, rx) = oneshot::channel();

				network_service.start_request(
					peer_id,
					protocol_name,
					request.encode(),
					tx,
					IfDisconnected::ImmediateError,
				);

				SyncingAction::StartRequest {
					peer_id,
					key: Self::STRATEGY_KEY,
					request: async move {
						// Type-erase the response so it can go through `on_generic_response`.
						Ok(rx.await?.and_then(|(response, protocol_name)| {
							Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
						}))
					}
					.boxed(),
					remove_obsolete: false,
				}
			});
		self.actions.extend(warp_proof_request);

		let target_block_request =
			self.target_block_request().into_iter().map(|(peer_id, request)| {
				let downloader = self.block_downloader.clone();

				SyncingAction::StartRequest {
					peer_id,
					key: Self::STRATEGY_KEY,
					request: async move {
						Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
							|(response, protocol_name)| {
								// Decode here and ship the request along with the result; this
								// is the tuple `on_generic_response` downcasts to.
								let decoded_response =
									downloader.block_response_into_blocks(&request, response);
								let result =
									Box::new((request, decoded_response)) as Box<dyn Any + Send>;
								Ok((result, protocol_name))
							},
						))
					}
					.boxed(),
					// Sending block request implies dropping obsolete pending response as we are
					// not interested in it anymore.
					remove_obsolete: true,
				}
			});
		self.actions.extend(target_block_request);

		// Hand over everything queued so far, leaving an empty queue behind.
		std::mem::take(&mut self.actions).into_iter()
	}
756
	/// Take the result of finished warp sync, returning `None` if the sync was unsuccessful.
	///
	/// The result is moved out, so a second call returns `None`.
	#[must_use]
	pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
		self.result.take()
	}
762}
763
764#[cfg(test)]
765mod test {
766	use super::*;
767	use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
768	use sc_block_builder::BlockBuilderBuilder;
769	use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
770	use sp_core::H256;
771	use sp_runtime::{
772		traits::{Block as BlockT, Header as HeaderT, NumberFor},
773		ConsensusEngineId,
774	};
775	use std::{io::ErrorKind, sync::Arc};
776	use substrate_test_runtime_client::{
777		runtime::{Block, Hash},
778		BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
779	};
780
	/// Consensus engine id for use in tests.
	// NOTE(review): not referenced in the part of the test module visible here — presumably
	// used by tests further down; confirm.
	pub const TEST_ENGINE_ID: ConsensusEngineId = *b"TEST";
782
	// Generates `MockClient`, a mock implementing only `HeaderBackend`. The tests use it to
	// control what `info()` reports to `WarpSync::new`.
	mockall::mock! {
		pub Client<B: BlockT> {}

		impl<B: BlockT> HeaderBackend<B> for Client<B> {
			fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
			fn info(&self) -> Info<B>;
			fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
			fn number(
				&self,
				hash: B::Hash,
			) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
			fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
		}
	}
797
	// Generates `MockWarpSyncProvider`, a mock of the `WarpSyncProvider` trait defined in the
	// parent module.
	mockall::mock! {
		pub WarpSyncProvider<B: BlockT> {}

		impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
			fn generate(
				&self,
				start: B::Hash,
			) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
			fn create_verifier(&self) -> Box<dyn super::Verifier<B>>;
		}
	}
809
	// Generates `MockVerifier`, a mock of the `Verifier` trait defined in the parent module.
	mockall::mock! {
		pub Verifier<B: BlockT> {}

		impl<B: BlockT> super::Verifier<B> for Verifier<B> {
			fn verify(
				&mut self,
				proof: &EncodedProof,
			) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
			fn next_proof_context(&self) -> B::Hash;
			fn status(&self) -> Option<String>;
		}
	}
822
823	fn mock_client_with_state() -> MockClient<Block> {
824		let mut client = MockClient::<Block>::new();
825		let genesis_hash = Hash::random();
826		client.expect_info().return_once(move || Info {
827			best_hash: genesis_hash,
828			best_number: 0,
829			genesis_hash,
830			finalized_hash: genesis_hash,
831			finalized_number: 0,
832			// We need some finalized state to render warp sync impossible.
833			finalized_state: Some((genesis_hash, 0)),
834			number_leaves: 0,
835			block_gap: None,
836		});
837
838		client
839	}
840
841	fn mock_client_without_state() -> MockClient<Block> {
842		let mut client = MockClient::<Block>::new();
843		let genesis_hash = Hash::random();
844		client.expect_info().returning(move || Info {
845			best_hash: genesis_hash,
846			best_number: 0,
847			genesis_hash,
848			finalized_hash: genesis_hash,
849			finalized_number: 0,
850			finalized_state: None,
851			number_leaves: 0,
852			block_gap: None,
853		});
854
855		client
856	}
857
858	#[test]
859	fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
860		let client = mock_client_with_state();
861		let provider = MockWarpSyncProvider::<Block>::new();
862		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
863		let mut warp_sync = WarpSync::new(
864			Arc::new(client),
865			config,
866			None,
867			Arc::new(MockBlockDownloader::new()),
868			None,
869		);
870
871		let network_provider = NetworkServiceProvider::new();
872		let network_handle = network_provider.handle();
873
874		// Warp sync instantly finishes
875		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
876		assert_eq!(actions.len(), 1);
877		assert!(matches!(actions[0], SyncingAction::Finished));
878
879		// ... with no result.
880		assert!(warp_sync.take_result().is_none());
881	}
882
883	#[test]
884	fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
885		let client = mock_client_with_state();
886		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
887			1,
888			Default::default(),
889			Default::default(),
890			Default::default(),
891			Default::default(),
892		));
893		let mut warp_sync = WarpSync::new(
894			Arc::new(client),
895			config,
896			None,
897			Arc::new(MockBlockDownloader::new()),
898			None,
899		);
900
901		let network_provider = NetworkServiceProvider::new();
902		let network_handle = network_provider.handle();
903
904		// Warp sync instantly finishes
905		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
906		assert_eq!(actions.len(), 1);
907		assert!(matches!(actions[0], SyncingAction::Finished));
908
909		// ... with no result.
910		assert!(warp_sync.take_result().is_none());
911	}
912
913	#[test]
914	fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
915		let client = mock_client_without_state();
916		let provider = MockWarpSyncProvider::<Block>::new();
917		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
918		let mut warp_sync = WarpSync::new(
919			Arc::new(client),
920			config,
921			None,
922			Arc::new(MockBlockDownloader::new()),
923			None,
924		);
925
926		let network_provider = NetworkServiceProvider::new();
927		let network_handle = network_provider.handle();
928
929		// No actions are emitted.
930		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
931	}
932
933	#[test]
934	fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
935		let client = mock_client_without_state();
936		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
937			1,
938			Default::default(),
939			Default::default(),
940			Default::default(),
941			Default::default(),
942		));
943		let mut warp_sync = WarpSync::new(
944			Arc::new(client),
945			config,
946			None,
947			Arc::new(MockBlockDownloader::new()),
948			None,
949		);
950
951		let network_provider = NetworkServiceProvider::new();
952		let network_handle = network_provider.handle();
953
954		// No actions are emitted.
955		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
956	}
957
/// The strategy stays in `Phase::WaitingForPeers` while fewer than
/// `MIN_PEERS_TO_START_WARP_SYNC` peers were added and is in `Phase::WarpProof` once the
/// threshold is reached.
#[test]
fn warp_sync_is_started_only_when_there_is_enough_peers() {
	let mut verifier = MockVerifier::<Block>::new();
	verifier.expect_next_proof_context().returning(|| Hash::random());
	verifier
		.expect_verify()
		.returning(|_| unreachable!("verify should not be called in this test"));

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));

	let backend = Arc::new(mock_client_without_state());
	let mut strategy =
		WarpSync::new(backend, config, None, Arc::new(MockBlockDownloader::new()), None);

	// One peer short of the threshold: every addition leaves us waiting for peers.
	for _ in 1..MIN_PEERS_TO_START_WARP_SYNC {
		strategy.add_peer(PeerId::random(), Hash::random(), 10);
		assert!(matches!(strategy.phase, Phase::WaitingForPeers { .. }));
	}

	// The peer that completes the required count moves the strategy to the warp proof phase.
	strategy.add_peer(PeerId::random(), Hash::random(), 10);
	assert!(matches!(strategy.phase, Phase::WarpProof { .. }));
}
987
/// Scheduling on a strategy to which no peer was ever added must yield `None`.
#[test]
fn no_peer_is_scheduled_if_no_peers_connected() {
	let provider = MockWarpSyncProvider::<Block>::new();
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));
	let backend = Arc::new(mock_client_without_state());
	let mut strategy =
		WarpSync::new(backend, config, None, Arc::new(MockBlockDownloader::new()), None);

	// `add_peer` was never called, so there is nobody to pick.
	let scheduled = strategy.schedule_next_peer(PeerState::DownloadingProofs, None);
	assert!(scheduled.is_none());
}
1003
/// Guards the peer count assumed by the tests that follow.
#[test]
fn enough_peers_are_used_in_tests() {
	// The following tests connect 10 peers each. If the warp sync threshold ever grows beyond
	// that, fail here with a clear message instead of in every dependent test.
	let peers_used_in_tests = 10;
	assert!(
		MIN_PEERS_TO_START_WARP_SYNC <= peers_used_in_tests,
		"Tests must be updated to use that many initial peers.",
	);
}
1012
/// Without a minimum best number requested, the scheduled peer must have a best number of at
/// least 6 when ten peers with best numbers 1 through 10 are connected.
#[test]
fn at_least_median_synced_peer_is_scheduled() {
	// Repeated 100 times on freshly built strategies.
	for _ in 0..100 {
		let mut verifier = MockVerifier::<Block>::new();
		verifier.expect_next_proof_context().returning(|| Hash::random());
		verifier
			.expect_verify()
			.returning(|_| unreachable!("verify should not be called in this test"));

		let mut provider = MockWarpSyncProvider::<Block>::new();
		provider.expect_create_verifier().return_once(move || Box::new(verifier));
		let config = WarpSyncConfig::WithProvider(Arc::new(provider));

		let backend = Arc::new(mock_client_without_state());
		let mut strategy =
			WarpSync::new(backend, config, None, Arc::new(MockBlockDownloader::new()), None);

		// Ten peers, one per best number in 1..=10.
		for best_number in 1..=10 {
			strategy.add_peer(PeerId::random(), Hash::random(), best_number);
		}

		let scheduled = strategy.schedule_next_peer(PeerState::DownloadingProofs, None).unwrap();
		let scheduled_best = strategy.peers.get(&scheduled).unwrap().best_number;
		assert!(scheduled_best >= 6);
	}
}
1041
/// When a minimum best number of 10 is requested and ten peers with best numbers 1 through 10
/// are connected, the scheduled peer must be the one whose best number is 10.
#[test]
fn min_best_number_peer_is_scheduled() {
	// Repeated 10 times on freshly built strategies.
	for _ in 0..10 {
		let client = mock_client_without_state();
		let mut provider = MockWarpSyncProvider::<Block>::new();
		let mut verifier = MockVerifier::<Block>::new();
		verifier.expect_next_proof_context().returning(|| Hash::random());
		verifier
			.expect_verify()
			.returning(|_| unreachable!("verify should not be called in this test"));
		provider.expect_create_verifier().return_once(move || Box::new(verifier));
		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
		let mut warp_sync = WarpSync::new(
			Arc::new(client),
			config,
			None,
			Arc::new(MockBlockDownloader::new()),
			None,
		);

		// Ten peers, one per best number in 1..=10.
		for best_number in 1..11 {
			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
		}

		let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
		// `assert_eq!` reports the scheduled peer's actual best number on failure, which a
		// boolean `assert!` would hide.
		assert_eq!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number, 10);
	}
}
1070
/// A peer removed after being scheduled is reported as unavailable by `disconnected_peers`
/// and is not scheduled again after reconnecting, while another peer can still be scheduled.
#[test]
fn backedoff_number_peer_is_not_scheduled() {
	let mut verifier = MockVerifier::<Block>::new();
	verifier.expect_next_proof_context().returning(|| Hash::random());
	verifier
		.expect_verify()
		.returning(|_| unreachable!("verify should not be called in this test"));

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));

	let backend = Arc::new(mock_client_without_state());
	let mut strategy =
		WarpSync::new(backend, config, None, Arc::new(MockBlockDownloader::new()), None);

	// Ten peers, one per best number in 1..=10.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}

	// Remember the peers sitting at best numbers 9 and 10.
	let ninth_peer = strategy
		.peers
		.iter()
		.find_map(|(id, state)| (state.best_number == 9).then_some(*id))
		.unwrap();
	let tenth_peer = strategy
		.peers
		.iter()
		.find_map(|(id, state)| (state.best_number == 10).then_some(*id))
		.unwrap();

	// Removing a peer that was never scheduled leaves it available.
	strategy.remove_peer(&tenth_peer);
	assert!(strategy.disconnected_peers.is_peer_available(&tenth_peer));

	// Reconnect it, get it scheduled, then remove it again.
	strategy.add_peer(tenth_peer, H256::random(), 10);
	let scheduled = strategy.schedule_next_peer(PeerState::DownloadingProofs, Some(10)).unwrap();
	assert_eq!(tenth_peer, scheduled);
	strategy.remove_peer(&tenth_peer);

	// This time the peer is backed off.
	assert!(!strategy.disconnected_peers.is_peer_available(&tenth_peer));

	// Even after reconnecting, nobody can be scheduled for best number 10 due to the backoff.
	strategy.add_peer(tenth_peer, H256::random(), 10);
	assert!(strategy.schedule_next_peer(PeerState::DownloadingProofs, Some(10)).is_none());

	// A request with a lower minimum best number can still be scheduled.
	let fallback = strategy.schedule_next_peer(PeerState::DownloadingProofs, Some(9)).unwrap();
	assert_eq!(ninth_peer, fallback);
}
1122
/// `warp_proof_request` must return `None` when the strategy is not in the warp proof phase.
#[test]
fn no_warp_proof_request_in_another_phase() {
	let mut verifier = MockVerifier::<Block>::new();
	verifier.expect_next_proof_context().returning(|| Hash::random());
	verifier
		.expect_verify()
		.returning(|_| unreachable!("verify should not be called in this test"));

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));

	let backend = Arc::new(mock_client_without_state());
	let mut strategy = WarpSync::new(
		backend,
		config,
		Some(ProtocolName::Static("")),
		Arc::new(MockBlockDownloader::new()),
		None,
	);

	// Connect enough peers that a request could be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}

	// Force the strategy into the target block phase.
	let header = <Block as BlockT>::Header::new(
		1,
		Default::default(),
		Default::default(),
		Default::default(),
		Default::default(),
	);
	strategy.phase = Phase::TargetBlock(header);

	// No warp proof request comes out of this phase.
	let request = strategy.warp_proof_request();
	assert!(request.is_none());
}
1159
/// The `begin` field of the warp proof request must equal the hash returned by the verifier's
/// `next_proof_context`.
#[test]
fn warp_proof_request_starts_at_last_hash() {
	let known_last_hash = Hash::random();

	let mut verifier = MockVerifier::<Block>::new();
	verifier.expect_next_proof_context().returning(move || known_last_hash);
	verifier
		.expect_verify()
		.returning(|_| unreachable!("verify should not be called in this test"));

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));

	let backend = Arc::new(mock_client_without_state());
	let mut strategy = WarpSync::new(
		backend,
		config,
		Some(ProtocolName::Static("")),
		Arc::new(MockBlockDownloader::new()),
		None,
	);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}
	assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

	// Only the request payload matters here; peer and protocol name are ignored.
	let (_, _, request) = strategy.warp_proof_request().unwrap();
	assert_eq!(request.begin, known_last_hash);
}
1189
/// Two consecutive `warp_proof_request` calls must produce a request only the first time.
#[test]
fn no_parallel_warp_proof_requests() {
	let mut verifier = MockVerifier::<Block>::new();
	verifier.expect_next_proof_context().returning(|| Hash::random());
	verifier
		.expect_verify()
		.returning(|_| unreachable!("verify should not be called in this test"));

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));

	let backend = Arc::new(mock_client_without_state());
	let mut strategy = WarpSync::new(
		backend,
		config,
		Some(ProtocolName::Static("")),
		Arc::new(MockBlockDownloader::new()),
		None,
	);

	// Connect enough peers that requests can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}
	assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

	// The first call yields a request ...
	let first = strategy.warp_proof_request();
	assert!(first.is_some());
	// ... and the second call yields none.
	let second = strategy.warp_proof_request();
	assert!(second.is_none());
}
1220
/// When proof verification fails, the strategy queues a `DropPeer` action for the peer that
/// was asked and remains in `Phase::WarpProof`.
#[test]
fn bad_warp_proof_response_drops_peer() {
	let network_provider = NetworkServiceProvider::new();
	let network_handle = network_provider.handle();

	let mut verifier = MockVerifier::<Block>::new();
	verifier.expect_next_proof_context().returning(|| Hash::random());
	// The verifier rejects whatever proof it is handed.
	verifier.expect_verify().return_once(|_proof| {
		let failure = std::io::Error::new(ErrorKind::Other, "test-verification-failure");
		Err(Box::new(failure))
	});

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));

	let backend = Arc::new(mock_client_without_state());
	let mut strategy = WarpSync::new(
		backend,
		config,
		Some(ProtocolName::Static("")),
		Arc::new(MockBlockDownloader::new()),
		None,
	);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}
	assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

	// Drain the single request action to learn which peer was asked.
	let requests = strategy.actions(&network_handle).collect::<Vec<_>>();
	assert_eq!(requests.len(), 1);
	let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = requests[0] else {
		panic!("Invalid action");
	};

	strategy.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

	// Read the queued actions directly, so no new request gets generated along the way.
	let queued = std::mem::take(&mut strategy.actions);
	assert_eq!(queued.len(), 1);
	assert!(matches!(
		queued[0],
		SyncingAction::DropPeer(BadPeer(dropped, _rep)) if dropped == request_peer_id
	));
	assert!(matches!(strategy.phase, Phase::WarpProof { .. }));
}
1268
/// When the verifier returns `VerificationResult::Partial`, the strategy queues an
/// `ImportBlocks` action for the proven header and remains in `Phase::WarpProof`.
#[test]
fn partial_warp_proof_doesnt_advance_phase() {
	let network_provider = NetworkServiceProvider::new();
	let network_handle = network_provider.handle();

	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
	let chain_info = client.chain_info();
	let target_block = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap()
		.build()
		.unwrap()
		.block;
	let target_header = target_block.header().clone();
	let justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);

	// The verifier reports a partial proof carrying the target header and its justifications.
	let mut verifier = MockVerifier::<Block>::new();
	let context = client.info().genesis_hash;
	verifier.expect_next_proof_context().returning(move || context);
	let header_for_verify = target_header.clone();
	let just_for_verify = justifications.clone();
	verifier.expect_verify().return_once(move |_proof| {
		Ok(VerificationResult::Partial(vec![(header_for_verify, just_for_verify)]))
	});

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));
	let mut strategy = WarpSync::new(
		client,
		config,
		Some(ProtocolName::Static("")),
		Arc::new(MockBlockDownloader::new()),
		None,
	);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}
	assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

	// Drain the single request action to learn which peer was asked.
	let requests = strategy.actions(&network_handle).collect::<Vec<_>>();
	assert_eq!(requests.len(), 1);
	let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = requests[0] else {
		panic!("Invalid action");
	};

	strategy.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

	// Exactly one action is queued, and it imports exactly one block.
	assert_eq!(strategy.actions.len(), 1);
	let SyncingAction::ImportBlocks { origin, mut blocks } = strategy.actions.pop().unwrap()
	else {
		panic!("Expected `ImportBlocks` action.");
	};
	assert_eq!(origin, BlockOrigin::NetworkInitialSync);
	assert_eq!(blocks.len(), 1);

	let imported = blocks.pop().unwrap();
	let expected: IncomingBlock<Block> = IncomingBlock {
		hash: target_header.hash(),
		header: Some(target_header),
		body: None,
		indexed_body: None,
		justifications: Some(justifications),
		origin: Some(request_peer_id),
		allow_missing_state: true,
		skip_execution: true,
		import_existing: false,
		state: None,
	};
	assert_eq!(imported, expected);

	// A partial proof keeps the strategy in the warp proof phase.
	assert!(matches!(strategy.phase, Phase::WarpProof { .. }));
}
1348
/// When the verifier returns `VerificationResult::Complete`, the strategy queues an
/// `ImportBlocks` action for the proven header and moves to `Phase::TargetBlock` holding the
/// target header.
#[test]
fn complete_warp_proof_advances_phase() {
	let network_provider = NetworkServiceProvider::new();
	let network_handle = network_provider.handle();

	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
	let chain_info = client.chain_info();
	let target_block = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap()
		.build()
		.unwrap()
		.block;
	let target_header = target_block.header().clone();
	let justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);

	// The verifier reports a complete proof ending at the target header.
	let mut verifier = MockVerifier::<Block>::new();
	let context = client.info().genesis_hash;
	verifier.expect_next_proof_context().returning(move || context);
	let header_for_verify = target_header.clone();
	let just_for_verify = justifications.clone();
	verifier.expect_verify().return_once(move |_proof| {
		Ok(VerificationResult::Complete(
			header_for_verify.clone(),
			vec![(header_for_verify, just_for_verify)],
		))
	});

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));
	let mut strategy = WarpSync::new(
		client,
		config,
		Some(ProtocolName::Static("")),
		Arc::new(MockBlockDownloader::new()),
		None,
	);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}
	assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

	// Drain the single request action to learn which peer was asked.
	let requests = strategy.actions(&network_handle).collect::<Vec<_>>();
	assert_eq!(requests.len(), 1);
	let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = requests[0] else {
		panic!("Invalid action.");
	};

	strategy.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

	// Exactly one action is queued, and it imports exactly one block.
	assert_eq!(strategy.actions.len(), 1);
	let SyncingAction::ImportBlocks { origin, mut blocks } = strategy.actions.pop().unwrap()
	else {
		panic!("Expected `ImportBlocks` action.");
	};
	assert_eq!(origin, BlockOrigin::NetworkInitialSync);
	assert_eq!(blocks.len(), 1);

	let imported = blocks.pop().unwrap();
	let expected: IncomingBlock<Block> = IncomingBlock {
		hash: target_header.hash(),
		header: Some(target_header),
		body: None,
		indexed_body: None,
		justifications: Some(justifications),
		origin: Some(request_peer_id),
		allow_missing_state: true,
		skip_execution: true,
		import_existing: false,
		state: None,
	};
	assert_eq!(imported, expected);

	// A complete proof moves the strategy on to fetching the target block.
	assert!(
		matches!(strategy.phase, Phase::TargetBlock(header) if header == *target_block.header())
	);
}
1430
/// `target_block_request` must return `None` while the strategy is in `Phase::WarpProof`.
#[test]
fn no_target_block_requests_in_another_phase() {
	let mut verifier = MockVerifier::<Block>::new();
	verifier.expect_next_proof_context().returning(|| Hash::random());
	verifier
		.expect_verify()
		.returning(|_| unreachable!("verify should not be called in this test"));

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));

	let backend = Arc::new(mock_client_without_state());
	let mut strategy =
		WarpSync::new(backend, config, None, Arc::new(MockBlockDownloader::new()), None);

	// Connect enough peers that a request could be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}
	// The strategy is in the warp proof phase, i.e. not in `Phase::TargetBlock`.
	assert!(matches!(strategy.phase, Phase::WarpProof { .. }));

	// No target block request comes out of this phase.
	let request = strategy.target_block_request();
	assert!(request.is_none());
}
1460
/// In `Phase::TargetBlock` the block request must start from the target header hash, ask for
/// header, body and justification, and be limited to a single block.
#[test]
fn target_block_request_is_correct() {
	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
	let chain_info = client.chain_info();
	let target_block = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap()
		.build()
		.unwrap()
		.block;

	let mut verifier = MockVerifier::<Block>::new();
	let header_for_ctx = client.info().genesis_hash;
	verifier.expect_next_proof_context().returning(move || header_for_ctx);
	// If asked, the verifier reports a complete proof ending at the target header.
	let header_for_verify = target_block.header().clone();
	verifier.expect_verify().return_once(move |_proof| {
		Ok(VerificationResult::Complete(header_for_verify, Default::default()))
	});

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));
	let mut strategy =
		WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}

	// Force the strategy into the target block phase.
	strategy.phase = Phase::TargetBlock(target_block.header().clone());

	let (_, request) = strategy.target_block_request().unwrap();
	let expected_fields =
		BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
	assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
	assert_eq!(request.fields, expected_fields);
	assert_eq!(request.max, Some(1));
}
1503
/// With `WarpSyncConfig::WithTarget`, the strategy is in `Phase::TargetBlock` once peers are
/// connected and requests exactly the configured target block.
#[test]
fn externally_set_target_block_is_requested() {
	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
	let chain_info = client.chain_info();
	let target_block = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap()
		.build()
		.unwrap()
		.block;

	let config = WarpSyncConfig::WithTarget(target_block.header().clone());
	let mut strategy =
		WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}

	assert!(matches!(strategy.phase, Phase::TargetBlock(_)));

	let (_, request) = strategy.target_block_request().unwrap();
	let expected_fields =
		BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
	assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
	assert_eq!(request.fields, expected_fields);
	assert_eq!(request.max, Some(1));
}
1535
/// Two consecutive `target_block_request` calls must produce a request only the first time.
#[test]
fn no_parallel_target_block_requests() {
	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
	let chain_info = client.chain_info();
	let target_block = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap()
		.build()
		.unwrap()
		.block;

	let mut verifier = MockVerifier::<Block>::new();
	let header_for_ctx = client.info().genesis_hash;
	verifier.expect_next_proof_context().returning(move || header_for_ctx);
	// If asked, the verifier reports a complete proof ending at the target header.
	let header_for_verify = target_block.header().clone();
	verifier.expect_verify().return_once(move |_proof| {
		Ok(VerificationResult::Complete(header_for_verify, Default::default()))
	});

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));
	let mut strategy =
		WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}

	// Force the strategy into the target block phase.
	strategy.phase = Phase::TargetBlock(target_block.header().clone());

	// The first call yields a request ...
	let first = strategy.target_block_request();
	assert!(first.is_some());
	// ... and the second call yields none.
	let second = strategy.target_block_request();
	assert!(second.is_none());
}
1575
/// An empty response to the target block request must be rejected with a `BadPeer` error
/// naming the peer that was asked.
#[test]
fn target_block_response_with_no_blocks_drops_peer() {
	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
	let chain_info = client.chain_info();
	let target_block = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap()
		.build()
		.unwrap()
		.block;

	let mut verifier = MockVerifier::<Block>::new();
	let header_for_ctx = client.info().genesis_hash;
	verifier.expect_next_proof_context().returning(move || header_for_ctx);
	// If asked, the verifier reports a complete proof ending at the target header.
	let header_for_verify = target_block.header().clone();
	verifier.expect_verify().return_once(move |_proof| {
		Ok(VerificationResult::Complete(header_for_verify, Default::default()))
	});

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));
	let mut strategy =
		WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}

	// Force the strategy into the target block phase.
	strategy.phase = Phase::TargetBlock(target_block.header().clone());

	let (peer_id, request) = strategy.target_block_request().unwrap();

	// The peer answers with no blocks at all and gets reported as bad.
	let outcome = strategy.on_block_response_inner(peer_id, request, Vec::new());
	assert!(matches!(
		outcome,
		Err(BadPeer(id, _rep)) if id == peer_id,
	));
}
1620
/// A response carrying the target block plus one additional block must be rejected with a
/// `BadPeer` error naming the peer that was asked.
#[test]
fn target_block_response_with_extra_blocks_drops_peer() {
	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
	let chain_info = client.chain_info();
	let target_block = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap()
		.build()
		.unwrap()
		.block;

	// A second block on the same parent that additionally carries a storage change.
	let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap();
	extra_block_builder
		.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
		.unwrap();
	let extra_block = extra_block_builder.build().unwrap().block;

	let mut verifier = MockVerifier::<Block>::new();
	let header_for_ctx = client.info().genesis_hash;
	verifier.expect_next_proof_context().returning(move || header_for_ctx);
	// If asked, the verifier reports a complete proof ending at the target header.
	let header_for_verify = target_block.header().clone();
	verifier.expect_verify().return_once(move |_proof| {
		Ok(VerificationResult::Complete(header_for_verify, Default::default()))
	});

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));
	let mut strategy =
		WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}

	// Force the strategy into the target block phase.
	strategy.phase = Phase::TargetBlock(target_block.header().clone());

	let (peer_id, request) = strategy.target_block_request().unwrap();

	// Builds a response entry with header and body set and every other field left empty.
	let as_response_entry = |block: &Block| BlockData::<Block> {
		hash: block.header().hash(),
		header: Some(block.header().clone()),
		body: Some(block.extrinsics().iter().cloned().collect::<Vec<_>>()),
		indexed_body: None,
		receipt: None,
		message_queue: None,
		justification: None,
		justifications: None,
	};
	// The peer answers with the target block followed by a block nobody asked for.
	let response = vec![as_response_entry(&target_block), as_response_entry(&extra_block)];

	// Such a response gets the peer reported as bad.
	let outcome = strategy.on_block_response_inner(peer_id, request, response);
	assert!(matches!(
		outcome,
		Err(BadPeer(id, _rep)) if id == peer_id,
	));
}
1697
/// A response carrying a single block other than the requested target must be rejected with
/// a `BadPeer` error naming the peer that was asked.
#[test]
fn target_block_response_with_wrong_block_drops_peer() {
	sp_tracing::try_init_simple();

	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
	let chain_info = client.chain_info();
	let target_block = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap()
		.build()
		.unwrap()
		.block;

	// A different block on the same parent: it additionally carries a storage change.
	let mut wrong_block_builder = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_info.best_hash)
		.with_parent_block_number(chain_info.best_number)
		.build()
		.unwrap();
	wrong_block_builder
		.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
		.unwrap();
	let wrong_block = wrong_block_builder.build().unwrap().block;

	let mut verifier = MockVerifier::<Block>::new();
	let header_for_ctx = client.info().genesis_hash;
	verifier.expect_next_proof_context().returning(move || header_for_ctx);
	// If asked, the verifier reports a complete proof ending at the target header.
	let header_for_verify = target_block.header().clone();
	verifier.expect_verify().return_once(move |_proof| {
		Ok(VerificationResult::Complete(header_for_verify, Default::default()))
	});

	let mut provider = MockWarpSyncProvider::<Block>::new();
	provider.expect_create_verifier().return_once(move || Box::new(verifier));
	let config = WarpSyncConfig::WithProvider(Arc::new(provider));
	let mut strategy =
		WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);

	// Connect enough peers that a request can be made.
	for best_number in 1..=10 {
		strategy.add_peer(PeerId::random(), Hash::random(), best_number);
	}

	// Force the strategy into the target block phase.
	strategy.phase = Phase::TargetBlock(target_block.header().clone());

	let (peer_id, request) = strategy.target_block_request().unwrap();

	// The peer answers with one block, but not the one that was requested.
	let wrong_header = wrong_block.header().clone();
	let wrong_body = wrong_block.extrinsics().iter().cloned().collect::<Vec<_>>();
	let response = vec![BlockData::<Block> {
		hash: wrong_header.hash(),
		header: Some(wrong_header),
		body: Some(wrong_body),
		indexed_body: None,
		receipt: None,
		message_queue: None,
		justification: None,
		justifications: None,
	}];

	// Such a response gets the peer reported as bad.
	let outcome = strategy.on_block_response_inner(peer_id, request, response);
	assert!(matches!(
		outcome,
		Err(BadPeer(id, _rep)) if id == peer_id,
	));
}
1764
1765	#[test]
1766	fn correct_target_block_response_sets_strategy_result() {
1767		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1768		let mut provider = MockWarpSyncProvider::<Block>::new();
1769		let mut verifier = MockVerifier::<Block>::new();
1770		let header_for_ctx = client.info().genesis_hash;
1771		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1772		let mut target_block_builder = BlockBuilderBuilder::new(&*client)
1773			.on_parent_block(client.chain_info().best_hash)
1774			.with_parent_block_number(client.chain_info().best_number)
1775			.build()
1776			.unwrap();
1777		target_block_builder
1778			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1779			.unwrap();
1780		let target_block = target_block_builder.build().unwrap().block;
1781		let target_header = target_block.header().clone();
1782		// Warp proof is complete.
1783		let header_for_verify = target_header.clone();
1784		verifier.expect_verify().return_once(move |_proof| {
1785			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1786		});
1787		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1788		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1789		let mut warp_sync =
1790			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1791
1792		// Make sure we have enough peers to make a request.
1793		for best_number in 1..11 {
1794			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1795		}
1796
1797		// Manually set `TargetBlock` phase.
1798		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1799
1800		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1801
1802		// Correct block received.
1803		let body = Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>());
1804		let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
1805		let response = vec![BlockData::<Block> {
1806			hash: target_block.header().hash(),
1807			header: Some(target_block.header().clone()),
1808			body: body.clone(),
1809			indexed_body: None,
1810			receipt: None,
1811			message_queue: None,
1812			justification: None,
1813			justifications: justifications.clone(),
1814		}];
1815
1816		assert!(warp_sync.on_block_response_inner(peer_id, request, response).is_ok());
1817
1818		let network_provider = NetworkServiceProvider::new();
1819		let network_handle = network_provider.handle();
1820
1821		// Strategy finishes.
1822		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1823		assert_eq!(actions.len(), 1);
1824		assert!(matches!(actions[0], SyncingAction::Finished));
1825
1826		// With correct result.
1827		let result = warp_sync.take_result().unwrap();
1828		assert_eq!(result.target_header, *target_block.header());
1829		assert_eq!(result.target_body, body);
1830		assert_eq!(result.target_justifications, justifications);
1831	}
1832}