1use crate::{
23 block_announce_validator::{
24 BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
25 },
26 block_relay_protocol::{BlockDownloader, BlockResponseError},
27 pending_responses::{PendingResponses, ResponseEvent},
28 schema::v1::{StateRequest, StateResponse},
29 service::{
30 self,
31 syncing_service::{SyncingService, ToServiceCommand},
32 },
33 strategy::{
34 warp::{EncodedProof, WarpProofRequest},
35 StrategyKey, SyncingAction, SyncingStrategy,
36 },
37 types::{
38 BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
39 },
40 LOG_TARGET,
41};
42
43use codec::{Decode, DecodeAll, Encode};
44use futures::{channel::oneshot, FutureExt, StreamExt};
45use log::{debug, error, trace, warn};
46use prometheus_endpoint::{
47 register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
48};
49use prost::Message;
50use schnellru::{ByLength, LruMap};
51use tokio::time::{Interval, MissedTickBehavior};
52
53use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
54use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
55use sc_network::{
56 config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
57 peer_store::PeerStoreProvider,
58 request_responses::{IfDisconnected, OutboundFailure, RequestFailure},
59 service::{
60 traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
61 NotificationMetrics,
62 },
63 types::ProtocolName,
64 utils::LruHashSet,
65 NetworkBackend, NotificationService, ReputationChange,
66};
67use sc_network_common::{
68 role::Roles,
69 sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockRequest, BlockState},
70};
71use sc_network_types::PeerId;
72use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
73use sp_blockchain::{Error as ClientError, HeaderMetadata};
74use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
75use sp_runtime::{
76 traits::{Block as BlockT, Header, NumberFor, Zero},
77 Justifications,
78};
79
80use std::{
81 collections::{HashMap, HashSet},
82 iter,
83 num::NonZeroUsize,
84 sync::{
85 atomic::{AtomicBool, AtomicUsize, Ordering},
86 Arc,
87 },
88};
89
90const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);
92
93const MAX_KNOWN_BLOCKS: usize = 1024; const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
98
99mod rep {
100 use sc_network::ReputationChange as Rep;
101 pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
103 pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
105 pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
107 pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
109 pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
111 pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
113}
114
115struct Metrics {
116 peers: Gauge<U64>,
117 import_queue_blocks_submitted: Counter<U64>,
118 import_queue_justifications_submitted: Counter<U64>,
119}
120
121impl Metrics {
122 fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
123 let _ = MajorSyncingGauge::register(r, major_syncing)?;
124 Ok(Self {
125 peers: {
126 let g = Gauge::new("substrate_sync_peers", "Number of peers we sync with")?;
127 register(g, r)?
128 },
129 import_queue_blocks_submitted: {
130 let c = Counter::new(
131 "substrate_sync_import_queue_blocks_submitted",
132 "Number of blocks submitted to the import queue.",
133 )?;
134 register(c, r)?
135 },
136 import_queue_justifications_submitted: {
137 let c = Counter::new(
138 "substrate_sync_import_queue_justifications_submitted",
139 "Number of justifications submitted to the import queue.",
140 )?;
141 register(c, r)?
142 },
143 })
144 }
145}
146
147#[derive(Clone)]
149pub struct MajorSyncingGauge(Arc<AtomicBool>);
150
151impl MajorSyncingGauge {
152 fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
155 prometheus_endpoint::register(
156 SourcedGauge::new(
157 &Opts::new(
158 "substrate_sub_libp2p_is_major_syncing",
159 "Whether the node is performing a major sync or not.",
160 ),
161 MajorSyncingGauge(value),
162 )?,
163 registry,
164 )?;
165
166 Ok(())
167 }
168}
169
170impl MetricSource for MajorSyncingGauge {
171 type N = u64;
172
173 fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
174 set(&[], self.0.load(Ordering::Relaxed) as u64);
175 }
176}
177
178#[derive(Debug)]
180pub struct Peer<B: BlockT> {
181 pub info: ExtendedPeerInfo<B>,
182 pub known_blocks: LruHashSet<B::Hash>,
184 inbound: bool,
186}
187
188pub struct SyncingEngine<B: BlockT, Client> {
189 strategy: Box<dyn SyncingStrategy<B>>,
191
192 client: Arc<Client>,
194
195 num_connected: Arc<AtomicUsize>,
197
198 is_major_syncing: Arc<AtomicBool>,
200
201 network_service: service::network::NetworkServiceHandle,
203
204 service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,
206
207 roles: Roles,
209
210 genesis_hash: B::Hash,
212
213 event_streams: Vec<TracingUnboundedSender<SyncEvent>>,
215
216 tick_timeout: Interval,
218
219 peers: HashMap<PeerId, Peer<B>>,
221
222 important_peers: HashSet<PeerId>,
225
226 default_peers_set_no_slot_connected_peers: HashSet<PeerId>,
228
229 default_peers_set_no_slot_peers: HashSet<PeerId>,
231
232 default_peers_set_num_full: usize,
235
236 default_peers_set_num_light: usize,
238
239 max_in_peers: usize,
241
242 num_in_peers: usize,
244
245 block_announce_validator: BlockAnnounceValidatorStream<B>,
247
248 block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,
250
251 boot_node_ids: HashSet<PeerId>,
253
254 block_announce_protocol_name: ProtocolName,
256
257 metrics: Option<Metrics>,
259
260 notification_service: Box<dyn NotificationService>,
262
263 peer_store_handle: Arc<dyn PeerStoreProvider>,
265
266 pending_responses: PendingResponses<B>,
268
269 block_downloader: Arc<dyn BlockDownloader<B>>,
271
272 import_queue: Box<dyn ImportQueueService<B>>,
274}
275
276impl<B: BlockT, Client> SyncingEngine<B, Client>
277where
278 B: BlockT,
279 Client: HeaderBackend<B>
280 + BlockBackend<B>
281 + HeaderMetadata<B, Error = sp_blockchain::Error>
282 + ProofProvider<B>
283 + Send
284 + Sync
285 + 'static,
286{
287 pub fn new<N>(
288 roles: Roles,
289 client: Arc<Client>,
290 metrics_registry: Option<&Registry>,
291 network_metrics: NotificationMetrics,
292 net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
293 protocol_id: ProtocolId,
294 fork_id: &Option<String>,
295 block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
296 syncing_strategy: Box<dyn SyncingStrategy<B>>,
297 network_service: service::network::NetworkServiceHandle,
298 import_queue: Box<dyn ImportQueueService<B>>,
299 block_downloader: Arc<dyn BlockDownloader<B>>,
300 peer_store_handle: Arc<dyn PeerStoreProvider>,
301 ) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
302 where
303 N: NetworkBackend<B, <B as BlockT>::Hash>,
304 {
305 let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
306 net_config.network_config.default_peers_set.out_peers)
307 .max(1);
308 let important_peers = {
309 let mut imp_p = HashSet::new();
310 for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
311 imp_p.insert(reserved.peer_id);
312 }
313 for config in net_config.notification_protocols() {
314 let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
315 imp_p.extend(peer_ids);
316 }
317
318 imp_p.shrink_to_fit();
319 imp_p
320 };
321 let boot_node_ids = {
322 let mut list = HashSet::new();
323 for node in &net_config.network_config.boot_nodes {
324 list.insert(node.peer_id);
325 }
326 list.shrink_to_fit();
327 list
328 };
329 let default_peers_set_no_slot_peers = {
330 let mut no_slot_p: HashSet<PeerId> = net_config
331 .network_config
332 .default_peers_set
333 .reserved_nodes
334 .iter()
335 .map(|reserved| reserved.peer_id)
336 .collect();
337 no_slot_p.shrink_to_fit();
338 no_slot_p
339 };
340 let default_peers_set_num_full =
341 net_config.network_config.default_peers_set_num_full as usize;
342 let default_peers_set_num_light = {
343 let total = net_config.network_config.default_peers_set.out_peers +
344 net_config.network_config.default_peers_set.in_peers;
345 total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
346 };
347
348 let info = client.info();
349
350 let (block_announce_config, notification_service) =
351 Self::get_block_announce_proto_config::<N>(
352 protocol_id,
353 fork_id,
354 roles,
355 info.best_number,
356 info.best_hash,
357 info.genesis_hash,
358 &net_config.network_config.default_peers_set,
359 network_metrics,
360 Arc::clone(&peer_store_handle),
361 );
362
363 let block_announce_protocol_name = block_announce_config.protocol_name().clone();
364 let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
365 let num_connected = Arc::new(AtomicUsize::new(0));
366 let is_major_syncing = Arc::new(AtomicBool::new(false));
367
368 let max_full_peers = net_config.network_config.default_peers_set_num_full;
371 let max_out_peers = net_config.network_config.default_peers_set.out_peers;
372 let max_in_peers = (max_full_peers - max_out_peers) as usize;
373
374 let tick_timeout = {
375 let mut interval = tokio::time::interval(TICK_TIMEOUT);
376 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
377 interval
378 };
379
380 Ok((
381 Self {
382 roles,
383 client,
384 strategy: syncing_strategy,
385 network_service,
386 peers: HashMap::new(),
387 block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
388 block_announce_protocol_name,
389 block_announce_validator: BlockAnnounceValidatorStream::new(
390 block_announce_validator,
391 ),
392 num_connected: num_connected.clone(),
393 is_major_syncing: is_major_syncing.clone(),
394 service_rx,
395 genesis_hash: info.genesis_hash,
396 important_peers,
397 default_peers_set_no_slot_connected_peers: HashSet::new(),
398 boot_node_ids,
399 default_peers_set_no_slot_peers,
400 default_peers_set_num_full,
401 default_peers_set_num_light,
402 num_in_peers: 0usize,
403 max_in_peers,
404 event_streams: Vec::new(),
405 notification_service,
406 tick_timeout,
407 peer_store_handle,
408 metrics: if let Some(r) = metrics_registry {
409 match Metrics::register(r, is_major_syncing.clone()) {
410 Ok(metrics) => Some(metrics),
411 Err(err) => {
412 log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
413 None
414 },
415 }
416 } else {
417 None
418 },
419 pending_responses: PendingResponses::new(),
420 block_downloader,
421 import_queue,
422 },
423 SyncingService::new(tx, num_connected, is_major_syncing),
424 block_announce_config,
425 ))
426 }
427
428 fn update_peer_info(
429 &mut self,
430 peer_id: &PeerId,
431 best_hash: B::Hash,
432 best_number: NumberFor<B>,
433 ) {
434 if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
435 peer.info.best_hash = best_hash;
436 peer.info.best_number = best_number;
437 }
438 }
439
440 fn process_block_announce_validation_result(
442 &mut self,
443 validation_result: BlockAnnounceValidationResult<B::Header>,
444 ) {
445 match validation_result {
446 BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
447 BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
448 if let Some((best_hash, best_number)) =
449 self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
450 {
451 self.update_peer_info(&peer_id, best_hash, best_number);
452 }
453
454 if let Some(data) = announce.data {
455 if !data.is_empty() {
456 self.block_announce_data_cache.insert(announce.header.hash(), data);
457 }
458 }
459 },
460 BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
461 if disconnect {
462 log::debug!(
463 target: LOG_TARGET,
464 "Disconnecting peer {peer_id} due to block announce validation failure",
465 );
466 self.network_service
467 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
468 }
469
470 self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
471 },
472 }
473 }
474
475 pub fn push_block_announce_validation(
477 &mut self,
478 peer_id: PeerId,
479 announce: BlockAnnounce<B::Header>,
480 ) {
481 let hash = announce.header.hash();
482
483 let peer = match self.peers.get_mut(&peer_id) {
484 Some(p) => p,
485 None => {
486 log::error!(
487 target: LOG_TARGET,
488 "Received block announce from disconnected peer {peer_id}",
489 );
490 debug_assert!(false);
491 return;
492 },
493 };
494 peer.known_blocks.insert(hash);
495
496 if peer.info.roles.is_full() {
497 let is_best = match announce.state.unwrap_or(BlockState::Best) {
498 BlockState::Best => true,
499 BlockState::Normal => false,
500 };
501
502 self.block_announce_validator
503 .push_block_announce_validation(peer_id, hash, announce, is_best);
504 }
505 }
506
507 pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
512 let header = match self.client.header(hash) {
513 Ok(Some(header)) => header,
514 Ok(None) => {
515 log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
516 return;
517 },
518 Err(e) => {
519 log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
520 return;
521 },
522 };
523
524 if header.number().is_zero() {
526 return;
527 }
528
529 let is_best = self.client.info().best_hash == hash;
530 log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
531
532 let data = data
533 .or_else(|| self.block_announce_data_cache.get(&hash).cloned())
534 .unwrap_or_default();
535
536 for (peer_id, ref mut peer) in self.peers.iter_mut() {
537 let inserted = peer.known_blocks.insert(hash);
538 if inserted {
539 log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
540 let message = BlockAnnounce {
541 header: header.clone(),
542 state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
543 data: Some(data.clone()),
544 };
545
546 let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
547 }
548 }
549 }
550
551 pub async fn run(mut self) {
552 loop {
553 tokio::select! {
554 _ = self.tick_timeout.tick() => {
555 },
559 command = self.service_rx.select_next_some() =>
560 self.process_service_command(command),
561 notification_event = self.notification_service.next_event() => match notification_event {
562 Some(event) => self.process_notification_event(event),
563 None => return,
564 },
565 response_event = self.pending_responses.select_next_some() =>
566 self.process_response_event(response_event),
567 validation_result = self.block_announce_validator.select_next_some() =>
568 self.process_block_announce_validation_result(validation_result),
569 }
570
571 self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);
573
574 if let Err(e) = self.process_strategy_actions() {
576 error!(
577 target: LOG_TARGET,
578 "Terminating `SyncingEngine` due to fatal error: {e:?}.",
579 );
580 return;
581 }
582 }
583 }
584
585 fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
586 for action in self.strategy.actions()? {
587 match action {
588 SyncingAction::SendBlockRequest { peer_id, key, request } => {
589 let removed = self.pending_responses.remove(peer_id, key);
592 self.send_block_request(peer_id, key, request.clone());
593
594 if removed {
595 warn!(
596 target: LOG_TARGET,
597 "Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}. \
598 Stale response removed!",
599 peer_id,
600 key,
601 request,
602 )
603 } else {
604 trace!(
605 target: LOG_TARGET,
606 "Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}.",
607 peer_id,
608 key,
609 request,
610 )
611 }
612 },
613 SyncingAction::CancelRequest { peer_id, key } => {
614 let removed = self.pending_responses.remove(peer_id, key);
615
616 trace!(
617 target: LOG_TARGET,
618 "Processed {action:?}, response removed: {removed}.",
619 );
620 },
621 SyncingAction::SendStateRequest { peer_id, key, protocol_name, request } => {
622 self.send_state_request(peer_id, key, protocol_name, request);
623
624 trace!(
625 target: LOG_TARGET,
626 "Processed `ChainSyncAction::SendStateRequest` to {peer_id}.",
627 );
628 },
629 SyncingAction::SendWarpProofRequest { peer_id, key, protocol_name, request } => {
630 self.send_warp_proof_request(peer_id, key, protocol_name, request.clone());
631
632 trace!(
633 target: LOG_TARGET,
634 "Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.",
635 peer_id,
636 request,
637 );
638 },
639 SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
640 self.pending_responses.remove_all(&peer_id);
641 self.network_service
642 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
643 self.network_service.report_peer(peer_id, rep);
644
645 trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
646 },
647 SyncingAction::ImportBlocks { origin, blocks } => {
648 let count = blocks.len();
649 self.import_blocks(origin, blocks);
650
651 trace!(
652 target: LOG_TARGET,
653 "Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
654 );
655 },
656 SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
657 self.import_justifications(peer_id, hash, number, justifications);
658
659 trace!(
660 target: LOG_TARGET,
661 "Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
662 peer_id,
663 hash,
664 number,
665 )
666 },
667 SyncingAction::Finished => {},
669 }
670 }
671
672 Ok(())
673 }
674
675 fn process_service_command(&mut self, command: ToServiceCommand<B>) {
676 match command {
677 ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
678 self.strategy.set_sync_fork_request(peers, &hash, number);
679 },
680 ToServiceCommand::EventStream(tx) => self.event_streams.push(tx),
681 ToServiceCommand::RequestJustification(hash, number) =>
682 self.strategy.request_justification(&hash, number),
683 ToServiceCommand::ClearJustificationRequests =>
684 self.strategy.clear_justification_requests(),
685 ToServiceCommand::BlocksProcessed(imported, count, results) => {
686 self.strategy.on_blocks_processed(imported, count, results);
687 },
688 ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
689 self.strategy.on_justification_import(hash, number, success);
690 if !success {
691 log::info!(
692 target: LOG_TARGET,
693 "💔 Invalid justification provided by {peer_id} for #{hash}",
694 );
695 self.network_service
696 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
697 self.network_service
698 .report_peer(peer_id, ReputationChange::new_fatal("Invalid justification"));
699 }
700 },
701 ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
702 ToServiceCommand::NewBestBlockImported(hash, number) => {
703 log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
704
705 self.strategy.update_chain_info(&hash, number);
706 let _ = self.notification_service.try_set_handshake(
707 BlockAnnouncesHandshake::<B>::build(
708 self.roles,
709 number,
710 hash,
711 self.genesis_hash,
712 )
713 .encode(),
714 );
715 },
716 ToServiceCommand::Status(tx) => {
717 let _ = tx.send(self.strategy.status());
718 },
719 ToServiceCommand::NumActivePeers(tx) => {
720 let _ = tx.send(self.num_active_peers());
721 },
722 ToServiceCommand::NumDownloadedBlocks(tx) => {
723 let _ = tx.send(self.strategy.num_downloaded_blocks());
724 },
725 ToServiceCommand::NumSyncRequests(tx) => {
726 let _ = tx.send(self.strategy.num_sync_requests());
727 },
728 ToServiceCommand::PeersInfo(tx) => {
729 let peers_info =
730 self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
731 let _ = tx.send(peers_info);
732 },
733 ToServiceCommand::OnBlockFinalized(hash, header) =>
734 self.strategy.on_block_finalized(&hash, *header.number()),
735 }
736 }
737
738 fn process_notification_event(&mut self, event: NotificationEvent) {
739 match event {
740 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
741 let validation_result = self
742 .validate_connection(&peer, handshake, Direction::Inbound)
743 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
744
745 let _ = result_tx.send(validation_result);
746 },
747 NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
748 log::debug!(
749 target: LOG_TARGET,
750 "Substream opened for {peer}, handshake {handshake:?}"
751 );
752
753 match self.validate_connection(&peer, handshake, direction) {
754 Ok(handshake) => {
755 if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
756 log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
757 self.network_service
758 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
759 }
760 },
761 Err(wrong_genesis) => {
762 log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
763
764 if wrong_genesis {
765 self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
766 }
767
768 self.network_service
769 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
770 },
771 }
772 },
773 NotificationEvent::NotificationStreamClosed { peer } => {
774 self.on_sync_peer_disconnected(peer);
775 },
776 NotificationEvent::NotificationReceived { peer, notification } => {
777 if !self.peers.contains_key(&peer) {
778 log::error!(
779 target: LOG_TARGET,
780 "received notification from {peer} who had been earlier refused by `SyncingEngine`",
781 );
782 return;
783 }
784
785 let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
786 log::warn!(target: LOG_TARGET, "failed to decode block announce");
787 return;
788 };
789
790 self.push_block_announce_validation(peer, announce);
791 },
792 }
793 }
794
795 fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
799 let Some(info) = self.peers.remove(&peer_id) else {
800 log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
801 return;
802 };
803 if let Some(metrics) = &self.metrics {
804 metrics.peers.dec();
805 }
806 self.num_connected.fetch_sub(1, Ordering::AcqRel);
807
808 if self.important_peers.contains(&peer_id) {
809 log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
810 } else {
811 log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
812 }
813
814 if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
815 info.inbound &&
816 info.info.roles.is_full()
817 {
818 match self.num_in_peers.checked_sub(1) {
819 Some(value) => {
820 self.num_in_peers = value;
821 },
822 None => {
823 log::error!(
824 target: LOG_TARGET,
825 "trying to disconnect an inbound node which is not counted as inbound"
826 );
827 debug_assert!(false);
828 },
829 }
830 }
831
832 self.strategy.remove_peer(&peer_id);
833 self.pending_responses.remove_all(&peer_id);
834 self.event_streams
835 .retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
836 }
837
838 fn validate_handshake(
840 &mut self,
841 peer_id: &PeerId,
842 handshake: Vec<u8>,
843 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
844 log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
845
846 let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
847 .map_err(|error| {
848 log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
849 false
850 })?;
851
852 if handshake.genesis_hash != self.genesis_hash {
853 if self.important_peers.contains(&peer_id) {
854 log::error!(
855 target: LOG_TARGET,
856 "Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
857 self.genesis_hash,
858 handshake.genesis_hash,
859 );
860 } else if self.boot_node_ids.contains(&peer_id) {
861 log::error!(
862 target: LOG_TARGET,
863 "Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
864 self.genesis_hash,
865 handshake.genesis_hash,
866 );
867 } else {
868 log::debug!(
869 target: LOG_TARGET,
870 "Peer is on different chain (our genesis: {} theirs: {})",
871 self.genesis_hash,
872 handshake.genesis_hash
873 );
874 }
875
876 return Err(true);
877 }
878
879 Ok(handshake)
880 }
881
882 fn validate_connection(
896 &mut self,
897 peer_id: &PeerId,
898 handshake: Vec<u8>,
899 direction: Direction,
900 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
901 log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
902
903 let handshake = self.validate_handshake(peer_id, handshake)?;
904
905 if self.peers.contains_key(&peer_id) {
906 log::error!(
907 target: LOG_TARGET,
908 "Called `validate_connection()` with already connected peer {peer_id}",
909 );
910 debug_assert!(false);
911 return Err(false);
912 }
913
914 let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id);
915 let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
916
917 if handshake.roles.is_full() &&
918 self.strategy.num_peers() >=
919 self.default_peers_set_num_full +
920 self.default_peers_set_no_slot_connected_peers.len() +
921 this_peer_reserved_slot
922 {
923 log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
924 return Err(false);
925 }
926
927 if !no_slot_peer &&
929 handshake.roles.is_full() &&
930 direction.is_inbound() &&
931 self.num_in_peers == self.max_in_peers
932 {
933 log::debug!(target: LOG_TARGET, "All inbound slots have been consumed, rejecting {peer_id}");
934 return Err(false);
935 }
936
937 if handshake.roles.is_light() &&
942 (self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
943 {
944 log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
945 return Err(false);
946 }
947
948 Ok(handshake)
949 }
950
951 fn on_sync_peer_connected(
957 &mut self,
958 peer_id: PeerId,
959 status: &BlockAnnouncesHandshake<B>,
960 direction: Direction,
961 ) -> Result<(), ()> {
962 log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
963
964 let peer = Peer {
965 info: ExtendedPeerInfo {
966 roles: status.roles,
967 best_hash: status.best_hash,
968 best_number: status.best_number,
969 },
970 known_blocks: LruHashSet::new(
971 NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
972 ),
973 inbound: direction.is_inbound(),
974 };
975
976 if status.roles.is_full() {
978 self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
979 }
980
981 log::debug!(target: LOG_TARGET, "Connected {peer_id}");
982
983 if self.peers.insert(peer_id, peer).is_none() {
984 if let Some(metrics) = &self.metrics {
985 metrics.peers.inc();
986 }
987 self.num_connected.fetch_add(1, Ordering::AcqRel);
988 }
989 self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
990
991 if self.default_peers_set_no_slot_peers.contains(&peer_id) {
992 self.default_peers_set_no_slot_connected_peers.insert(peer_id);
993 } else if direction.is_inbound() && status.roles.is_full() {
994 self.num_in_peers += 1;
995 }
996
997 self.event_streams
998 .retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
999
1000 Ok(())
1001 }
1002
1003 fn send_block_request(&mut self, peer_id: PeerId, key: StrategyKey, request: BlockRequest<B>) {
1004 if !self.peers.contains_key(&peer_id) {
1005 trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
1006 debug_assert!(false);
1007 return;
1008 }
1009
1010 let downloader = self.block_downloader.clone();
1011
1012 self.pending_responses.insert(
1013 peer_id,
1014 key,
1015 PeerRequest::Block(request.clone()),
1016 async move { downloader.download_blocks(peer_id, request).await }.boxed(),
1017 );
1018 }
1019
1020 fn send_state_request(
1021 &mut self,
1022 peer_id: PeerId,
1023 key: StrategyKey,
1024 protocol_name: ProtocolName,
1025 request: OpaqueStateRequest,
1026 ) {
1027 if !self.peers.contains_key(&peer_id) {
1028 trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
1029 debug_assert!(false);
1030 return;
1031 }
1032
1033 let (tx, rx) = oneshot::channel();
1034
1035 self.pending_responses.insert(peer_id, key, PeerRequest::State, rx.boxed());
1036
1037 match Self::encode_state_request(&request) {
1038 Ok(data) => {
1039 self.network_service.start_request(
1040 peer_id,
1041 protocol_name,
1042 data,
1043 tx,
1044 IfDisconnected::ImmediateError,
1045 );
1046 },
1047 Err(err) => {
1048 log::warn!(
1049 target: LOG_TARGET,
1050 "Failed to encode state request {request:?}: {err:?}",
1051 );
1052 },
1053 }
1054 }
1055
1056 fn send_warp_proof_request(
1057 &mut self,
1058 peer_id: PeerId,
1059 key: StrategyKey,
1060 protocol_name: ProtocolName,
1061 request: WarpProofRequest<B>,
1062 ) {
1063 if !self.peers.contains_key(&peer_id) {
1064 trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
1065 debug_assert!(false);
1066 return;
1067 }
1068
1069 let (tx, rx) = oneshot::channel();
1070
1071 self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());
1072
1073 self.network_service.start_request(
1074 peer_id,
1075 protocol_name,
1076 request.encode(),
1077 tx,
1078 IfDisconnected::ImmediateError,
1079 );
1080 }
1081
1082 fn encode_state_request(request: &OpaqueStateRequest) -> Result<Vec<u8>, String> {
1083 let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| {
1084 "Failed to downcast opaque state response during encoding, this is an \
1085 implementation bug."
1086 .to_string()
1087 })?;
1088
1089 Ok(request.encode_to_vec())
1090 }
1091
1092 fn decode_state_response(response: &[u8]) -> Result<OpaqueStateResponse, String> {
1093 let response = StateResponse::decode(response)
1094 .map_err(|error| format!("Failed to decode state response: {error}"))?;
1095
1096 Ok(OpaqueStateResponse(Box::new(response)))
1097 }
1098
1099 fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
1100 let ResponseEvent { peer_id, key, request, response } = response_event;
1101
1102 match response {
1103 Ok(Ok((resp, _))) => match request {
1104 PeerRequest::Block(req) => {
1105 match self.block_downloader.block_response_into_blocks(&req, resp) {
1106 Ok(blocks) => {
1107 self.strategy.on_block_response(peer_id, key, req, blocks);
1108 },
1109 Err(BlockResponseError::DecodeFailed(e)) => {
1110 debug!(
1111 target: LOG_TARGET,
1112 "Failed to decode block response from peer {:?}: {:?}.",
1113 peer_id,
1114 e
1115 );
1116 self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
1117 self.network_service.disconnect_peer(
1118 peer_id,
1119 self.block_announce_protocol_name.clone(),
1120 );
1121 return;
1122 },
1123 Err(BlockResponseError::ExtractionFailed(e)) => {
1124 debug!(
1125 target: LOG_TARGET,
1126 "Failed to extract blocks from peer response {:?}: {:?}.",
1127 peer_id,
1128 e
1129 );
1130 self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
1131 return;
1132 },
1133 }
1134 },
1135 PeerRequest::State => {
1136 let response = match Self::decode_state_response(&resp[..]) {
1137 Ok(proto) => proto,
1138 Err(e) => {
1139 debug!(
1140 target: LOG_TARGET,
1141 "Failed to decode state response from peer {peer_id:?}: {e:?}.",
1142 );
1143 self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
1144 self.network_service.disconnect_peer(
1145 peer_id,
1146 self.block_announce_protocol_name.clone(),
1147 );
1148 return;
1149 },
1150 };
1151
1152 self.strategy.on_state_response(peer_id, key, response);
1153 },
1154 PeerRequest::WarpProof => {
1155 self.strategy.on_warp_proof_response(&peer_id, key, EncodedProof(resp));
1156 },
1157 },
1158 Ok(Err(e)) => {
1159 debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
1160
1161 match e {
1162 RequestFailure::Network(OutboundFailure::Timeout) => {
1163 self.network_service.report_peer(peer_id, rep::TIMEOUT);
1164 self.network_service
1165 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1166 },
1167 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1168 self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
1169 self.network_service
1170 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1171 },
1172 RequestFailure::Network(OutboundFailure::DialFailure) => {
1173 self.network_service
1174 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1175 },
1176 RequestFailure::Refused => {
1177 self.network_service.report_peer(peer_id, rep::REFUSED);
1178 self.network_service
1179 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1180 },
1181 RequestFailure::Network(OutboundFailure::ConnectionClosed) |
1182 RequestFailure::NotConnected => {
1183 self.network_service
1184 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1185 },
1186 RequestFailure::UnknownProtocol => {
1187 debug_assert!(false, "Block request protocol should always be known.");
1188 },
1189 RequestFailure::Obsolete => {
1190 debug_assert!(
1191 false,
1192 "Can not receive `RequestFailure::Obsolete` after dropping the \
1193 response receiver.",
1194 );
1195 },
1196 }
1197 },
1198 Err(oneshot::Canceled) => {
1199 trace!(
1200 target: LOG_TARGET,
1201 "Request to peer {peer_id:?} failed due to oneshot being canceled.",
1202 );
1203 self.network_service
1204 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1205 },
1206 }
1207 }
1208
1209 fn num_active_peers(&self) -> usize {
1211 self.pending_responses.len()
1212 }
1213
1214 fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1216 protocol_id: ProtocolId,
1217 fork_id: &Option<String>,
1218 roles: Roles,
1219 best_number: NumberFor<B>,
1220 best_hash: B::Hash,
1221 genesis_hash: B::Hash,
1222 set_config: &SetConfig,
1223 metrics: NotificationMetrics,
1224 peer_store_handle: Arc<dyn PeerStoreProvider>,
1225 ) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1226 let block_announces_protocol = {
1227 let genesis_hash = genesis_hash.as_ref();
1228 if let Some(ref fork_id) = fork_id {
1229 format!(
1230 "/{}/{}/block-announces/1",
1231 array_bytes::bytes2hex("", genesis_hash),
1232 fork_id
1233 )
1234 } else {
1235 format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
1236 }
1237 };
1238
1239 N::notification_config(
1240 block_announces_protocol.into(),
1241 iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1242 MAX_BLOCK_ANNOUNCE_SIZE,
1243 Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1244 roles,
1245 best_number,
1246 best_hash,
1247 genesis_hash,
1248 ))),
1249 set_config.clone(),
1250 metrics,
1251 peer_store_handle,
1252 )
1253 }
1254
1255 fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1257 if let Some(metrics) = &self.metrics {
1258 metrics.import_queue_blocks_submitted.inc();
1259 }
1260
1261 self.import_queue.import_blocks(origin, blocks);
1262 }
1263
1264 fn import_justifications(
1266 &mut self,
1267 peer_id: PeerId,
1268 hash: B::Hash,
1269 number: NumberFor<B>,
1270 justifications: Justifications,
1271 ) {
1272 if let Some(metrics) = &self.metrics {
1273 metrics.import_queue_justifications_submitted.inc();
1274 }
1275
1276 self.import_queue.import_justifications(peer_id, hash, number, justifications);
1277 }
1278}