1pub mod chain_sync;
23mod disconnected_peers;
24mod state;
25pub mod state_sync;
26pub mod warp;
27
28use crate::{
29 block_request_handler::MAX_BLOCKS_IN_RESPONSE,
30 types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncStatus},
31 LOG_TARGET,
32};
33use chain_sync::{ChainSync, ChainSyncMode};
34use log::{debug, error, info};
35use prometheus_endpoint::Registry;
36use sc_client_api::{BlockBackend, ProofProvider};
37use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
38use sc_network::ProtocolName;
39use sc_network_common::sync::{
40 message::{BlockAnnounce, BlockData, BlockRequest},
41 SyncMode,
42};
43use sc_network_types::PeerId;
44use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
45use sp_consensus::BlockOrigin;
46use sp_runtime::{
47 traits::{Block as BlockT, Header, NumberFor},
48 Justifications,
49};
50use state::{StateStrategy, StateStrategyAction};
51use std::{collections::HashMap, sync::Arc};
52use warp::{EncodedProof, WarpProofRequest, WarpSync, WarpSyncAction, WarpSyncConfig};
53
54fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
56 match sync_mode {
57 SyncMode::Full => ChainSyncMode::Full,
58 SyncMode::LightState { skip_proofs, storage_chain_mode } =>
59 ChainSyncMode::LightState { skip_proofs, storage_chain_mode },
60 SyncMode::Warp => ChainSyncMode::Full,
61 }
62}
63
64pub trait SyncingStrategy<B: BlockT>: Send
66where
67 B: BlockT,
68{
69 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>);
71
72 fn remove_peer(&mut self, peer_id: &PeerId);
74
75 #[must_use]
79 fn on_validated_block_announce(
80 &mut self,
81 is_best: bool,
82 peer_id: PeerId,
83 announce: &BlockAnnounce<B::Header>,
84 ) -> Option<(B::Hash, NumberFor<B>)>;
85
86 fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>);
94
95 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>);
97
98 fn clear_justification_requests(&mut self);
100
101 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool);
103
104 fn on_block_response(
106 &mut self,
107 peer_id: PeerId,
108 key: StrategyKey,
109 request: BlockRequest<B>,
110 blocks: Vec<BlockData<B>>,
111 );
112
113 fn on_state_response(
115 &mut self,
116 peer_id: PeerId,
117 key: StrategyKey,
118 response: OpaqueStateResponse,
119 );
120
121 fn on_warp_proof_response(
123 &mut self,
124 peer_id: &PeerId,
125 key: StrategyKey,
126 response: EncodedProof,
127 );
128
129 fn on_blocks_processed(
134 &mut self,
135 imported: usize,
136 count: usize,
137 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
138 );
139
140 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>);
142
143 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>);
145
146 fn is_major_syncing(&self) -> bool;
148
149 fn num_peers(&self) -> usize;
151
152 fn status(&self) -> SyncStatus<B>;
154
155 fn num_downloaded_blocks(&self) -> usize;
157
158 fn num_sync_requests(&self) -> usize;
160
161 #[must_use]
163 fn actions(&mut self) -> Result<Vec<SyncingAction<B>>, ClientError>;
164}
165
166#[derive(Clone, Debug)]
168pub struct SyncingConfig {
169 pub mode: SyncMode,
171 pub max_parallel_downloads: u32,
173 pub max_blocks_per_request: u32,
175 pub metrics_registry: Option<Registry>,
177 pub state_request_protocol_name: ProtocolName,
179}
180
/// Tag identifying which strategy issued a request, so that the matching response can be
/// routed back to it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StrategyKey {
    /// The warp sync strategy.
    Warp,
    /// The state sync strategy.
    State,
    /// The `ChainSync` strategy.
    ChainSync,
}
191
192#[derive(Debug)]
193pub enum SyncingAction<B: BlockT> {
194 SendBlockRequest { peer_id: PeerId, key: StrategyKey, request: BlockRequest<B> },
196 SendStateRequest {
198 peer_id: PeerId,
199 key: StrategyKey,
200 protocol_name: ProtocolName,
201 request: OpaqueStateRequest,
202 },
203 SendWarpProofRequest {
205 peer_id: PeerId,
206 key: StrategyKey,
207 protocol_name: ProtocolName,
208 request: WarpProofRequest<B>,
209 },
210 CancelRequest { peer_id: PeerId, key: StrategyKey },
212 DropPeer(BadPeer),
214 ImportBlocks { origin: BlockOrigin, blocks: Vec<IncomingBlock<B>> },
216 ImportJustifications {
218 peer_id: PeerId,
219 hash: B::Hash,
220 number: NumberFor<B>,
221 justifications: Justifications,
222 },
223 Finished,
225}
226
227impl<B: BlockT> SyncingAction<B> {
228 fn is_finished(&self) -> bool {
229 matches!(self, SyncingAction::Finished)
230 }
231}
232
233impl<B: BlockT> From<WarpSyncAction<B>> for SyncingAction<B> {
234 fn from(action: WarpSyncAction<B>) -> Self {
235 match action {
236 WarpSyncAction::SendWarpProofRequest { peer_id, protocol_name, request } =>
237 SyncingAction::SendWarpProofRequest {
238 peer_id,
239 key: StrategyKey::Warp,
240 protocol_name,
241 request,
242 },
243 WarpSyncAction::SendBlockRequest { peer_id, request } =>
244 SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::Warp, request },
245 WarpSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer),
246 WarpSyncAction::Finished => SyncingAction::Finished,
247 }
248 }
249}
250
251impl<B: BlockT> From<StateStrategyAction<B>> for SyncingAction<B> {
252 fn from(action: StateStrategyAction<B>) -> Self {
253 match action {
254 StateStrategyAction::SendStateRequest { peer_id, protocol_name, request } =>
255 SyncingAction::SendStateRequest {
256 peer_id,
257 key: StrategyKey::State,
258 protocol_name,
259 request,
260 },
261 StateStrategyAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer),
262 StateStrategyAction::ImportBlocks { origin, blocks } =>
263 SyncingAction::ImportBlocks { origin, blocks },
264 StateStrategyAction::Finished => SyncingAction::Finished,
265 }
266 }
267}
268
269pub struct PolkadotSyncingStrategy<B: BlockT, Client> {
271 config: SyncingConfig,
273 client: Arc<Client>,
275 warp: Option<WarpSync<B, Client>>,
277 state: Option<StateStrategy<B>>,
279 chain_sync: Option<ChainSync<B, Client>>,
281 peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
284}
285
286impl<B: BlockT, Client> SyncingStrategy<B> for PolkadotSyncingStrategy<B, Client>
287where
288 B: BlockT,
289 Client: HeaderBackend<B>
290 + BlockBackend<B>
291 + HeaderMetadata<B, Error = sp_blockchain::Error>
292 + ProofProvider<B>
293 + Send
294 + Sync
295 + 'static,
296{
297 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
298 self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
299
300 self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
301 self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
302 self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
303 }
304
305 fn remove_peer(&mut self, peer_id: &PeerId) {
306 self.warp.as_mut().map(|s| s.remove_peer(peer_id));
307 self.state.as_mut().map(|s| s.remove_peer(peer_id));
308 self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
309
310 self.peer_best_blocks.remove(peer_id);
311 }
312
313 fn on_validated_block_announce(
314 &mut self,
315 is_best: bool,
316 peer_id: PeerId,
317 announce: &BlockAnnounce<B::Header>,
318 ) -> Option<(B::Hash, NumberFor<B>)> {
319 let new_best = if let Some(ref mut warp) = self.warp {
320 warp.on_validated_block_announce(is_best, peer_id, announce)
321 } else if let Some(ref mut state) = self.state {
322 state.on_validated_block_announce(is_best, peer_id, announce)
323 } else if let Some(ref mut chain_sync) = self.chain_sync {
324 chain_sync.on_validated_block_announce(is_best, peer_id, announce)
325 } else {
326 error!(target: LOG_TARGET, "No syncing strategy is active.");
327 debug_assert!(false);
328 Some((announce.header.hash(), *announce.header.number()))
329 };
330
331 if let Some(new_best) = new_best {
332 if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
333 *best = new_best;
334 } else {
335 debug!(
336 target: LOG_TARGET,
337 "Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
338 (already disconnected?)",
339 );
340 }
341 }
342
343 new_best
344 }
345
346 fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
347 if let Some(ref mut chain_sync) = self.chain_sync {
349 chain_sync.set_sync_fork_request(peers.clone(), hash, number);
350 }
351 }
352
353 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
354 if let Some(ref mut chain_sync) = self.chain_sync {
356 chain_sync.request_justification(hash, number);
357 }
358 }
359
360 fn clear_justification_requests(&mut self) {
361 if let Some(ref mut chain_sync) = self.chain_sync {
363 chain_sync.clear_justification_requests();
364 }
365 }
366
367 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
368 if let Some(ref mut chain_sync) = self.chain_sync {
370 chain_sync.on_justification_import(hash, number, success);
371 }
372 }
373
374 fn on_block_response(
375 &mut self,
376 peer_id: PeerId,
377 key: StrategyKey,
378 request: BlockRequest<B>,
379 blocks: Vec<BlockData<B>>,
380 ) {
381 if let (StrategyKey::Warp, Some(ref mut warp)) = (key, &mut self.warp) {
382 warp.on_block_response(peer_id, request, blocks);
383 } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) =
384 (key, &mut self.chain_sync)
385 {
386 chain_sync.on_block_response(peer_id, key, request, blocks);
387 } else {
388 error!(
389 target: LOG_TARGET,
390 "`on_block_response()` called with unexpected key {key:?} \
391 or corresponding strategy is not active.",
392 );
393 debug_assert!(false);
394 }
395 }
396
397 fn on_state_response(
398 &mut self,
399 peer_id: PeerId,
400 key: StrategyKey,
401 response: OpaqueStateResponse,
402 ) {
403 if let (StrategyKey::State, Some(ref mut state)) = (key, &mut self.state) {
404 state.on_state_response(peer_id, response);
405 } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) =
406 (key, &mut self.chain_sync)
407 {
408 chain_sync.on_state_response(peer_id, key, response);
409 } else {
410 error!(
411 target: LOG_TARGET,
412 "`on_state_response()` called with unexpected key {key:?} \
413 or corresponding strategy is not active.",
414 );
415 debug_assert!(false);
416 }
417 }
418
419 fn on_warp_proof_response(
420 &mut self,
421 peer_id: &PeerId,
422 key: StrategyKey,
423 response: EncodedProof,
424 ) {
425 if let (StrategyKey::Warp, Some(ref mut warp)) = (key, &mut self.warp) {
426 warp.on_warp_proof_response(peer_id, response);
427 } else {
428 error!(
429 target: LOG_TARGET,
430 "`on_warp_proof_response()` called with unexpected key {key:?} \
431 or warp strategy is not active",
432 );
433 debug_assert!(false);
434 }
435 }
436
437 fn on_blocks_processed(
438 &mut self,
439 imported: usize,
440 count: usize,
441 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
442 ) {
443 if let Some(ref mut state) = self.state {
445 state.on_blocks_processed(imported, count, results);
446 } else if let Some(ref mut chain_sync) = self.chain_sync {
447 chain_sync.on_blocks_processed(imported, count, results);
448 }
449 }
450
451 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
452 if let Some(ref mut chain_sync) = self.chain_sync {
454 chain_sync.on_block_finalized(hash, number);
455 }
456 }
457
458 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
459 if let Some(ref mut chain_sync) = self.chain_sync {
461 chain_sync.update_chain_info(best_hash, best_number);
462 }
463 }
464
465 fn is_major_syncing(&self) -> bool {
466 self.warp.is_some() ||
467 self.state.is_some() ||
468 match self.chain_sync {
469 Some(ref s) => s.status().state.is_major_syncing(),
470 None => unreachable!("At least one syncing strategy is active; qed"),
471 }
472 }
473
474 fn num_peers(&self) -> usize {
475 self.peer_best_blocks.len()
476 }
477
478 fn status(&self) -> SyncStatus<B> {
479 if let Some(ref warp) = self.warp {
482 warp.status()
483 } else if let Some(ref state) = self.state {
484 state.status()
485 } else if let Some(ref chain_sync) = self.chain_sync {
486 chain_sync.status()
487 } else {
488 unreachable!("At least one syncing strategy is always active; qed")
489 }
490 }
491
492 fn num_downloaded_blocks(&self) -> usize {
493 self.chain_sync
494 .as_ref()
495 .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
496 }
497
498 fn num_sync_requests(&self) -> usize {
499 self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
500 }
501
502 fn actions(&mut self) -> Result<Vec<SyncingAction<B>>, ClientError> {
503 let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
506 warp.actions().map(Into::into).collect()
507 } else if let Some(ref mut state) = self.state {
508 state.actions().map(Into::into).collect()
509 } else if let Some(ref mut chain_sync) = self.chain_sync {
510 chain_sync.actions()?
511 } else {
512 unreachable!("At least one syncing strategy is always active; qed")
513 };
514
515 if actions.iter().any(SyncingAction::is_finished) {
516 self.proceed_to_next()?;
517 }
518
519 Ok(actions)
520 }
521}
522
523impl<B: BlockT, Client> PolkadotSyncingStrategy<B, Client>
524where
525 B: BlockT,
526 Client: HeaderBackend<B>
527 + BlockBackend<B>
528 + HeaderMetadata<B, Error = sp_blockchain::Error>
529 + ProofProvider<B>
530 + Send
531 + Sync
532 + 'static,
533{
534 pub fn new(
536 mut config: SyncingConfig,
537 client: Arc<Client>,
538 warp_sync_config: Option<WarpSyncConfig<B>>,
539 warp_sync_protocol_name: Option<ProtocolName>,
540 ) -> Result<Self, ClientError> {
541 if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
542 info!(
543 target: LOG_TARGET,
544 "clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
545 );
546 config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
547 }
548
549 if let SyncMode::Warp = config.mode {
550 let warp_sync_config = warp_sync_config
551 .expect("Warp sync configuration must be supplied in warp sync mode.");
552 let warp_sync =
553 WarpSync::new(client.clone(), warp_sync_config, warp_sync_protocol_name);
554 Ok(Self {
555 config,
556 client,
557 warp: Some(warp_sync),
558 state: None,
559 chain_sync: None,
560 peer_best_blocks: Default::default(),
561 })
562 } else {
563 let chain_sync = ChainSync::new(
564 chain_sync_mode(config.mode),
565 client.clone(),
566 config.max_parallel_downloads,
567 config.max_blocks_per_request,
568 config.state_request_protocol_name.clone(),
569 config.metrics_registry.as_ref(),
570 std::iter::empty(),
571 )?;
572 Ok(Self {
573 config,
574 client,
575 warp: None,
576 state: None,
577 chain_sync: Some(chain_sync),
578 peer_best_blocks: Default::default(),
579 })
580 }
581 }
582
583 pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
585 if let Some(ref mut warp) = self.warp {
587 match warp.take_result() {
588 Some(res) => {
589 info!(
590 target: LOG_TARGET,
591 "Warp sync is complete, continuing with state sync."
592 );
593 let state_sync = StateStrategy::new(
594 self.client.clone(),
595 res.target_header,
596 res.target_body,
597 res.target_justifications,
598 false,
599 self.peer_best_blocks
600 .iter()
601 .map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
602 self.config.state_request_protocol_name.clone(),
603 );
604
605 self.warp = None;
606 self.state = Some(state_sync);
607 Ok(())
608 },
609 None => {
610 error!(
611 target: LOG_TARGET,
612 "Warp sync failed. Continuing with full sync."
613 );
614 let chain_sync = match ChainSync::new(
615 chain_sync_mode(self.config.mode),
616 self.client.clone(),
617 self.config.max_parallel_downloads,
618 self.config.max_blocks_per_request,
619 self.config.state_request_protocol_name.clone(),
620 self.config.metrics_registry.as_ref(),
621 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
622 (*peer_id, *best_hash, *best_number)
623 }),
624 ) {
625 Ok(chain_sync) => chain_sync,
626 Err(e) => {
627 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
628 return Err(e)
629 },
630 };
631
632 self.warp = None;
633 self.chain_sync = Some(chain_sync);
634 Ok(())
635 },
636 }
637 } else if let Some(state) = &self.state {
638 if state.is_succeeded() {
639 info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
640 } else {
641 error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
642 }
643 let chain_sync = match ChainSync::new(
644 chain_sync_mode(self.config.mode),
645 self.client.clone(),
646 self.config.max_parallel_downloads,
647 self.config.max_blocks_per_request,
648 self.config.state_request_protocol_name.clone(),
649 self.config.metrics_registry.as_ref(),
650 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
651 (*peer_id, *best_hash, *best_number)
652 }),
653 ) {
654 Ok(chain_sync) => chain_sync,
655 Err(e) => {
656 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
657 return Err(e);
658 },
659 };
660
661 self.state = None;
662 self.chain_sync = Some(chain_sync);
663 Ok(())
664 } else {
665 unreachable!("Only warp & state strategies can finish; qed")
666 }
667 }
668}