sc_network_sync/
strategy.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`PolkadotSyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`]
20//! and specific syncing algorithms.
21
22pub mod chain_sync;
23mod disconnected_peers;
24mod state;
25pub mod state_sync;
26pub mod warp;
27
28use crate::{
29	block_request_handler::MAX_BLOCKS_IN_RESPONSE,
30	types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncStatus},
31	LOG_TARGET,
32};
33use chain_sync::{ChainSync, ChainSyncMode};
34use log::{debug, error, info};
35use prometheus_endpoint::Registry;
36use sc_client_api::{BlockBackend, ProofProvider};
37use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
38use sc_network::ProtocolName;
39use sc_network_common::sync::{
40	message::{BlockAnnounce, BlockData, BlockRequest},
41	SyncMode,
42};
43use sc_network_types::PeerId;
44use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
45use sp_consensus::BlockOrigin;
46use sp_runtime::{
47	traits::{Block as BlockT, Header, NumberFor},
48	Justifications,
49};
50use state::{StateStrategy, StateStrategyAction};
51use std::{collections::HashMap, sync::Arc};
52use warp::{EncodedProof, WarpProofRequest, WarpSync, WarpSyncAction, WarpSyncConfig};
53
54/// Corresponding `ChainSync` mode.
55fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
56	match sync_mode {
57		SyncMode::Full => ChainSyncMode::Full,
58		SyncMode::LightState { skip_proofs, storage_chain_mode } =>
59			ChainSyncMode::LightState { skip_proofs, storage_chain_mode },
60		SyncMode::Warp => ChainSyncMode::Full,
61	}
62}
63
64/// Syncing strategy for syncing engine to use
65pub trait SyncingStrategy<B: BlockT>: Send
66where
67	B: BlockT,
68{
69	/// Notify syncing state machine that a new sync peer has connected.
70	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>);
71
72	/// Notify that a sync peer has disconnected.
73	fn remove_peer(&mut self, peer_id: &PeerId);
74
75	/// Submit a validated block announcement.
76	///
77	/// Returns new best hash & best number of the peer if they are updated.
78	#[must_use]
79	fn on_validated_block_announce(
80		&mut self,
81		is_best: bool,
82		peer_id: PeerId,
83		announce: &BlockAnnounce<B::Header>,
84	) -> Option<(B::Hash, NumberFor<B>)>;
85
86	/// Configure an explicit fork sync request in case external code has detected that there is a
87	/// stale fork missing.
88	///
89	/// Note that this function should not be used for recent blocks.
90	/// Sync should be able to download all the recent forks normally.
91	///
92	/// Passing empty `peers` set effectively removes the sync request.
93	fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>);
94
95	/// Request extra justification.
96	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>);
97
98	/// Clear extra justification requests.
99	fn clear_justification_requests(&mut self);
100
101	/// Report a justification import (successful or not).
102	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool);
103
104	/// Process block response.
105	fn on_block_response(
106		&mut self,
107		peer_id: PeerId,
108		key: StrategyKey,
109		request: BlockRequest<B>,
110		blocks: Vec<BlockData<B>>,
111	);
112
113	/// Process state response.
114	fn on_state_response(
115		&mut self,
116		peer_id: PeerId,
117		key: StrategyKey,
118		response: OpaqueStateResponse,
119	);
120
121	/// Process warp proof response.
122	fn on_warp_proof_response(
123		&mut self,
124		peer_id: &PeerId,
125		key: StrategyKey,
126		response: EncodedProof,
127	);
128
129	/// A batch of blocks that have been processed, with or without errors.
130	///
131	/// Call this when a batch of blocks that have been processed by the import queue, with or
132	/// without errors.
133	fn on_blocks_processed(
134		&mut self,
135		imported: usize,
136		count: usize,
137		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
138	);
139
140	/// Notify a syncing strategy that a block has been finalized.
141	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>);
142
143	/// Inform sync about a new best imported block.
144	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>);
145
146	// Are we in major sync mode?
147	fn is_major_syncing(&self) -> bool;
148
149	/// Get the number of peers known to the syncing strategy.
150	fn num_peers(&self) -> usize;
151
152	/// Returns the current sync status.
153	fn status(&self) -> SyncStatus<B>;
154
155	/// Get the total number of downloaded blocks.
156	fn num_downloaded_blocks(&self) -> usize;
157
158	/// Get an estimate of the number of parallel sync requests.
159	fn num_sync_requests(&self) -> usize;
160
161	/// Get actions that should be performed by the owner on the strategy's behalf
162	#[must_use]
163	fn actions(&mut self) -> Result<Vec<SyncingAction<B>>, ClientError>;
164}
165
166/// Syncing configuration containing data for all strategies.
167#[derive(Clone, Debug)]
168pub struct SyncingConfig {
169	/// Syncing mode.
170	pub mode: SyncMode,
171	/// The number of parallel downloads to guard against slow peers.
172	pub max_parallel_downloads: u32,
173	/// Maximum number of blocks to request.
174	pub max_blocks_per_request: u32,
175	/// Prometheus metrics registry.
176	pub metrics_registry: Option<Registry>,
177	/// Protocol name used to send out state requests
178	pub state_request_protocol_name: ProtocolName,
179}
180
/// Tag naming the strategy that issued a request, so that the matching response can be routed
/// back to it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StrategyKey {
	/// The request was issued by warp sync.
	Warp,
	/// The request was issued by state sync.
	State,
	/// The request was issued by `ChainSync`.
	ChainSync,
}
191
192#[derive(Debug)]
193pub enum SyncingAction<B: BlockT> {
194	/// Send block request to peer. Always implies dropping a stale block request to the same peer.
195	SendBlockRequest { peer_id: PeerId, key: StrategyKey, request: BlockRequest<B> },
196	/// Send state request to peer.
197	SendStateRequest {
198		peer_id: PeerId,
199		key: StrategyKey,
200		protocol_name: ProtocolName,
201		request: OpaqueStateRequest,
202	},
203	/// Send warp proof request to peer.
204	SendWarpProofRequest {
205		peer_id: PeerId,
206		key: StrategyKey,
207		protocol_name: ProtocolName,
208		request: WarpProofRequest<B>,
209	},
210	/// Drop stale request.
211	CancelRequest { peer_id: PeerId, key: StrategyKey },
212	/// Peer misbehaved. Disconnect, report it and cancel any requests to it.
213	DropPeer(BadPeer),
214	/// Import blocks.
215	ImportBlocks { origin: BlockOrigin, blocks: Vec<IncomingBlock<B>> },
216	/// Import justifications.
217	ImportJustifications {
218		peer_id: PeerId,
219		hash: B::Hash,
220		number: NumberFor<B>,
221		justifications: Justifications,
222	},
223	/// Strategy finished. Nothing to do, this is handled by `PolkadotSyncingStrategy`.
224	Finished,
225}
226
227impl<B: BlockT> SyncingAction<B> {
228	fn is_finished(&self) -> bool {
229		matches!(self, SyncingAction::Finished)
230	}
231}
232
233impl<B: BlockT> From<WarpSyncAction<B>> for SyncingAction<B> {
234	fn from(action: WarpSyncAction<B>) -> Self {
235		match action {
236			WarpSyncAction::SendWarpProofRequest { peer_id, protocol_name, request } =>
237				SyncingAction::SendWarpProofRequest {
238					peer_id,
239					key: StrategyKey::Warp,
240					protocol_name,
241					request,
242				},
243			WarpSyncAction::SendBlockRequest { peer_id, request } =>
244				SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::Warp, request },
245			WarpSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer),
246			WarpSyncAction::Finished => SyncingAction::Finished,
247		}
248	}
249}
250
251impl<B: BlockT> From<StateStrategyAction<B>> for SyncingAction<B> {
252	fn from(action: StateStrategyAction<B>) -> Self {
253		match action {
254			StateStrategyAction::SendStateRequest { peer_id, protocol_name, request } =>
255				SyncingAction::SendStateRequest {
256					peer_id,
257					key: StrategyKey::State,
258					protocol_name,
259					request,
260				},
261			StateStrategyAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer),
262			StateStrategyAction::ImportBlocks { origin, blocks } =>
263				SyncingAction::ImportBlocks { origin, blocks },
264			StateStrategyAction::Finished => SyncingAction::Finished,
265		}
266	}
267}
268
269/// Proxy to specific syncing strategies used in Polkadot.
270pub struct PolkadotSyncingStrategy<B: BlockT, Client> {
271	/// Initial syncing configuration.
272	config: SyncingConfig,
273	/// Client used by syncing strategies.
274	client: Arc<Client>,
275	/// Warp strategy.
276	warp: Option<WarpSync<B, Client>>,
277	/// State strategy.
278	state: Option<StateStrategy<B>>,
279	/// `ChainSync` strategy.`
280	chain_sync: Option<ChainSync<B, Client>>,
281	/// Connected peers and their best blocks used to seed a new strategy when switching to it in
282	/// `PolkadotSyncingStrategy::proceed_to_next`.
283	peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
284}
285
286impl<B: BlockT, Client> SyncingStrategy<B> for PolkadotSyncingStrategy<B, Client>
287where
288	B: BlockT,
289	Client: HeaderBackend<B>
290		+ BlockBackend<B>
291		+ HeaderMetadata<B, Error = sp_blockchain::Error>
292		+ ProofProvider<B>
293		+ Send
294		+ Sync
295		+ 'static,
296{
297	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
298		self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
299
300		self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
301		self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
302		self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
303	}
304
305	fn remove_peer(&mut self, peer_id: &PeerId) {
306		self.warp.as_mut().map(|s| s.remove_peer(peer_id));
307		self.state.as_mut().map(|s| s.remove_peer(peer_id));
308		self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
309
310		self.peer_best_blocks.remove(peer_id);
311	}
312
313	fn on_validated_block_announce(
314		&mut self,
315		is_best: bool,
316		peer_id: PeerId,
317		announce: &BlockAnnounce<B::Header>,
318	) -> Option<(B::Hash, NumberFor<B>)> {
319		let new_best = if let Some(ref mut warp) = self.warp {
320			warp.on_validated_block_announce(is_best, peer_id, announce)
321		} else if let Some(ref mut state) = self.state {
322			state.on_validated_block_announce(is_best, peer_id, announce)
323		} else if let Some(ref mut chain_sync) = self.chain_sync {
324			chain_sync.on_validated_block_announce(is_best, peer_id, announce)
325		} else {
326			error!(target: LOG_TARGET, "No syncing strategy is active.");
327			debug_assert!(false);
328			Some((announce.header.hash(), *announce.header.number()))
329		};
330
331		if let Some(new_best) = new_best {
332			if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
333				*best = new_best;
334			} else {
335				debug!(
336					target: LOG_TARGET,
337					"Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
338					 (already disconnected?)",
339				);
340			}
341		}
342
343		new_best
344	}
345
346	fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
347		// Fork requests are only handled by `ChainSync`.
348		if let Some(ref mut chain_sync) = self.chain_sync {
349			chain_sync.set_sync_fork_request(peers.clone(), hash, number);
350		}
351	}
352
353	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
354		// Justifications can only be requested via `ChainSync`.
355		if let Some(ref mut chain_sync) = self.chain_sync {
356			chain_sync.request_justification(hash, number);
357		}
358	}
359
360	fn clear_justification_requests(&mut self) {
361		// Justification requests can only be cleared by `ChainSync`.
362		if let Some(ref mut chain_sync) = self.chain_sync {
363			chain_sync.clear_justification_requests();
364		}
365	}
366
367	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
368		// Only `ChainSync` is interested in justification import.
369		if let Some(ref mut chain_sync) = self.chain_sync {
370			chain_sync.on_justification_import(hash, number, success);
371		}
372	}
373
374	fn on_block_response(
375		&mut self,
376		peer_id: PeerId,
377		key: StrategyKey,
378		request: BlockRequest<B>,
379		blocks: Vec<BlockData<B>>,
380	) {
381		if let (StrategyKey::Warp, Some(ref mut warp)) = (key, &mut self.warp) {
382			warp.on_block_response(peer_id, request, blocks);
383		} else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) =
384			(key, &mut self.chain_sync)
385		{
386			chain_sync.on_block_response(peer_id, key, request, blocks);
387		} else {
388			error!(
389				target: LOG_TARGET,
390				"`on_block_response()` called with unexpected key {key:?} \
391				 or corresponding strategy is not active.",
392			);
393			debug_assert!(false);
394		}
395	}
396
397	fn on_state_response(
398		&mut self,
399		peer_id: PeerId,
400		key: StrategyKey,
401		response: OpaqueStateResponse,
402	) {
403		if let (StrategyKey::State, Some(ref mut state)) = (key, &mut self.state) {
404			state.on_state_response(peer_id, response);
405		} else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) =
406			(key, &mut self.chain_sync)
407		{
408			chain_sync.on_state_response(peer_id, key, response);
409		} else {
410			error!(
411				target: LOG_TARGET,
412				"`on_state_response()` called with unexpected key {key:?} \
413				 or corresponding strategy is not active.",
414			);
415			debug_assert!(false);
416		}
417	}
418
419	fn on_warp_proof_response(
420		&mut self,
421		peer_id: &PeerId,
422		key: StrategyKey,
423		response: EncodedProof,
424	) {
425		if let (StrategyKey::Warp, Some(ref mut warp)) = (key, &mut self.warp) {
426			warp.on_warp_proof_response(peer_id, response);
427		} else {
428			error!(
429				target: LOG_TARGET,
430				"`on_warp_proof_response()` called with unexpected key {key:?} \
431				 or warp strategy is not active",
432			);
433			debug_assert!(false);
434		}
435	}
436
437	fn on_blocks_processed(
438		&mut self,
439		imported: usize,
440		count: usize,
441		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
442	) {
443		// Only `StateStrategy` and `ChainSync` are interested in block processing notifications.
444		if let Some(ref mut state) = self.state {
445			state.on_blocks_processed(imported, count, results);
446		} else if let Some(ref mut chain_sync) = self.chain_sync {
447			chain_sync.on_blocks_processed(imported, count, results);
448		}
449	}
450
451	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
452		// Only `ChainSync` is interested in block finalization notifications.
453		if let Some(ref mut chain_sync) = self.chain_sync {
454			chain_sync.on_block_finalized(hash, number);
455		}
456	}
457
458	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
459		// This is relevant to `ChainSync` only.
460		if let Some(ref mut chain_sync) = self.chain_sync {
461			chain_sync.update_chain_info(best_hash, best_number);
462		}
463	}
464
465	fn is_major_syncing(&self) -> bool {
466		self.warp.is_some() ||
467			self.state.is_some() ||
468			match self.chain_sync {
469				Some(ref s) => s.status().state.is_major_syncing(),
470				None => unreachable!("At least one syncing strategy is active; qed"),
471			}
472	}
473
474	fn num_peers(&self) -> usize {
475		self.peer_best_blocks.len()
476	}
477
478	fn status(&self) -> SyncStatus<B> {
479		// This function presumes that strategies are executed serially and must be refactored
480		// once we have parallel strategies.
481		if let Some(ref warp) = self.warp {
482			warp.status()
483		} else if let Some(ref state) = self.state {
484			state.status()
485		} else if let Some(ref chain_sync) = self.chain_sync {
486			chain_sync.status()
487		} else {
488			unreachable!("At least one syncing strategy is always active; qed")
489		}
490	}
491
492	fn num_downloaded_blocks(&self) -> usize {
493		self.chain_sync
494			.as_ref()
495			.map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
496	}
497
498	fn num_sync_requests(&self) -> usize {
499		self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
500	}
501
502	fn actions(&mut self) -> Result<Vec<SyncingAction<B>>, ClientError> {
503		// This function presumes that strategies are executed serially and must be refactored once
504		// we have parallel strategies.
505		let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
506			warp.actions().map(Into::into).collect()
507		} else if let Some(ref mut state) = self.state {
508			state.actions().map(Into::into).collect()
509		} else if let Some(ref mut chain_sync) = self.chain_sync {
510			chain_sync.actions()?
511		} else {
512			unreachable!("At least one syncing strategy is always active; qed")
513		};
514
515		if actions.iter().any(SyncingAction::is_finished) {
516			self.proceed_to_next()?;
517		}
518
519		Ok(actions)
520	}
521}
522
523impl<B: BlockT, Client> PolkadotSyncingStrategy<B, Client>
524where
525	B: BlockT,
526	Client: HeaderBackend<B>
527		+ BlockBackend<B>
528		+ HeaderMetadata<B, Error = sp_blockchain::Error>
529		+ ProofProvider<B>
530		+ Send
531		+ Sync
532		+ 'static,
533{
534	/// Initialize a new syncing strategy.
535	pub fn new(
536		mut config: SyncingConfig,
537		client: Arc<Client>,
538		warp_sync_config: Option<WarpSyncConfig<B>>,
539		warp_sync_protocol_name: Option<ProtocolName>,
540	) -> Result<Self, ClientError> {
541		if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
542			info!(
543				target: LOG_TARGET,
544				"clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
545			);
546			config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
547		}
548
549		if let SyncMode::Warp = config.mode {
550			let warp_sync_config = warp_sync_config
551				.expect("Warp sync configuration must be supplied in warp sync mode.");
552			let warp_sync =
553				WarpSync::new(client.clone(), warp_sync_config, warp_sync_protocol_name);
554			Ok(Self {
555				config,
556				client,
557				warp: Some(warp_sync),
558				state: None,
559				chain_sync: None,
560				peer_best_blocks: Default::default(),
561			})
562		} else {
563			let chain_sync = ChainSync::new(
564				chain_sync_mode(config.mode),
565				client.clone(),
566				config.max_parallel_downloads,
567				config.max_blocks_per_request,
568				config.state_request_protocol_name.clone(),
569				config.metrics_registry.as_ref(),
570				std::iter::empty(),
571			)?;
572			Ok(Self {
573				config,
574				client,
575				warp: None,
576				state: None,
577				chain_sync: Some(chain_sync),
578				peer_best_blocks: Default::default(),
579			})
580		}
581	}
582
583	/// Proceed with the next strategy if the active one finished.
584	pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
585		// The strategies are switched as `WarpSync` -> `StateStrategy` -> `ChainSync`.
586		if let Some(ref mut warp) = self.warp {
587			match warp.take_result() {
588				Some(res) => {
589					info!(
590						target: LOG_TARGET,
591						"Warp sync is complete, continuing with state sync."
592					);
593					let state_sync = StateStrategy::new(
594						self.client.clone(),
595						res.target_header,
596						res.target_body,
597						res.target_justifications,
598						false,
599						self.peer_best_blocks
600							.iter()
601							.map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
602						self.config.state_request_protocol_name.clone(),
603					);
604
605					self.warp = None;
606					self.state = Some(state_sync);
607					Ok(())
608				},
609				None => {
610					error!(
611						target: LOG_TARGET,
612						"Warp sync failed. Continuing with full sync."
613					);
614					let chain_sync = match ChainSync::new(
615						chain_sync_mode(self.config.mode),
616						self.client.clone(),
617						self.config.max_parallel_downloads,
618						self.config.max_blocks_per_request,
619						self.config.state_request_protocol_name.clone(),
620						self.config.metrics_registry.as_ref(),
621						self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
622							(*peer_id, *best_hash, *best_number)
623						}),
624					) {
625						Ok(chain_sync) => chain_sync,
626						Err(e) => {
627							error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
628							return Err(e)
629						},
630					};
631
632					self.warp = None;
633					self.chain_sync = Some(chain_sync);
634					Ok(())
635				},
636			}
637		} else if let Some(state) = &self.state {
638			if state.is_succeeded() {
639				info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
640			} else {
641				error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
642			}
643			let chain_sync = match ChainSync::new(
644				chain_sync_mode(self.config.mode),
645				self.client.clone(),
646				self.config.max_parallel_downloads,
647				self.config.max_blocks_per_request,
648				self.config.state_request_protocol_name.clone(),
649				self.config.metrics_registry.as_ref(),
650				self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
651					(*peer_id, *best_hash, *best_number)
652				}),
653			) {
654				Ok(chain_sync) => chain_sync,
655				Err(e) => {
656					error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
657					return Err(e);
658				},
659			};
660
661			self.state = None;
662			self.chain_sync = Some(chain_sync);
663			Ok(())
664		} else {
665			unreachable!("Only warp & state strategies can finish; qed")
666		}
667	}
668}