referrerpolicy=no-referrer-when-downgrade

sc_network_sync/strategy/
polkadot.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`PolkadotSyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`]
20//! and specific syncing algorithms.
21
22use crate::{
23	block_relay_protocol::BlockDownloader,
24	block_request_handler::MAX_BLOCKS_IN_RESPONSE,
25	service::network::NetworkServiceHandle,
26	strategy::{
27		chain_sync::{ChainSync, ChainSyncMode},
28		state::StateStrategy,
29		warp::{WarpSync, WarpSyncConfig},
30		StrategyKey, SyncingAction, SyncingStrategy,
31	},
32	types::SyncStatus,
33	LOG_TARGET,
34};
35use log::{debug, error, info, warn};
36use prometheus_endpoint::Registry;
37use sc_client_api::{BlockBackend, ProofProvider};
38use sc_consensus::{BlockImportError, BlockImportStatus};
39use sc_network::ProtocolName;
40use sc_network_common::sync::{message::BlockAnnounce, SyncMode};
41use sc_network_types::PeerId;
42use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
43use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
44use std::{any::Any, collections::HashMap, sync::Arc};
45
46/// Corresponding `ChainSync` mode.
47fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
48	match sync_mode {
49		SyncMode::Full => ChainSyncMode::Full,
50		SyncMode::LightState { skip_proofs, storage_chain_mode } =>
51			ChainSyncMode::LightState { skip_proofs, storage_chain_mode },
52		SyncMode::Warp => ChainSyncMode::Full,
53	}
54}
55
56/// Syncing configuration containing data for [`PolkadotSyncingStrategy`].
57#[derive(Clone, Debug)]
58pub struct PolkadotSyncingStrategyConfig<Block>
59where
60	Block: BlockT,
61{
62	/// Syncing mode.
63	pub mode: SyncMode,
64	/// The number of parallel downloads to guard against slow peers.
65	pub max_parallel_downloads: u32,
66	/// Maximum number of blocks to request.
67	pub max_blocks_per_request: u32,
68	/// Number of peers that need to be connected before warp sync is started.
69	pub min_peers_to_start_warp_sync: Option<usize>,
70	/// Prometheus metrics registry.
71	pub metrics_registry: Option<Registry>,
72	/// Protocol name used to send out state requests
73	pub state_request_protocol_name: ProtocolName,
74	/// Block downloader
75	pub block_downloader: Arc<dyn BlockDownloader<Block>>,
76}
77
78/// Proxy to specific syncing strategies used in Polkadot.
79pub struct PolkadotSyncingStrategy<B: BlockT, Client> {
80	/// Initial syncing configuration.
81	config: PolkadotSyncingStrategyConfig<B>,
82	/// Client used by syncing strategies.
83	client: Arc<Client>,
84	/// Warp strategy.
85	warp: Option<WarpSync<B, Client>>,
86	/// State strategy.
87	state: Option<StateStrategy<B>>,
88	/// `ChainSync` strategy.`
89	chain_sync: Option<ChainSync<B, Client>>,
90	/// Connected peers and their best blocks used to seed a new strategy when switching to it in
91	/// `PolkadotSyncingStrategy::proceed_to_next`.
92	peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
93}
94
95impl<B: BlockT, Client> SyncingStrategy<B> for PolkadotSyncingStrategy<B, Client>
96where
97	B: BlockT,
98	Client: HeaderBackend<B>
99		+ BlockBackend<B>
100		+ HeaderMetadata<B, Error = sp_blockchain::Error>
101		+ ProofProvider<B>
102		+ Send
103		+ Sync
104		+ 'static,
105{
106	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
107		self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
108
109		self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
110		self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
111		self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
112	}
113
114	fn remove_peer(&mut self, peer_id: &PeerId) {
115		self.warp.as_mut().map(|s| s.remove_peer(peer_id));
116		self.state.as_mut().map(|s| s.remove_peer(peer_id));
117		self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
118
119		self.peer_best_blocks.remove(peer_id);
120	}
121
122	fn on_validated_block_announce(
123		&mut self,
124		is_best: bool,
125		peer_id: PeerId,
126		announce: &BlockAnnounce<B::Header>,
127	) -> Option<(B::Hash, NumberFor<B>)> {
128		let new_best = if let Some(ref mut warp) = self.warp {
129			warp.on_validated_block_announce(is_best, peer_id, announce)
130		} else if let Some(ref mut state) = self.state {
131			state.on_validated_block_announce(is_best, peer_id, announce)
132		} else if let Some(ref mut chain_sync) = self.chain_sync {
133			chain_sync.on_validated_block_announce(is_best, peer_id, announce)
134		} else {
135			error!(target: LOG_TARGET, "No syncing strategy is active.");
136			debug_assert!(false);
137			Some((announce.header.hash(), *announce.header.number()))
138		};
139
140		if let Some(new_best) = new_best {
141			if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
142				*best = new_best;
143			} else {
144				debug!(
145					target: LOG_TARGET,
146					"Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
147					 (already disconnected?)",
148				);
149			}
150		}
151
152		new_best
153	}
154
155	fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
156		// Fork requests are only handled by `ChainSync`.
157		if let Some(ref mut chain_sync) = self.chain_sync {
158			chain_sync.set_sync_fork_request(peers.clone(), hash, number);
159		}
160	}
161
162	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
163		// Justifications can only be requested via `ChainSync`.
164		if let Some(ref mut chain_sync) = self.chain_sync {
165			chain_sync.request_justification(hash, number);
166		}
167	}
168
169	fn clear_justification_requests(&mut self) {
170		// Justification requests can only be cleared by `ChainSync`.
171		if let Some(ref mut chain_sync) = self.chain_sync {
172			chain_sync.clear_justification_requests();
173		}
174	}
175
176	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
177		// Only `ChainSync` is interested in justification import.
178		if let Some(ref mut chain_sync) = self.chain_sync {
179			chain_sync.on_justification_import(hash, number, success);
180		}
181	}
182
183	fn on_generic_response(
184		&mut self,
185		peer_id: &PeerId,
186		key: StrategyKey,
187		protocol_name: ProtocolName,
188		response: Box<dyn Any + Send>,
189	) {
190		match key {
191			StateStrategy::<B>::STRATEGY_KEY =>
192				if let Some(state) = &mut self.state {
193					let Ok(response) = response.downcast::<Vec<u8>>() else {
194						warn!(target: LOG_TARGET, "Failed to downcast state response");
195						debug_assert!(false);
196						return;
197					};
198
199					state.on_state_response(peer_id, *response);
200				} else if let Some(chain_sync) = &mut self.chain_sync {
201					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
202				} else {
203					error!(
204						target: LOG_TARGET,
205						"`on_generic_response()` called with unexpected key {key:?} \
206						 or corresponding strategy is not active.",
207					);
208					debug_assert!(false);
209				},
210			WarpSync::<B, Client>::STRATEGY_KEY =>
211				if let Some(warp) = &mut self.warp {
212					warp.on_generic_response(peer_id, protocol_name, response);
213				} else {
214					error!(
215						target: LOG_TARGET,
216						"`on_generic_response()` called with unexpected key {key:?} \
217						 or warp strategy is not active",
218					);
219					debug_assert!(false);
220				},
221			ChainSync::<B, Client>::STRATEGY_KEY =>
222				if let Some(chain_sync) = &mut self.chain_sync {
223					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
224				} else {
225					error!(
226						target: LOG_TARGET,
227						"`on_generic_response()` called with unexpected key {key:?} \
228						 or corresponding strategy is not active.",
229					);
230					debug_assert!(false);
231				},
232			key => {
233				warn!(
234					target: LOG_TARGET,
235					"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
236				);
237				debug_assert!(false);
238			},
239		}
240	}
241
242	fn on_blocks_processed(
243		&mut self,
244		imported: usize,
245		count: usize,
246		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
247	) {
248		// Only `StateStrategy` and `ChainSync` are interested in block processing notifications.
249		if let Some(ref mut state) = self.state {
250			state.on_blocks_processed(imported, count, results);
251		} else if let Some(ref mut chain_sync) = self.chain_sync {
252			chain_sync.on_blocks_processed(imported, count, results);
253		}
254	}
255
256	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
257		// Only `ChainSync` is interested in block finalization notifications.
258		if let Some(ref mut chain_sync) = self.chain_sync {
259			chain_sync.on_block_finalized(hash, number);
260		}
261	}
262
263	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
264		// This is relevant to `ChainSync` only.
265		if let Some(ref mut chain_sync) = self.chain_sync {
266			chain_sync.update_chain_info(best_hash, best_number);
267		}
268	}
269
270	fn is_major_syncing(&self) -> bool {
271		self.warp.is_some() ||
272			self.state.is_some() ||
273			match self.chain_sync {
274				Some(ref s) => s.status().state.is_major_syncing(),
275				None => unreachable!("At least one syncing strategy is active; qed"),
276			}
277	}
278
279	fn num_peers(&self) -> usize {
280		self.peer_best_blocks.len()
281	}
282
283	fn status(&self) -> SyncStatus<B> {
284		// This function presumes that strategies are executed serially and must be refactored
285		// once we have parallel strategies.
286		if let Some(ref warp) = self.warp {
287			warp.status()
288		} else if let Some(ref state) = self.state {
289			state.status()
290		} else if let Some(ref chain_sync) = self.chain_sync {
291			chain_sync.status()
292		} else {
293			unreachable!("At least one syncing strategy is always active; qed")
294		}
295	}
296
297	fn num_downloaded_blocks(&self) -> usize {
298		self.chain_sync
299			.as_ref()
300			.map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
301	}
302
303	fn num_sync_requests(&self) -> usize {
304		self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
305	}
306
307	fn actions(
308		&mut self,
309		network_service: &NetworkServiceHandle,
310	) -> Result<Vec<SyncingAction<B>>, ClientError> {
311		// This function presumes that strategies are executed serially and must be refactored once
312		// we have parallel strategies.
313		let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
314			warp.actions(network_service).map(Into::into).collect()
315		} else if let Some(ref mut state) = self.state {
316			state.actions(network_service).map(Into::into).collect()
317		} else if let Some(ref mut chain_sync) = self.chain_sync {
318			chain_sync.actions(network_service)?
319		} else {
320			unreachable!("At least one syncing strategy is always active; qed")
321		};
322
323		if actions.iter().any(SyncingAction::is_finished) {
324			self.proceed_to_next()?;
325		}
326
327		Ok(actions)
328	}
329}
330
331impl<B: BlockT, Client> PolkadotSyncingStrategy<B, Client>
332where
333	B: BlockT,
334	Client: HeaderBackend<B>
335		+ BlockBackend<B>
336		+ HeaderMetadata<B, Error = sp_blockchain::Error>
337		+ ProofProvider<B>
338		+ Send
339		+ Sync
340		+ 'static,
341{
342	/// Initialize a new syncing strategy.
343	pub fn new(
344		mut config: PolkadotSyncingStrategyConfig<B>,
345		client: Arc<Client>,
346		warp_sync_config: Option<WarpSyncConfig<B>>,
347		warp_sync_protocol_name: Option<ProtocolName>,
348	) -> Result<Self, ClientError> {
349		if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
350			info!(
351				target: LOG_TARGET,
352				"clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
353			);
354			config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
355		}
356
357		if let SyncMode::Warp = config.mode {
358			let warp_sync_config = warp_sync_config
359				.expect("Warp sync configuration must be supplied in warp sync mode.");
360			let warp_sync = WarpSync::new(
361				client.clone(),
362				warp_sync_config,
363				warp_sync_protocol_name,
364				config.block_downloader.clone(),
365				config.min_peers_to_start_warp_sync,
366			);
367			Ok(Self {
368				config,
369				client,
370				warp: Some(warp_sync),
371				state: None,
372				chain_sync: None,
373				peer_best_blocks: Default::default(),
374			})
375		} else {
376			let chain_sync = ChainSync::new(
377				chain_sync_mode(config.mode),
378				client.clone(),
379				config.max_parallel_downloads,
380				config.max_blocks_per_request,
381				config.state_request_protocol_name.clone(),
382				config.block_downloader.clone(),
383				config.metrics_registry.as_ref(),
384				std::iter::empty(),
385			)?;
386			Ok(Self {
387				config,
388				client,
389				warp: None,
390				state: None,
391				chain_sync: Some(chain_sync),
392				peer_best_blocks: Default::default(),
393			})
394		}
395	}
396
397	/// Proceed with the next strategy if the active one finished.
398	pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
399		// The strategies are switched as `WarpSync` -> `StateStrategy` -> `ChainSync`.
400		if let Some(ref mut warp) = self.warp {
401			match warp.take_result() {
402				Some(res) => {
403					info!(
404						target: LOG_TARGET,
405						"Warp sync is complete, continuing with state sync."
406					);
407					let state_sync = StateStrategy::new(
408						self.client.clone(),
409						res.target_header,
410						res.target_body,
411						res.target_justifications,
412						false,
413						self.peer_best_blocks
414							.iter()
415							.map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
416						self.config.state_request_protocol_name.clone(),
417					);
418
419					self.warp = None;
420					self.state = Some(state_sync);
421					Ok(())
422				},
423				None => {
424					error!(
425						target: LOG_TARGET,
426						"Warp sync failed. Continuing with full sync."
427					);
428					let chain_sync = match ChainSync::new(
429						chain_sync_mode(self.config.mode),
430						self.client.clone(),
431						self.config.max_parallel_downloads,
432						self.config.max_blocks_per_request,
433						self.config.state_request_protocol_name.clone(),
434						self.config.block_downloader.clone(),
435						self.config.metrics_registry.as_ref(),
436						self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
437							(*peer_id, *best_hash, *best_number)
438						}),
439					) {
440						Ok(chain_sync) => chain_sync,
441						Err(e) => {
442							error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
443							return Err(e)
444						},
445					};
446
447					self.warp = None;
448					self.chain_sync = Some(chain_sync);
449					Ok(())
450				},
451			}
452		} else if let Some(state) = &self.state {
453			if state.is_succeeded() {
454				info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
455			} else {
456				error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
457			}
458			let chain_sync = match ChainSync::new(
459				chain_sync_mode(self.config.mode),
460				self.client.clone(),
461				self.config.max_parallel_downloads,
462				self.config.max_blocks_per_request,
463				self.config.state_request_protocol_name.clone(),
464				self.config.block_downloader.clone(),
465				self.config.metrics_registry.as_ref(),
466				self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
467					(*peer_id, *best_hash, *best_number)
468				}),
469			) {
470				Ok(chain_sync) => chain_sync,
471				Err(e) => {
472					error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
473					return Err(e);
474				},
475			};
476
477			self.state = None;
478			self.chain_sync = Some(chain_sync);
479			Ok(())
480		} else {
481			unreachable!("Only warp & state strategies can finish; qed")
482		}
483	}
484}