sc_network_sync/
engine.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `SyncingEngine` is the actor responsible for syncing Substrate chain
20//! to tip and keep the blockchain up to date with network updates.
21
22use crate::{
23	block_announce_validator::{
24		BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
25	},
26	block_relay_protocol::{BlockDownloader, BlockResponseError},
27	pending_responses::{PendingResponses, ResponseEvent},
28	schema::v1::{StateRequest, StateResponse},
29	service::{
30		self,
31		syncing_service::{SyncingService, ToServiceCommand},
32	},
33	strategy::{
34		warp::{EncodedProof, WarpProofRequest},
35		StrategyKey, SyncingAction, SyncingStrategy,
36	},
37	types::{
38		BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
39	},
40	LOG_TARGET,
41};
42
43use codec::{Decode, DecodeAll, Encode};
44use futures::{channel::oneshot, FutureExt, StreamExt};
45use log::{debug, error, trace, warn};
46use prometheus_endpoint::{
47	register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
48};
49use prost::Message;
50use schnellru::{ByLength, LruMap};
51use tokio::time::{Interval, MissedTickBehavior};
52
53use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
54use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
55use sc_network::{
56	config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
57	peer_store::PeerStoreProvider,
58	request_responses::{IfDisconnected, OutboundFailure, RequestFailure},
59	service::{
60		traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
61		NotificationMetrics,
62	},
63	types::ProtocolName,
64	utils::LruHashSet,
65	NetworkBackend, NotificationService, ReputationChange,
66};
67use sc_network_common::{
68	role::Roles,
69	sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockRequest, BlockState},
70};
71use sc_network_types::PeerId;
72use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
73use sp_blockchain::{Error as ClientError, HeaderMetadata};
74use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
75use sp_runtime::{
76	traits::{Block as BlockT, Header, NumberFor, Zero},
77	Justifications,
78};
79
80use std::{
81	collections::{HashMap, HashSet},
82	iter,
83	num::NonZeroUsize,
84	sync::{
85		atomic::{AtomicBool, AtomicUsize, Ordering},
86		Arc,
87	},
88};
89
90/// Interval at which we perform time based maintenance
91const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);
92
93/// Maximum number of known block hashes to keep for a peer.
94const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
95
96/// Maximum allowed size for a block announce.
97const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
98
99mod rep {
100	use sc_network::ReputationChange as Rep;
101	/// Peer has different genesis.
102	pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
103	/// Peer send us a block announcement that failed at validation.
104	pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
105	/// We received a message that failed to decode.
106	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
107	/// Peer is on unsupported protocol version.
108	pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
109	/// Reputation change when a peer refuses a request.
110	pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
111	/// Reputation change when a peer doesn't respond in time to our messages.
112	pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
113}
114
115struct Metrics {
116	peers: Gauge<U64>,
117	import_queue_blocks_submitted: Counter<U64>,
118	import_queue_justifications_submitted: Counter<U64>,
119}
120
121impl Metrics {
122	fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
123		let _ = MajorSyncingGauge::register(r, major_syncing)?;
124		Ok(Self {
125			peers: {
126				let g = Gauge::new("substrate_sync_peers", "Number of peers we sync with")?;
127				register(g, r)?
128			},
129			import_queue_blocks_submitted: {
130				let c = Counter::new(
131					"substrate_sync_import_queue_blocks_submitted",
132					"Number of blocks submitted to the import queue.",
133				)?;
134				register(c, r)?
135			},
136			import_queue_justifications_submitted: {
137				let c = Counter::new(
138					"substrate_sync_import_queue_justifications_submitted",
139					"Number of justifications submitted to the import queue.",
140				)?;
141				register(c, r)?
142			},
143		})
144	}
145}
146
147/// The "major syncing" metric.
148#[derive(Clone)]
149pub struct MajorSyncingGauge(Arc<AtomicBool>);
150
151impl MajorSyncingGauge {
152	/// Registers the [`MajorSyncGauge`] metric whose value is
153	/// obtained from the given `AtomicBool`.
154	fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
155		prometheus_endpoint::register(
156			SourcedGauge::new(
157				&Opts::new(
158					"substrate_sub_libp2p_is_major_syncing",
159					"Whether the node is performing a major sync or not.",
160				),
161				MajorSyncingGauge(value),
162			)?,
163			registry,
164		)?;
165
166		Ok(())
167	}
168}
169
170impl MetricSource for MajorSyncingGauge {
171	type N = u64;
172
173	fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
174		set(&[], self.0.load(Ordering::Relaxed) as u64);
175	}
176}
177
178/// Peer information
179#[derive(Debug)]
180pub struct Peer<B: BlockT> {
181	pub info: ExtendedPeerInfo<B>,
182	/// Holds a set of blocks known to this peer.
183	pub known_blocks: LruHashSet<B::Hash>,
184	/// Is the peer inbound.
185	inbound: bool,
186}
187
188pub struct SyncingEngine<B: BlockT, Client> {
189	/// Syncing strategy.
190	strategy: Box<dyn SyncingStrategy<B>>,
191
192	/// Blockchain client.
193	client: Arc<Client>,
194
195	/// Number of peers we're connected to.
196	num_connected: Arc<AtomicUsize>,
197
198	/// Are we actively catching up with the chain?
199	is_major_syncing: Arc<AtomicBool>,
200
201	/// Network service.
202	network_service: service::network::NetworkServiceHandle,
203
204	/// Channel for receiving service commands
205	service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,
206
207	/// Assigned roles.
208	roles: Roles,
209
210	/// Genesis hash.
211	genesis_hash: B::Hash,
212
213	/// Set of channels for other protocols that have subscribed to syncing events.
214	event_streams: Vec<TracingUnboundedSender<SyncEvent>>,
215
216	/// Interval at which we call `tick`.
217	tick_timeout: Interval,
218
219	/// All connected peers. Contains both full and light node peers.
220	peers: HashMap<PeerId, Peer<B>>,
221
222	/// List of nodes for which we perform additional logging because they are important for the
223	/// user.
224	important_peers: HashSet<PeerId>,
225
226	/// Actual list of connected no-slot nodes.
227	default_peers_set_no_slot_connected_peers: HashSet<PeerId>,
228
229	/// List of nodes that should never occupy peer slots.
230	default_peers_set_no_slot_peers: HashSet<PeerId>,
231
232	/// Value that was passed as part of the configuration. Used to cap the number of full
233	/// nodes.
234	default_peers_set_num_full: usize,
235
236	/// Number of slots to allocate to light nodes.
237	default_peers_set_num_light: usize,
238
239	/// Maximum number of inbound peers.
240	max_in_peers: usize,
241
242	/// Number of inbound peers accepted so far.
243	num_in_peers: usize,
244
245	/// Async processor of block announce validations.
246	block_announce_validator: BlockAnnounceValidatorStream<B>,
247
248	/// A cache for the data that was associated to a block announcement.
249	block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,
250
251	/// The `PeerId`'s of all boot nodes.
252	boot_node_ids: HashSet<PeerId>,
253
254	/// Protocol name used for block announcements
255	block_announce_protocol_name: ProtocolName,
256
257	/// Prometheus metrics.
258	metrics: Option<Metrics>,
259
260	/// Handle that is used to communicate with `sc_network::Notifications`.
261	notification_service: Box<dyn NotificationService>,
262
263	/// Handle to `PeerStore`.
264	peer_store_handle: Arc<dyn PeerStoreProvider>,
265
266	/// Pending responses
267	pending_responses: PendingResponses<B>,
268
269	/// Block downloader
270	block_downloader: Arc<dyn BlockDownloader<B>>,
271
272	/// Handle to import queue.
273	import_queue: Box<dyn ImportQueueService<B>>,
274}
275
276impl<B: BlockT, Client> SyncingEngine<B, Client>
277where
278	B: BlockT,
279	Client: HeaderBackend<B>
280		+ BlockBackend<B>
281		+ HeaderMetadata<B, Error = sp_blockchain::Error>
282		+ ProofProvider<B>
283		+ Send
284		+ Sync
285		+ 'static,
286{
287	pub fn new<N>(
288		roles: Roles,
289		client: Arc<Client>,
290		metrics_registry: Option<&Registry>,
291		network_metrics: NotificationMetrics,
292		net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
293		protocol_id: ProtocolId,
294		fork_id: &Option<String>,
295		block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
296		syncing_strategy: Box<dyn SyncingStrategy<B>>,
297		network_service: service::network::NetworkServiceHandle,
298		import_queue: Box<dyn ImportQueueService<B>>,
299		block_downloader: Arc<dyn BlockDownloader<B>>,
300		peer_store_handle: Arc<dyn PeerStoreProvider>,
301	) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
302	where
303		N: NetworkBackend<B, <B as BlockT>::Hash>,
304	{
305		let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
306			net_config.network_config.default_peers_set.out_peers)
307			.max(1);
308		let important_peers = {
309			let mut imp_p = HashSet::new();
310			for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
311				imp_p.insert(reserved.peer_id);
312			}
313			for config in net_config.notification_protocols() {
314				let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
315				imp_p.extend(peer_ids);
316			}
317
318			imp_p.shrink_to_fit();
319			imp_p
320		};
321		let boot_node_ids = {
322			let mut list = HashSet::new();
323			for node in &net_config.network_config.boot_nodes {
324				list.insert(node.peer_id);
325			}
326			list.shrink_to_fit();
327			list
328		};
329		let default_peers_set_no_slot_peers = {
330			let mut no_slot_p: HashSet<PeerId> = net_config
331				.network_config
332				.default_peers_set
333				.reserved_nodes
334				.iter()
335				.map(|reserved| reserved.peer_id)
336				.collect();
337			no_slot_p.shrink_to_fit();
338			no_slot_p
339		};
340		let default_peers_set_num_full =
341			net_config.network_config.default_peers_set_num_full as usize;
342		let default_peers_set_num_light = {
343			let total = net_config.network_config.default_peers_set.out_peers +
344				net_config.network_config.default_peers_set.in_peers;
345			total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
346		};
347
348		let info = client.info();
349
350		let (block_announce_config, notification_service) =
351			Self::get_block_announce_proto_config::<N>(
352				protocol_id,
353				fork_id,
354				roles,
355				info.best_number,
356				info.best_hash,
357				info.genesis_hash,
358				&net_config.network_config.default_peers_set,
359				network_metrics,
360				Arc::clone(&peer_store_handle),
361			);
362
363		let block_announce_protocol_name = block_announce_config.protocol_name().clone();
364		let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
365		let num_connected = Arc::new(AtomicUsize::new(0));
366		let is_major_syncing = Arc::new(AtomicBool::new(false));
367
368		// `default_peers_set.in_peers` contains an unspecified amount of light peers so the number
369		// of full inbound peers must be calculated from the total full peer count
370		let max_full_peers = net_config.network_config.default_peers_set_num_full;
371		let max_out_peers = net_config.network_config.default_peers_set.out_peers;
372		let max_in_peers = (max_full_peers - max_out_peers) as usize;
373
374		let tick_timeout = {
375			let mut interval = tokio::time::interval(TICK_TIMEOUT);
376			interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
377			interval
378		};
379
380		Ok((
381			Self {
382				roles,
383				client,
384				strategy: syncing_strategy,
385				network_service,
386				peers: HashMap::new(),
387				block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
388				block_announce_protocol_name,
389				block_announce_validator: BlockAnnounceValidatorStream::new(
390					block_announce_validator,
391				),
392				num_connected: num_connected.clone(),
393				is_major_syncing: is_major_syncing.clone(),
394				service_rx,
395				genesis_hash: info.genesis_hash,
396				important_peers,
397				default_peers_set_no_slot_connected_peers: HashSet::new(),
398				boot_node_ids,
399				default_peers_set_no_slot_peers,
400				default_peers_set_num_full,
401				default_peers_set_num_light,
402				num_in_peers: 0usize,
403				max_in_peers,
404				event_streams: Vec::new(),
405				notification_service,
406				tick_timeout,
407				peer_store_handle,
408				metrics: if let Some(r) = metrics_registry {
409					match Metrics::register(r, is_major_syncing.clone()) {
410						Ok(metrics) => Some(metrics),
411						Err(err) => {
412							log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
413							None
414						},
415					}
416				} else {
417					None
418				},
419				pending_responses: PendingResponses::new(),
420				block_downloader,
421				import_queue,
422			},
423			SyncingService::new(tx, num_connected, is_major_syncing),
424			block_announce_config,
425		))
426	}
427
428	fn update_peer_info(
429		&mut self,
430		peer_id: &PeerId,
431		best_hash: B::Hash,
432		best_number: NumberFor<B>,
433	) {
434		if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
435			peer.info.best_hash = best_hash;
436			peer.info.best_number = best_number;
437		}
438	}
439
440	/// Process the result of the block announce validation.
441	fn process_block_announce_validation_result(
442		&mut self,
443		validation_result: BlockAnnounceValidationResult<B::Header>,
444	) {
445		match validation_result {
446			BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
447			BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
448				if let Some((best_hash, best_number)) =
449					self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
450				{
451					self.update_peer_info(&peer_id, best_hash, best_number);
452				}
453
454				if let Some(data) = announce.data {
455					if !data.is_empty() {
456						self.block_announce_data_cache.insert(announce.header.hash(), data);
457					}
458				}
459			},
460			BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
461				if disconnect {
462					log::debug!(
463						target: LOG_TARGET,
464						"Disconnecting peer {peer_id} due to block announce validation failure",
465					);
466					self.network_service
467						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
468				}
469
470				self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
471			},
472		}
473	}
474
475	/// Push a block announce validation.
476	pub fn push_block_announce_validation(
477		&mut self,
478		peer_id: PeerId,
479		announce: BlockAnnounce<B::Header>,
480	) {
481		let hash = announce.header.hash();
482
483		let peer = match self.peers.get_mut(&peer_id) {
484			Some(p) => p,
485			None => {
486				log::error!(
487					target: LOG_TARGET,
488					"Received block announce from disconnected peer {peer_id}",
489				);
490				debug_assert!(false);
491				return;
492			},
493		};
494		peer.known_blocks.insert(hash);
495
496		if peer.info.roles.is_full() {
497			let is_best = match announce.state.unwrap_or(BlockState::Best) {
498				BlockState::Best => true,
499				BlockState::Normal => false,
500			};
501
502			self.block_announce_validator
503				.push_block_announce_validation(peer_id, hash, announce, is_best);
504		}
505	}
506
507	/// Make sure an important block is propagated to peers.
508	///
509	/// In chain-based consensus, we often need to make sure non-best forks are
510	/// at least temporarily synced.
511	pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
512		let header = match self.client.header(hash) {
513			Ok(Some(header)) => header,
514			Ok(None) => {
515				log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
516				return;
517			},
518			Err(e) => {
519				log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
520				return;
521			},
522		};
523
524		// don't announce genesis block since it will be ignored
525		if header.number().is_zero() {
526			return;
527		}
528
529		let is_best = self.client.info().best_hash == hash;
530		log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
531
532		let data = data
533			.or_else(|| self.block_announce_data_cache.get(&hash).cloned())
534			.unwrap_or_default();
535
536		for (peer_id, ref mut peer) in self.peers.iter_mut() {
537			let inserted = peer.known_blocks.insert(hash);
538			if inserted {
539				log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
540				let message = BlockAnnounce {
541					header: header.clone(),
542					state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
543					data: Some(data.clone()),
544				};
545
546				let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
547			}
548		}
549	}
550
551	pub async fn run(mut self) {
552		loop {
553			tokio::select! {
554				_ = self.tick_timeout.tick() => {
555					// TODO: This tick should not be necessary, but
556					//  `self.process_strategy_actions()` is not called in some cases otherwise and
557					//  some tests fail because of this
558				},
559				command = self.service_rx.select_next_some() =>
560					self.process_service_command(command),
561				notification_event = self.notification_service.next_event() => match notification_event {
562					Some(event) => self.process_notification_event(event),
563					None => return,
564				},
565				response_event = self.pending_responses.select_next_some() =>
566					self.process_response_event(response_event),
567				validation_result = self.block_announce_validator.select_next_some() =>
568					self.process_block_announce_validation_result(validation_result),
569			}
570
571			// Update atomic variables
572			self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);
573
574			// Process actions requested by a syncing strategy.
575			if let Err(e) = self.process_strategy_actions() {
576				error!(
577					target: LOG_TARGET,
578					"Terminating `SyncingEngine` due to fatal error: {e:?}.",
579				);
580				return;
581			}
582		}
583	}
584
585	fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
586		for action in self.strategy.actions()? {
587			match action {
588				SyncingAction::SendBlockRequest { peer_id, key, request } => {
589					// Sending block request implies dropping obsolete pending response as we are
590					// not interested in it anymore (see [`SyncingAction::SendBlockRequest`]).
591					let removed = self.pending_responses.remove(peer_id, key);
592					self.send_block_request(peer_id, key, request.clone());
593
594					if removed {
595						warn!(
596							target: LOG_TARGET,
597							"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}. \
598							 Stale response removed!",
599							peer_id,
600							key,
601							request,
602						)
603					} else {
604						trace!(
605							target: LOG_TARGET,
606							"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}.",
607							peer_id,
608							key,
609							request,
610						)
611					}
612				},
613				SyncingAction::CancelRequest { peer_id, key } => {
614					let removed = self.pending_responses.remove(peer_id, key);
615
616					trace!(
617						target: LOG_TARGET,
618						"Processed {action:?}, response removed: {removed}.",
619					);
620				},
621				SyncingAction::SendStateRequest { peer_id, key, protocol_name, request } => {
622					self.send_state_request(peer_id, key, protocol_name, request);
623
624					trace!(
625						target: LOG_TARGET,
626						"Processed `ChainSyncAction::SendStateRequest` to {peer_id}.",
627					);
628				},
629				SyncingAction::SendWarpProofRequest { peer_id, key, protocol_name, request } => {
630					self.send_warp_proof_request(peer_id, key, protocol_name, request.clone());
631
632					trace!(
633						target: LOG_TARGET,
634						"Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.",
635						peer_id,
636						request,
637					);
638				},
639				SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
640					self.pending_responses.remove_all(&peer_id);
641					self.network_service
642						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
643					self.network_service.report_peer(peer_id, rep);
644
645					trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
646				},
647				SyncingAction::ImportBlocks { origin, blocks } => {
648					let count = blocks.len();
649					self.import_blocks(origin, blocks);
650
651					trace!(
652						target: LOG_TARGET,
653						"Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
654					);
655				},
656				SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
657					self.import_justifications(peer_id, hash, number, justifications);
658
659					trace!(
660						target: LOG_TARGET,
661						"Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
662						peer_id,
663						hash,
664						number,
665					)
666				},
667				// Nothing to do, this is handled internally by `PolkadotSyncingStrategy`.
668				SyncingAction::Finished => {},
669			}
670		}
671
672		Ok(())
673	}
674
675	fn process_service_command(&mut self, command: ToServiceCommand<B>) {
676		match command {
677			ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
678				self.strategy.set_sync_fork_request(peers, &hash, number);
679			},
680			ToServiceCommand::EventStream(tx) => self.event_streams.push(tx),
681			ToServiceCommand::RequestJustification(hash, number) =>
682				self.strategy.request_justification(&hash, number),
683			ToServiceCommand::ClearJustificationRequests =>
684				self.strategy.clear_justification_requests(),
685			ToServiceCommand::BlocksProcessed(imported, count, results) => {
686				self.strategy.on_blocks_processed(imported, count, results);
687			},
688			ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
689				self.strategy.on_justification_import(hash, number, success);
690				if !success {
691					log::info!(
692						target: LOG_TARGET,
693						"💔 Invalid justification provided by {peer_id} for #{hash}",
694					);
695					self.network_service
696						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
697					self.network_service
698						.report_peer(peer_id, ReputationChange::new_fatal("Invalid justification"));
699				}
700			},
701			ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
702			ToServiceCommand::NewBestBlockImported(hash, number) => {
703				log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
704
705				self.strategy.update_chain_info(&hash, number);
706				let _ = self.notification_service.try_set_handshake(
707					BlockAnnouncesHandshake::<B>::build(
708						self.roles,
709						number,
710						hash,
711						self.genesis_hash,
712					)
713					.encode(),
714				);
715			},
716			ToServiceCommand::Status(tx) => {
717				let _ = tx.send(self.strategy.status());
718			},
719			ToServiceCommand::NumActivePeers(tx) => {
720				let _ = tx.send(self.num_active_peers());
721			},
722			ToServiceCommand::NumDownloadedBlocks(tx) => {
723				let _ = tx.send(self.strategy.num_downloaded_blocks());
724			},
725			ToServiceCommand::NumSyncRequests(tx) => {
726				let _ = tx.send(self.strategy.num_sync_requests());
727			},
728			ToServiceCommand::PeersInfo(tx) => {
729				let peers_info =
730					self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
731				let _ = tx.send(peers_info);
732			},
733			ToServiceCommand::OnBlockFinalized(hash, header) =>
734				self.strategy.on_block_finalized(&hash, *header.number()),
735		}
736	}
737
738	fn process_notification_event(&mut self, event: NotificationEvent) {
739		match event {
740			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
741				let validation_result = self
742					.validate_connection(&peer, handshake, Direction::Inbound)
743					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
744
745				let _ = result_tx.send(validation_result);
746			},
747			NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
748				log::debug!(
749					target: LOG_TARGET,
750					"Substream opened for {peer}, handshake {handshake:?}"
751				);
752
753				match self.validate_connection(&peer, handshake, direction) {
754					Ok(handshake) => {
755						if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
756							log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
757							self.network_service
758								.disconnect_peer(peer, self.block_announce_protocol_name.clone());
759						}
760					},
761					Err(wrong_genesis) => {
762						log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
763
764						if wrong_genesis {
765							self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
766						}
767
768						self.network_service
769							.disconnect_peer(peer, self.block_announce_protocol_name.clone());
770					},
771				}
772			},
773			NotificationEvent::NotificationStreamClosed { peer } => {
774				self.on_sync_peer_disconnected(peer);
775			},
776			NotificationEvent::NotificationReceived { peer, notification } => {
777				if !self.peers.contains_key(&peer) {
778					log::error!(
779						target: LOG_TARGET,
780						"received notification from {peer} who had been earlier refused by `SyncingEngine`",
781					);
782					return;
783				}
784
785				let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
786					log::warn!(target: LOG_TARGET, "failed to decode block announce");
787					return;
788				};
789
790				self.push_block_announce_validation(peer, announce);
791			},
792		}
793	}
794
795	/// Called by peer when it is disconnecting.
796	///
797	/// Returns a result if the handshake of this peer was indeed accepted.
798	fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
799		let Some(info) = self.peers.remove(&peer_id) else {
800			log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
801			return;
802		};
803		if let Some(metrics) = &self.metrics {
804			metrics.peers.dec();
805		}
806		self.num_connected.fetch_sub(1, Ordering::AcqRel);
807
808		if self.important_peers.contains(&peer_id) {
809			log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
810		} else {
811			log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
812		}
813
814		if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
815			info.inbound &&
816			info.info.roles.is_full()
817		{
818			match self.num_in_peers.checked_sub(1) {
819				Some(value) => {
820					self.num_in_peers = value;
821				},
822				None => {
823					log::error!(
824						target: LOG_TARGET,
825						"trying to disconnect an inbound node which is not counted as inbound"
826					);
827					debug_assert!(false);
828				},
829			}
830		}
831
832		self.strategy.remove_peer(&peer_id);
833		self.pending_responses.remove_all(&peer_id);
834		self.event_streams
835			.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
836	}
837
838	/// Validate received handshake.
839	fn validate_handshake(
840		&mut self,
841		peer_id: &PeerId,
842		handshake: Vec<u8>,
843	) -> Result<BlockAnnouncesHandshake<B>, bool> {
844		log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
845
846		let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
847			.map_err(|error| {
848				log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
849				false
850			})?;
851
852		if handshake.genesis_hash != self.genesis_hash {
853			if self.important_peers.contains(&peer_id) {
854				log::error!(
855					target: LOG_TARGET,
856					"Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
857					self.genesis_hash,
858					handshake.genesis_hash,
859				);
860			} else if self.boot_node_ids.contains(&peer_id) {
861				log::error!(
862					target: LOG_TARGET,
863					"Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
864					self.genesis_hash,
865					handshake.genesis_hash,
866				);
867			} else {
868				log::debug!(
869					target: LOG_TARGET,
870					"Peer is on different chain (our genesis: {} theirs: {})",
871					self.genesis_hash,
872					handshake.genesis_hash
873				);
874			}
875
876			return Err(true);
877		}
878
879		Ok(handshake)
880	}
881
882	/// Validate connection.
883	// NOTE Returning `Err(bool)` is a really ugly hack to work around the issue
884	// that `ProtocolController` thinks the peer is connected when in fact it can
885	// still be under validation. If the peer has different genesis than the
886	// local node the validation fails but the peer cannot be reported in
887	// `validate_connection()` as that is also called by
888	// `ValidateInboundSubstream` which means that the peer is still being
889	// validated and banning the peer when handling that event would
890	// result in peer getting dropped twice.
891	//
892	// The proper way to fix this is to integrate `ProtocolController` more
893	// tightly with `NotificationService` or add an additional API call for
894	// banning pre-accepted peers (which is not desirable)
895	fn validate_connection(
896		&mut self,
897		peer_id: &PeerId,
898		handshake: Vec<u8>,
899		direction: Direction,
900	) -> Result<BlockAnnouncesHandshake<B>, bool> {
901		log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
902
903		let handshake = self.validate_handshake(peer_id, handshake)?;
904
905		if self.peers.contains_key(&peer_id) {
906			log::error!(
907				target: LOG_TARGET,
908				"Called `validate_connection()` with already connected peer {peer_id}",
909			);
910			debug_assert!(false);
911			return Err(false);
912		}
913
914		let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id);
915		let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
916
917		if handshake.roles.is_full() &&
918			self.strategy.num_peers() >=
919				self.default_peers_set_num_full +
920					self.default_peers_set_no_slot_connected_peers.len() +
921					this_peer_reserved_slot
922		{
923			log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
924			return Err(false);
925		}
926
927		// make sure to accept no more than `--in-peers` many full nodes
928		if !no_slot_peer &&
929			handshake.roles.is_full() &&
930			direction.is_inbound() &&
931			self.num_in_peers == self.max_in_peers
932		{
933			log::debug!(target: LOG_TARGET, "All inbound slots have been consumed, rejecting {peer_id}");
934			return Err(false);
935		}
936
937		// make sure that all slots are not occupied by light peers
938		//
939		// `ChainSync` only accepts full peers whereas `SyncingEngine` accepts both full and light
940		// peers. Verify that there is a slot in `SyncingEngine` for the inbound light peer
941		if handshake.roles.is_light() &&
942			(self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
943		{
944			log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
945			return Err(false);
946		}
947
948		Ok(handshake)
949	}
950
951	/// Called on the first connection between two peers on the default set, after their exchange
952	/// of handshake.
953	///
954	/// Returns `Ok` if the handshake is accepted and the peer added to the list of peers we sync
955	/// from.
956	fn on_sync_peer_connected(
957		&mut self,
958		peer_id: PeerId,
959		status: &BlockAnnouncesHandshake<B>,
960		direction: Direction,
961	) -> Result<(), ()> {
962		log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
963
964		let peer = Peer {
965			info: ExtendedPeerInfo {
966				roles: status.roles,
967				best_hash: status.best_hash,
968				best_number: status.best_number,
969			},
970			known_blocks: LruHashSet::new(
971				NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
972			),
973			inbound: direction.is_inbound(),
974		};
975
976		// Only forward full peers to syncing strategy.
977		if status.roles.is_full() {
978			self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
979		}
980
981		log::debug!(target: LOG_TARGET, "Connected {peer_id}");
982
983		if self.peers.insert(peer_id, peer).is_none() {
984			if let Some(metrics) = &self.metrics {
985				metrics.peers.inc();
986			}
987			self.num_connected.fetch_add(1, Ordering::AcqRel);
988		}
989		self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
990
991		if self.default_peers_set_no_slot_peers.contains(&peer_id) {
992			self.default_peers_set_no_slot_connected_peers.insert(peer_id);
993		} else if direction.is_inbound() && status.roles.is_full() {
994			self.num_in_peers += 1;
995		}
996
997		self.event_streams
998			.retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
999
1000		Ok(())
1001	}
1002
1003	fn send_block_request(&mut self, peer_id: PeerId, key: StrategyKey, request: BlockRequest<B>) {
1004		if !self.peers.contains_key(&peer_id) {
1005			trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
1006			debug_assert!(false);
1007			return;
1008		}
1009
1010		let downloader = self.block_downloader.clone();
1011
1012		self.pending_responses.insert(
1013			peer_id,
1014			key,
1015			PeerRequest::Block(request.clone()),
1016			async move { downloader.download_blocks(peer_id, request).await }.boxed(),
1017		);
1018	}
1019
1020	fn send_state_request(
1021		&mut self,
1022		peer_id: PeerId,
1023		key: StrategyKey,
1024		protocol_name: ProtocolName,
1025		request: OpaqueStateRequest,
1026	) {
1027		if !self.peers.contains_key(&peer_id) {
1028			trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
1029			debug_assert!(false);
1030			return;
1031		}
1032
1033		let (tx, rx) = oneshot::channel();
1034
1035		self.pending_responses.insert(peer_id, key, PeerRequest::State, rx.boxed());
1036
1037		match Self::encode_state_request(&request) {
1038			Ok(data) => {
1039				self.network_service.start_request(
1040					peer_id,
1041					protocol_name,
1042					data,
1043					tx,
1044					IfDisconnected::ImmediateError,
1045				);
1046			},
1047			Err(err) => {
1048				log::warn!(
1049					target: LOG_TARGET,
1050					"Failed to encode state request {request:?}: {err:?}",
1051				);
1052			},
1053		}
1054	}
1055
1056	fn send_warp_proof_request(
1057		&mut self,
1058		peer_id: PeerId,
1059		key: StrategyKey,
1060		protocol_name: ProtocolName,
1061		request: WarpProofRequest<B>,
1062	) {
1063		if !self.peers.contains_key(&peer_id) {
1064			trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
1065			debug_assert!(false);
1066			return;
1067		}
1068
1069		let (tx, rx) = oneshot::channel();
1070
1071		self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());
1072
1073		self.network_service.start_request(
1074			peer_id,
1075			protocol_name,
1076			request.encode(),
1077			tx,
1078			IfDisconnected::ImmediateError,
1079		);
1080	}
1081
1082	fn encode_state_request(request: &OpaqueStateRequest) -> Result<Vec<u8>, String> {
1083		let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| {
1084			"Failed to downcast opaque state response during encoding, this is an \
1085				implementation bug."
1086				.to_string()
1087		})?;
1088
1089		Ok(request.encode_to_vec())
1090	}
1091
1092	fn decode_state_response(response: &[u8]) -> Result<OpaqueStateResponse, String> {
1093		let response = StateResponse::decode(response)
1094			.map_err(|error| format!("Failed to decode state response: {error}"))?;
1095
1096		Ok(OpaqueStateResponse(Box::new(response)))
1097	}
1098
1099	fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
1100		let ResponseEvent { peer_id, key, request, response } = response_event;
1101
1102		match response {
1103			Ok(Ok((resp, _))) => match request {
1104				PeerRequest::Block(req) => {
1105					match self.block_downloader.block_response_into_blocks(&req, resp) {
1106						Ok(blocks) => {
1107							self.strategy.on_block_response(peer_id, key, req, blocks);
1108						},
1109						Err(BlockResponseError::DecodeFailed(e)) => {
1110							debug!(
1111								target: LOG_TARGET,
1112								"Failed to decode block response from peer {:?}: {:?}.",
1113								peer_id,
1114								e
1115							);
1116							self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
1117							self.network_service.disconnect_peer(
1118								peer_id,
1119								self.block_announce_protocol_name.clone(),
1120							);
1121							return;
1122						},
1123						Err(BlockResponseError::ExtractionFailed(e)) => {
1124							debug!(
1125								target: LOG_TARGET,
1126								"Failed to extract blocks from peer response {:?}: {:?}.",
1127								peer_id,
1128								e
1129							);
1130							self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
1131							return;
1132						},
1133					}
1134				},
1135				PeerRequest::State => {
1136					let response = match Self::decode_state_response(&resp[..]) {
1137						Ok(proto) => proto,
1138						Err(e) => {
1139							debug!(
1140								target: LOG_TARGET,
1141								"Failed to decode state response from peer {peer_id:?}: {e:?}.",
1142							);
1143							self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
1144							self.network_service.disconnect_peer(
1145								peer_id,
1146								self.block_announce_protocol_name.clone(),
1147							);
1148							return;
1149						},
1150					};
1151
1152					self.strategy.on_state_response(peer_id, key, response);
1153				},
1154				PeerRequest::WarpProof => {
1155					self.strategy.on_warp_proof_response(&peer_id, key, EncodedProof(resp));
1156				},
1157			},
1158			Ok(Err(e)) => {
1159				debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
1160
1161				match e {
1162					RequestFailure::Network(OutboundFailure::Timeout) => {
1163						self.network_service.report_peer(peer_id, rep::TIMEOUT);
1164						self.network_service
1165							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1166					},
1167					RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1168						self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
1169						self.network_service
1170							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1171					},
1172					RequestFailure::Network(OutboundFailure::DialFailure) => {
1173						self.network_service
1174							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1175					},
1176					RequestFailure::Refused => {
1177						self.network_service.report_peer(peer_id, rep::REFUSED);
1178						self.network_service
1179							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1180					},
1181					RequestFailure::Network(OutboundFailure::ConnectionClosed) |
1182					RequestFailure::NotConnected => {
1183						self.network_service
1184							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1185					},
1186					RequestFailure::UnknownProtocol => {
1187						debug_assert!(false, "Block request protocol should always be known.");
1188					},
1189					RequestFailure::Obsolete => {
1190						debug_assert!(
1191							false,
1192							"Can not receive `RequestFailure::Obsolete` after dropping the \
1193								response receiver.",
1194						);
1195					},
1196				}
1197			},
1198			Err(oneshot::Canceled) => {
1199				trace!(
1200					target: LOG_TARGET,
1201					"Request to peer {peer_id:?} failed due to oneshot being canceled.",
1202				);
1203				self.network_service
1204					.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1205			},
1206		}
1207	}
1208
1209	/// Returns the number of peers we're connected to and that are being queried.
1210	fn num_active_peers(&self) -> usize {
1211		self.pending_responses.len()
1212	}
1213
1214	/// Get config for the block announcement protocol
1215	fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1216		protocol_id: ProtocolId,
1217		fork_id: &Option<String>,
1218		roles: Roles,
1219		best_number: NumberFor<B>,
1220		best_hash: B::Hash,
1221		genesis_hash: B::Hash,
1222		set_config: &SetConfig,
1223		metrics: NotificationMetrics,
1224		peer_store_handle: Arc<dyn PeerStoreProvider>,
1225	) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1226		let block_announces_protocol = {
1227			let genesis_hash = genesis_hash.as_ref();
1228			if let Some(ref fork_id) = fork_id {
1229				format!(
1230					"/{}/{}/block-announces/1",
1231					array_bytes::bytes2hex("", genesis_hash),
1232					fork_id
1233				)
1234			} else {
1235				format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
1236			}
1237		};
1238
1239		N::notification_config(
1240			block_announces_protocol.into(),
1241			iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1242			MAX_BLOCK_ANNOUNCE_SIZE,
1243			Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1244				roles,
1245				best_number,
1246				best_hash,
1247				genesis_hash,
1248			))),
1249			set_config.clone(),
1250			metrics,
1251			peer_store_handle,
1252		)
1253	}
1254
1255	/// Import blocks.
1256	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1257		if let Some(metrics) = &self.metrics {
1258			metrics.import_queue_blocks_submitted.inc();
1259		}
1260
1261		self.import_queue.import_blocks(origin, blocks);
1262	}
1263
1264	/// Import justifications.
1265	fn import_justifications(
1266		&mut self,
1267		peer_id: PeerId,
1268		hash: B::Hash,
1269		number: NumberFor<B>,
1270		justifications: Justifications,
1271	) {
1272		if let Some(metrics) = &self.metrics {
1273			metrics.import_queue_justifications_submitted.inc();
1274		}
1275
1276		self.import_queue.import_justifications(peer_id, hash, number, justifications);
1277	}
1278}