1use crate::{
23 block_relay_protocol::BlockDownloader,
24 block_request_handler::MAX_BLOCKS_IN_RESPONSE,
25 service::network::NetworkServiceHandle,
26 strategy::{
27 chain_sync::{ChainSync, ChainSyncMode},
28 state::StateStrategy,
29 warp::{WarpSync, WarpSyncConfig},
30 StrategyKey, SyncingAction, SyncingStrategy,
31 },
32 types::SyncStatus,
33 LOG_TARGET,
34};
35use log::{debug, error, info, warn};
36use prometheus_endpoint::Registry;
37use sc_client_api::{BlockBackend, ProofProvider};
38use sc_consensus::{BlockImportError, BlockImportStatus};
39use sc_network::ProtocolName;
40use sc_network_common::sync::{message::BlockAnnounce, SyncMode};
41use sc_network_types::PeerId;
42use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
43use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
44use std::{any::Any, collections::HashMap, sync::Arc};
45
46fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
48 match sync_mode {
49 SyncMode::Full => ChainSyncMode::Full,
50 SyncMode::LightState { skip_proofs, storage_chain_mode } =>
51 ChainSyncMode::LightState { skip_proofs, storage_chain_mode },
52 SyncMode::Warp => ChainSyncMode::Full,
53 }
54}
55
56#[derive(Clone, Debug)]
58pub struct PolkadotSyncingStrategyConfig<Block>
59where
60 Block: BlockT,
61{
62 pub mode: SyncMode,
64 pub max_parallel_downloads: u32,
66 pub max_blocks_per_request: u32,
68 pub min_peers_to_start_warp_sync: Option<usize>,
70 pub metrics_registry: Option<Registry>,
72 pub state_request_protocol_name: ProtocolName,
74 pub block_downloader: Arc<dyn BlockDownloader<Block>>,
76}
77
78pub struct PolkadotSyncingStrategy<B: BlockT, Client> {
80 config: PolkadotSyncingStrategyConfig<B>,
82 client: Arc<Client>,
84 warp: Option<WarpSync<B, Client>>,
86 state: Option<StateStrategy<B>>,
88 chain_sync: Option<ChainSync<B, Client>>,
90 peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
93}
94
95impl<B: BlockT, Client> SyncingStrategy<B> for PolkadotSyncingStrategy<B, Client>
96where
97 B: BlockT,
98 Client: HeaderBackend<B>
99 + BlockBackend<B>
100 + HeaderMetadata<B, Error = sp_blockchain::Error>
101 + ProofProvider<B>
102 + Send
103 + Sync
104 + 'static,
105{
106 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
107 self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
108
109 self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
110 self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
111 self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
112 }
113
114 fn remove_peer(&mut self, peer_id: &PeerId) {
115 self.warp.as_mut().map(|s| s.remove_peer(peer_id));
116 self.state.as_mut().map(|s| s.remove_peer(peer_id));
117 self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
118
119 self.peer_best_blocks.remove(peer_id);
120 }
121
122 fn on_validated_block_announce(
123 &mut self,
124 is_best: bool,
125 peer_id: PeerId,
126 announce: &BlockAnnounce<B::Header>,
127 ) -> Option<(B::Hash, NumberFor<B>)> {
128 let new_best = if let Some(ref mut warp) = self.warp {
129 warp.on_validated_block_announce(is_best, peer_id, announce)
130 } else if let Some(ref mut state) = self.state {
131 state.on_validated_block_announce(is_best, peer_id, announce)
132 } else if let Some(ref mut chain_sync) = self.chain_sync {
133 chain_sync.on_validated_block_announce(is_best, peer_id, announce)
134 } else {
135 error!(target: LOG_TARGET, "No syncing strategy is active.");
136 debug_assert!(false);
137 Some((announce.header.hash(), *announce.header.number()))
138 };
139
140 if let Some(new_best) = new_best {
141 if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
142 *best = new_best;
143 } else {
144 debug!(
145 target: LOG_TARGET,
146 "Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
147 (already disconnected?)",
148 );
149 }
150 }
151
152 new_best
153 }
154
155 fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
156 if let Some(ref mut chain_sync) = self.chain_sync {
158 chain_sync.set_sync_fork_request(peers.clone(), hash, number);
159 }
160 }
161
162 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
163 if let Some(ref mut chain_sync) = self.chain_sync {
165 chain_sync.request_justification(hash, number);
166 }
167 }
168
169 fn clear_justification_requests(&mut self) {
170 if let Some(ref mut chain_sync) = self.chain_sync {
172 chain_sync.clear_justification_requests();
173 }
174 }
175
176 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
177 if let Some(ref mut chain_sync) = self.chain_sync {
179 chain_sync.on_justification_import(hash, number, success);
180 }
181 }
182
183 fn on_generic_response(
184 &mut self,
185 peer_id: &PeerId,
186 key: StrategyKey,
187 protocol_name: ProtocolName,
188 response: Box<dyn Any + Send>,
189 ) {
190 match key {
191 StateStrategy::<B>::STRATEGY_KEY =>
192 if let Some(state) = &mut self.state {
193 let Ok(response) = response.downcast::<Vec<u8>>() else {
194 warn!(target: LOG_TARGET, "Failed to downcast state response");
195 debug_assert!(false);
196 return;
197 };
198
199 state.on_state_response(peer_id, *response);
200 } else if let Some(chain_sync) = &mut self.chain_sync {
201 chain_sync.on_generic_response(peer_id, key, protocol_name, response);
202 } else {
203 error!(
204 target: LOG_TARGET,
205 "`on_generic_response()` called with unexpected key {key:?} \
206 or corresponding strategy is not active.",
207 );
208 debug_assert!(false);
209 },
210 WarpSync::<B, Client>::STRATEGY_KEY =>
211 if let Some(warp) = &mut self.warp {
212 warp.on_generic_response(peer_id, protocol_name, response);
213 } else {
214 error!(
215 target: LOG_TARGET,
216 "`on_generic_response()` called with unexpected key {key:?} \
217 or warp strategy is not active",
218 );
219 debug_assert!(false);
220 },
221 ChainSync::<B, Client>::STRATEGY_KEY =>
222 if let Some(chain_sync) = &mut self.chain_sync {
223 chain_sync.on_generic_response(peer_id, key, protocol_name, response);
224 } else {
225 error!(
226 target: LOG_TARGET,
227 "`on_generic_response()` called with unexpected key {key:?} \
228 or corresponding strategy is not active.",
229 );
230 debug_assert!(false);
231 },
232 key => {
233 warn!(
234 target: LOG_TARGET,
235 "Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
236 );
237 debug_assert!(false);
238 },
239 }
240 }
241
242 fn on_blocks_processed(
243 &mut self,
244 imported: usize,
245 count: usize,
246 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
247 ) {
248 if let Some(ref mut state) = self.state {
250 state.on_blocks_processed(imported, count, results);
251 } else if let Some(ref mut chain_sync) = self.chain_sync {
252 chain_sync.on_blocks_processed(imported, count, results);
253 }
254 }
255
256 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
257 if let Some(ref mut chain_sync) = self.chain_sync {
259 chain_sync.on_block_finalized(hash, number);
260 }
261 }
262
263 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
264 if let Some(ref mut chain_sync) = self.chain_sync {
266 chain_sync.update_chain_info(best_hash, best_number);
267 }
268 }
269
270 fn is_major_syncing(&self) -> bool {
271 self.warp.is_some() ||
272 self.state.is_some() ||
273 match self.chain_sync {
274 Some(ref s) => s.status().state.is_major_syncing(),
275 None => unreachable!("At least one syncing strategy is active; qed"),
276 }
277 }
278
279 fn num_peers(&self) -> usize {
280 self.peer_best_blocks.len()
281 }
282
283 fn status(&self) -> SyncStatus<B> {
284 if let Some(ref warp) = self.warp {
287 warp.status()
288 } else if let Some(ref state) = self.state {
289 state.status()
290 } else if let Some(ref chain_sync) = self.chain_sync {
291 chain_sync.status()
292 } else {
293 unreachable!("At least one syncing strategy is always active; qed")
294 }
295 }
296
297 fn num_downloaded_blocks(&self) -> usize {
298 self.chain_sync
299 .as_ref()
300 .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
301 }
302
303 fn num_sync_requests(&self) -> usize {
304 self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
305 }
306
307 fn actions(
308 &mut self,
309 network_service: &NetworkServiceHandle,
310 ) -> Result<Vec<SyncingAction<B>>, ClientError> {
311 let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
314 warp.actions(network_service).map(Into::into).collect()
315 } else if let Some(ref mut state) = self.state {
316 state.actions(network_service).map(Into::into).collect()
317 } else if let Some(ref mut chain_sync) = self.chain_sync {
318 chain_sync.actions(network_service)?
319 } else {
320 unreachable!("At least one syncing strategy is always active; qed")
321 };
322
323 if actions.iter().any(SyncingAction::is_finished) {
324 self.proceed_to_next()?;
325 }
326
327 Ok(actions)
328 }
329}
330
331impl<B: BlockT, Client> PolkadotSyncingStrategy<B, Client>
332where
333 B: BlockT,
334 Client: HeaderBackend<B>
335 + BlockBackend<B>
336 + HeaderMetadata<B, Error = sp_blockchain::Error>
337 + ProofProvider<B>
338 + Send
339 + Sync
340 + 'static,
341{
342 pub fn new(
344 mut config: PolkadotSyncingStrategyConfig<B>,
345 client: Arc<Client>,
346 warp_sync_config: Option<WarpSyncConfig<B>>,
347 warp_sync_protocol_name: Option<ProtocolName>,
348 ) -> Result<Self, ClientError> {
349 if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
350 info!(
351 target: LOG_TARGET,
352 "clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
353 );
354 config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
355 }
356
357 if let SyncMode::Warp = config.mode {
358 let warp_sync_config = warp_sync_config
359 .expect("Warp sync configuration must be supplied in warp sync mode.");
360 let warp_sync = WarpSync::new(
361 client.clone(),
362 warp_sync_config,
363 warp_sync_protocol_name,
364 config.block_downloader.clone(),
365 config.min_peers_to_start_warp_sync,
366 );
367 Ok(Self {
368 config,
369 client,
370 warp: Some(warp_sync),
371 state: None,
372 chain_sync: None,
373 peer_best_blocks: Default::default(),
374 })
375 } else {
376 let chain_sync = ChainSync::new(
377 chain_sync_mode(config.mode),
378 client.clone(),
379 config.max_parallel_downloads,
380 config.max_blocks_per_request,
381 config.state_request_protocol_name.clone(),
382 config.block_downloader.clone(),
383 config.metrics_registry.as_ref(),
384 std::iter::empty(),
385 )?;
386 Ok(Self {
387 config,
388 client,
389 warp: None,
390 state: None,
391 chain_sync: Some(chain_sync),
392 peer_best_blocks: Default::default(),
393 })
394 }
395 }
396
397 pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
399 if let Some(ref mut warp) = self.warp {
401 match warp.take_result() {
402 Some(res) => {
403 info!(
404 target: LOG_TARGET,
405 "Warp sync is complete, continuing with state sync."
406 );
407 let state_sync = StateStrategy::new(
408 self.client.clone(),
409 res.target_header,
410 res.target_body,
411 res.target_justifications,
412 false,
413 self.peer_best_blocks
414 .iter()
415 .map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
416 self.config.state_request_protocol_name.clone(),
417 );
418
419 self.warp = None;
420 self.state = Some(state_sync);
421 Ok(())
422 },
423 None => {
424 error!(
425 target: LOG_TARGET,
426 "Warp sync failed. Continuing with full sync."
427 );
428 let chain_sync = match ChainSync::new(
429 chain_sync_mode(self.config.mode),
430 self.client.clone(),
431 self.config.max_parallel_downloads,
432 self.config.max_blocks_per_request,
433 self.config.state_request_protocol_name.clone(),
434 self.config.block_downloader.clone(),
435 self.config.metrics_registry.as_ref(),
436 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
437 (*peer_id, *best_hash, *best_number)
438 }),
439 ) {
440 Ok(chain_sync) => chain_sync,
441 Err(e) => {
442 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
443 return Err(e)
444 },
445 };
446
447 self.warp = None;
448 self.chain_sync = Some(chain_sync);
449 Ok(())
450 },
451 }
452 } else if let Some(state) = &self.state {
453 if state.is_succeeded() {
454 info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
455 } else {
456 error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
457 }
458 let chain_sync = match ChainSync::new(
459 chain_sync_mode(self.config.mode),
460 self.client.clone(),
461 self.config.max_parallel_downloads,
462 self.config.max_blocks_per_request,
463 self.config.state_request_protocol_name.clone(),
464 self.config.block_downloader.clone(),
465 self.config.metrics_registry.as_ref(),
466 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
467 (*peer_id, *best_hash, *best_number)
468 }),
469 ) {
470 Ok(chain_sync) => chain_sync,
471 Err(e) => {
472 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
473 return Err(e);
474 },
475 };
476
477 self.state = None;
478 self.chain_sync = Some(chain_sync);
479 Ok(())
480 } else {
481 unreachable!("Only warp & state strategies can finish; qed")
482 }
483 }
484}