referrerpolicy=no-referrer-when-downgrade

sc_network_sync/
engine.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `SyncingEngine` is the actor responsible for syncing a Substrate chain
20//! to the tip and keeping the blockchain up to date with network updates.
21
22use crate::{
23	block_announce_validator::{
24		BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
25	},
26	pending_responses::{PendingResponses, ResponseEvent},
27	service::{
28		self,
29		syncing_service::{SyncingService, ToServiceCommand},
30	},
31	strategy::{SyncingAction, SyncingStrategy},
32	types::{BadPeer, ExtendedPeerInfo, SyncEvent},
33	LOG_TARGET,
34};
35
36use codec::{Decode, DecodeAll, Encode};
37use futures::{channel::oneshot, StreamExt};
38use log::{debug, error, trace, warn};
39use prometheus_endpoint::{
40	register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
41};
42use schnellru::{ByLength, LruMap};
43use tokio::time::{Interval, MissedTickBehavior};
44
45use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
46use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
47use sc_network::{
48	config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
49	peer_store::PeerStoreProvider,
50	request_responses::{OutboundFailure, RequestFailure},
51	service::{
52		traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
53		NotificationMetrics,
54	},
55	types::ProtocolName,
56	utils::LruHashSet,
57	NetworkBackend, NotificationService, ReputationChange,
58};
59use sc_network_common::{
60	role::Roles,
61	sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
62};
63use sc_network_types::PeerId;
64use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
65use sp_blockchain::{Error as ClientError, HeaderMetadata};
66use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
67use sp_runtime::{
68	traits::{Block as BlockT, Header, NumberFor, Zero},
69	Justifications,
70};
71
72use std::{
73	collections::{HashMap, HashSet},
74	iter,
75	num::NonZeroUsize,
76	sync::{
77		atomic::{AtomicBool, AtomicUsize, Ordering},
78		Arc,
79	},
80};
81
82/// Interval at which we perform time-based maintenance (the period of the engine's tick timer).
83const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);
84
85/// Maximum number of known block hashes to keep for a peer.
86const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
87
88/// Maximum allowed size for a block announce.
89const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
90
91/// Generate the block announces protocol name from the genesis hash and fork id.
92pub fn block_announces_protocol_name<Hash: AsRef<[u8]>>(
93	genesis_hash: Hash,
94	fork_id: Option<&str>,
95) -> String {
96	let genesis_hash = genesis_hash.as_ref();
97	if let Some(fork_id) = fork_id {
98		format!("/{}/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
99	} else {
100		format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
101	}
102}
103
104/// Generate the legacy block announces protocol name from chain specific protocol identifier.
105pub fn block_announces_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
106	format!("/{}/block-announces/1", protocol_id.as_ref())
107}
108
109mod rep {
110	use sc_network::ReputationChange as Rep;
111	/// Peer has different genesis.
112	pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
113	/// Peer sent us a block announcement that failed validation.
114	pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
115	/// Peer is on unsupported protocol version.
116	pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
117	/// Reputation change when a peer refuses a request.
118	pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
119	/// Reputation change when a peer doesn't respond in time to our messages.
120	pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
121	/// Reputation change when a peer connection failed with IO error.
122	pub const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
123}
124
125struct Metrics {
126	peers: Gauge<U64>,
127	import_queue_blocks_submitted: Counter<U64>,
128	import_queue_justifications_submitted: Counter<U64>,
129}
130
131impl Metrics {
132	fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
133		MajorSyncingGauge::register(r, major_syncing)?;
134		Ok(Self {
135			peers: {
136				let g = Gauge::new("substrate_sync_peers", "Number of peers we sync with")?;
137				register(g, r)?
138			},
139			import_queue_blocks_submitted: {
140				let c = Counter::new(
141					"substrate_sync_import_queue_blocks_submitted",
142					"Number of blocks submitted to the import queue.",
143				)?;
144				register(c, r)?
145			},
146			import_queue_justifications_submitted: {
147				let c = Counter::new(
148					"substrate_sync_import_queue_justifications_submitted",
149					"Number of justifications submitted to the import queue.",
150				)?;
151				register(c, r)?
152			},
153		})
154	}
155}
156
157/// The "major syncing" metric.
158#[derive(Clone)]
159pub struct MajorSyncingGauge(Arc<AtomicBool>);
160
161impl MajorSyncingGauge {
162	/// Registers the [`MajorSyncGauge`] metric whose value is
163	/// obtained from the given `AtomicBool`.
164	fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
165		prometheus_endpoint::register(
166			SourcedGauge::new(
167				&Opts::new(
168					"substrate_sub_libp2p_is_major_syncing",
169					"Whether the node is performing a major sync or not.",
170				),
171				MajorSyncingGauge(value),
172			)?,
173			registry,
174		)?;
175
176		Ok(())
177	}
178}
179
180impl MetricSource for MajorSyncingGauge {
181	type N = u64;
182
183	fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
184		set(&[], self.0.load(Ordering::Relaxed) as u64);
185	}
186}
187
188/// Information about a peer tracked by the syncing engine.
189#[derive(Debug)]
190pub struct Peer<B: BlockT> {
	/// Extended information about the peer, including its roles and best block.
191	pub info: ExtendedPeerInfo<B>,
192	/// Holds a set of blocks known to this peer.
193	pub known_blocks: LruHashSet<B::Hash>,
194	/// Is the peer inbound.
195	inbound: bool,
196}
197
/// State of the syncing actor: the syncing strategy, the connected peers with their slot
/// accounting, and the handles used to talk to the network, the import queue and the service.
198pub struct SyncingEngine<B: BlockT, Client> {
199	/// Syncing strategy.
200	strategy: Box<dyn SyncingStrategy<B>>,
201
202	/// Blockchain client.
203	client: Arc<Client>,
204
205	/// Number of peers we're connected to.
206	num_connected: Arc<AtomicUsize>,
207
208	/// Are we actively catching up with the chain?
209	is_major_syncing: Arc<AtomicBool>,
210
211	/// Network service.
212	network_service: service::network::NetworkServiceHandle,
213
214	/// Channel for receiving service commands.
215	service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,
216
217	/// Assigned roles.
218	roles: Roles,
219
220	/// Genesis hash.
221	genesis_hash: B::Hash,
222
223	/// Set of channels for other protocols that have subscribed to syncing events.
224	event_streams: Vec<TracingUnboundedSender<SyncEvent>>,
225
226	/// Timer that periodically wakes up the `run` loop (see `TICK_TIMEOUT`).
227	tick_timeout: Interval,
228
229	/// All connected peers. Contains both full and light node peers.
230	peers: HashMap<PeerId, Peer<B>>,
231
232	/// List of nodes for which we perform additional logging because they are important for the
233	/// user.
234	important_peers: HashSet<PeerId>,
235
236	/// Actual list of connected no-slot nodes.
237	default_peers_set_no_slot_connected_peers: HashSet<PeerId>,
238
239	/// List of nodes that should never occupy peer slots.
240	default_peers_set_no_slot_peers: HashSet<PeerId>,
241
242	/// Value that was passed as part of the configuration. Used to cap the number of full
243	/// nodes.
244	default_peers_set_num_full: usize,
245
246	/// Number of slots to allocate to light nodes.
247	default_peers_set_num_light: usize,
248
249	/// Maximum number of inbound peers.
250	max_in_peers: usize,
251
252	/// Number of inbound peers accepted so far.
253	num_in_peers: usize,
254
255	/// Dynamically updatable no-slot peer set (see [`SyncingService::set_no_slot_peers`]).
256	/// Treated identically to `default_peers_set_no_slot_peers` for inbound slot accounting.
257	dynamic_no_slot_peers: HashSet<PeerId>,
258
259	/// Async processor of block announce validations.
260	block_announce_validator: BlockAnnounceValidatorStream<B>,
261
262	/// A cache for the data that was associated to a block announcement.
263	block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,
264
265	/// The `PeerId`'s of all boot nodes.
266	boot_node_ids: HashSet<PeerId>,
267
268	/// Protocol name used for block announcements.
269	block_announce_protocol_name: ProtocolName,
270
271	/// Prometheus metrics.
272	metrics: Option<Metrics>,
273
274	/// Handle that is used to communicate with `sc_network::Notifications`.
275	notification_service: Box<dyn NotificationService>,
276
277	/// Handle to `PeerStore`.
278	peer_store_handle: Arc<dyn PeerStoreProvider>,
279
280	/// Pending responses.
281	pending_responses: PendingResponses,
282
283	/// Handle to import queue.
284	import_queue: Box<dyn ImportQueueService<B>>,
285}
286
287impl<B: BlockT, Client> SyncingEngine<B, Client>
288where
289	B: BlockT,
290	Client: HeaderBackend<B>
291		+ BlockBackend<B>
292		+ HeaderMetadata<B, Error = sp_blockchain::Error>
293		+ ProofProvider<B>
294		+ Send
295		+ Sync
296		+ 'static,
297{
298	pub fn new<N>(
299		roles: Roles,
300		client: Arc<Client>,
301		metrics_registry: Option<&Registry>,
302		network_metrics: NotificationMetrics,
303		net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
304		protocol_id: ProtocolId,
305		fork_id: Option<&str>,
306		block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
307		syncing_strategy: Box<dyn SyncingStrategy<B>>,
308		network_service: service::network::NetworkServiceHandle,
309		import_queue: Box<dyn ImportQueueService<B>>,
310		peer_store_handle: Arc<dyn PeerStoreProvider>,
311	) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
312	where
313		N: NetworkBackend<B, <B as BlockT>::Hash>,
314	{
315		let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
316			net_config.network_config.default_peers_set.out_peers)
317			.max(1);
318		let important_peers = {
319			let mut imp_p = HashSet::new();
320			for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
321				imp_p.insert(reserved.peer_id);
322			}
323			for config in net_config.notification_protocols() {
324				let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
325				imp_p.extend(peer_ids);
326			}
327
328			imp_p.shrink_to_fit();
329			imp_p
330		};
331		let boot_node_ids = {
332			let mut list = HashSet::new();
333			for node in &net_config.network_config.boot_nodes {
334				list.insert(node.peer_id);
335			}
336			list.shrink_to_fit();
337			list
338		};
339		let default_peers_set_no_slot_peers = {
340			let mut no_slot_p: HashSet<PeerId> = net_config
341				.network_config
342				.default_peers_set
343				.reserved_nodes
344				.iter()
345				.map(|reserved| reserved.peer_id)
346				.collect();
347			no_slot_p.shrink_to_fit();
348			no_slot_p
349		};
350		let default_peers_set_num_full =
351			net_config.network_config.default_peers_set_num_full as usize;
352		let default_peers_set_num_light = {
353			let total = net_config.network_config.default_peers_set.out_peers +
354				net_config.network_config.default_peers_set.in_peers;
355			total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
356		};
357
358		let info = client.info();
359
360		let (block_announce_config, notification_service) =
361			Self::get_block_announce_proto_config::<N>(
362				protocol_id,
363				fork_id,
364				roles,
365				info.best_number,
366				info.best_hash,
367				info.genesis_hash,
368				&net_config.network_config.default_peers_set,
369				network_metrics,
370				Arc::clone(&peer_store_handle),
371			);
372
373		let block_announce_protocol_name = block_announce_config.protocol_name().clone();
374		let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
375		let num_connected = Arc::new(AtomicUsize::new(0));
376		let is_major_syncing = Arc::new(AtomicBool::new(false));
377
378		// `default_peers_set.in_peers` contains an unspecified amount of light peers so the number
379		// of full inbound peers must be calculated from the total full peer count
380		let max_full_peers = net_config.network_config.default_peers_set_num_full;
381		let max_out_peers = net_config.network_config.default_peers_set.out_peers;
382		let max_in_peers = (max_full_peers - max_out_peers) as usize;
383
384		let tick_timeout = {
385			let mut interval = tokio::time::interval(TICK_TIMEOUT);
386			interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
387			interval
388		};
389
390		Ok((
391			Self {
392				roles,
393				client,
394				strategy: syncing_strategy,
395				network_service,
396				peers: HashMap::new(),
397				block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
398				block_announce_protocol_name,
399				block_announce_validator: BlockAnnounceValidatorStream::new(
400					block_announce_validator,
401				),
402				num_connected: num_connected.clone(),
403				is_major_syncing: is_major_syncing.clone(),
404				service_rx,
405				genesis_hash: info.genesis_hash,
406				important_peers,
407				default_peers_set_no_slot_connected_peers: HashSet::new(),
408				boot_node_ids,
409				default_peers_set_no_slot_peers,
410				default_peers_set_num_full,
411				default_peers_set_num_light,
412				num_in_peers: 0usize,
413				max_in_peers,
414				dynamic_no_slot_peers: HashSet::new(),
415				event_streams: Vec::new(),
416				notification_service,
417				tick_timeout,
418				peer_store_handle,
419				metrics: if let Some(r) = metrics_registry {
420					match Metrics::register(r, is_major_syncing.clone()) {
421						Ok(metrics) => Some(metrics),
422						Err(err) => {
423							log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
424							None
425						},
426					}
427				} else {
428					None
429				},
430				pending_responses: PendingResponses::new(),
431				import_queue,
432			},
433			SyncingService::new(tx, num_connected, is_major_syncing),
434			block_announce_config,
435		))
436	}
437
438	fn update_peer_info(
439		&mut self,
440		peer_id: &PeerId,
441		best_hash: B::Hash,
442		best_number: NumberFor<B>,
443	) {
444		if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
445			peer.info.best_hash = best_hash;
446			peer.info.best_number = best_number;
447		}
448	}
449
450	/// Process the result of the block announce validation.
451	fn process_block_announce_validation_result(
452		&mut self,
453		validation_result: BlockAnnounceValidationResult<B::Header>,
454	) {
455		match validation_result {
456			BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
457			BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
458				if let Some((best_hash, best_number)) =
459					self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
460				{
461					self.update_peer_info(&peer_id, best_hash, best_number);
462				}
463
464				if let Some(data) = announce.data {
465					if !data.is_empty() {
466						self.block_announce_data_cache.insert(announce.header.hash(), data);
467					}
468				}
469			},
470			BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
471				if disconnect {
472					log::debug!(
473						target: LOG_TARGET,
474						"Disconnecting peer {peer_id} due to block announce validation failure",
475					);
476					self.network_service
477						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
478				}
479
480				self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
481			},
482		}
483	}
484
485	/// Push a block announce validation.
486	pub fn push_block_announce_validation(
487		&mut self,
488		peer_id: PeerId,
489		announce: BlockAnnounce<B::Header>,
490	) {
491		let hash = announce.header.hash();
492
493		let peer = match self.peers.get_mut(&peer_id) {
494			Some(p) => p,
495			None => {
496				log::error!(
497					target: LOG_TARGET,
498					"Received block announce from disconnected peer {peer_id}",
499				);
500				debug_assert!(false);
501				return;
502			},
503		};
504		peer.known_blocks.insert(hash);
505
506		if peer.info.roles.is_full() {
507			let is_best = match announce.state.unwrap_or(BlockState::Best) {
508				BlockState::Best => true,
509				BlockState::Normal => false,
510			};
511
512			self.block_announce_validator
513				.push_block_announce_validation(peer_id, hash, announce, is_best);
514		}
515	}
516
517	/// Make sure an important block is propagated to peers.
518	///
519	/// In chain-based consensus, we often need to make sure non-best forks are
520	/// at least temporarily synced.
521	pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
522		let header = match self.client.header(hash) {
523			Ok(Some(header)) => header,
524			Ok(None) => {
525				log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
526				return;
527			},
528			Err(e) => {
529				log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
530				return;
531			},
532		};
533
534		// don't announce genesis block since it will be ignored
535		if header.number().is_zero() {
536			return;
537		}
538
539		let is_best = self.client.info().best_hash == hash;
540		log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
541
542		let data = data
543			.or_else(|| self.block_announce_data_cache.get(&hash).cloned())
544			.unwrap_or_default();
545
546		for (peer_id, ref mut peer) in self.peers.iter_mut() {
547			let inserted = peer.known_blocks.insert(hash);
548			if inserted {
549				log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
550				let message = BlockAnnounce {
551					header: header.clone(),
552					state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
553					data: Some(data.clone()),
554				};
555
556				let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
557			}
558		}
559	}
560
	/// Run the event loop of the syncing engine.
	///
	/// Each iteration waits for one of: the periodic tick, a service command, a notification
	/// event, a response event or a block announce validation result. Afterwards the
	/// `is_major_syncing` flag is refreshed from the strategy and the actions requested by the
	/// strategy are processed. The loop ends when the notification service yields `None` or
	/// when processing the strategy actions fails.
561	pub async fn run(mut self) {
562		loop {
563			tokio::select! {
564				_ = self.tick_timeout.tick() => {
565					// TODO: This tick should not be necessary, but
566					//  `self.process_strategy_actions()` is not called in some cases otherwise and
567					//  some tests fail because of this
568				},
569				command = self.service_rx.select_next_some() =>
570					self.process_service_command(command),
571				notification_event = self.notification_service.next_event() => match notification_event {
572					Some(event) => self.process_notification_event(event),
573					None => {
574						error!(
575							target: LOG_TARGET,
576							"Terminating `SyncingEngine` because `NotificationService` has terminated.",
577						);
578
579						return;
580					}
581				},
582				response_event = self.pending_responses.select_next_some() =>
583					self.process_response_event(response_event),
584				validation_result = self.block_announce_validator.select_next_some() =>
585					self.process_block_announce_validation_result(validation_result),
586			}
587
588			// Update atomic variables shared with `SyncingService`.
589			self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);
590
591			// Process actions requested by a syncing strategy.
592			if let Err(e) = self.process_strategy_actions() {
593				error!(
594					target: LOG_TARGET,
595					"Terminating `SyncingEngine` due to fatal error: {e:?}.",
596				);
597				return;
598			}
599		}
600	}
601
602	fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
603		for action in self.strategy.actions(&self.network_service)? {
604			match action {
605				SyncingAction::StartRequest { peer_id, key, request, remove_obsolete } => {
606					if !self.peers.contains_key(&peer_id) {
607						trace!(
608							target: LOG_TARGET,
609							"Cannot start request with strategy key {key:?} to unknown peer \
610							{peer_id}",
611						);
612						debug_assert!(false);
613						continue;
614					}
615					if remove_obsolete {
616						if self.pending_responses.remove(peer_id, key) {
617							warn!(
618								target: LOG_TARGET,
619								"Processed `SyncingAction::StartRequest` to {peer_id} with \
620								strategy key {key:?}. Stale response removed!",
621							)
622						} else {
623							trace!(
624								target: LOG_TARGET,
625								"Processed `SyncingAction::StartRequest` to {peer_id} with \
626								strategy key {key:?}.",
627							)
628						}
629					}
630
631					self.pending_responses.insert(peer_id, key, request);
632				},
633				SyncingAction::CancelRequest { peer_id, key } => {
634					let removed = self.pending_responses.remove(peer_id, key);
635
636					trace!(
637						target: LOG_TARGET,
638						"Processed `SyncingAction::CancelRequest`, response removed: {removed}.",
639					);
640				},
641				SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
642					self.pending_responses.remove_all(&peer_id);
643					self.network_service
644						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
645					self.network_service.report_peer(peer_id, rep);
646
647					trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
648				},
649				SyncingAction::ImportBlocks { origin, blocks } => {
650					let count = blocks.len();
651					self.import_blocks(origin, blocks);
652
653					trace!(
654						target: LOG_TARGET,
655						"Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
656					);
657				},
658				SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
659					self.import_justifications(peer_id, hash, number, justifications);
660
661					trace!(
662						target: LOG_TARGET,
663						"Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
664						peer_id,
665						hash,
666						number,
667					)
668				},
669				// Nothing to do, this is handled internally by `PolkadotSyncingStrategy`.
670				SyncingAction::Finished => {},
671			}
672		}
673
674		Ok(())
675	}
676
677	/// Reconcile per-peer slot tracking against `new_dynamic_no_slot`. See
678	/// [`apply_no_slot_set_inner`] for details.
679	fn apply_no_slot_set(&mut self, new_dynamic_no_slot: HashSet<PeerId>) {
680		let connected_peers = &self.peers;
681		apply_no_slot_set_inner(
682			|peer_id| {
683				connected_peers
684					.get(peer_id)
685					.map(|peer| peer.inbound && peer.info.roles.is_full())
686			},
687			&self.default_peers_set_no_slot_peers,
688			&self.dynamic_no_slot_peers,
689			&new_dynamic_no_slot,
690			&mut self.default_peers_set_no_slot_connected_peers,
691			&mut self.num_in_peers,
692			self.max_in_peers,
693			&self.network_service,
694			&self.block_announce_protocol_name,
695		);
696		self.dynamic_no_slot_peers = new_dynamic_no_slot;
697	}
698
699	fn process_service_command(&mut self, command: ToServiceCommand<B>) {
700		match command {
701			ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
702				self.strategy.set_sync_fork_request(peers, &hash, number);
703			},
704			ToServiceCommand::EventStream(tx) => {
705				// Let a new subscriber know about already connected peers.
706				for peer_id in self.peers.keys() {
707					let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
708				}
709				self.event_streams.push(tx);
710			},
711			ToServiceCommand::RequestJustification(hash, number) => {
712				self.strategy.request_justification(&hash, number)
713			},
714			ToServiceCommand::ClearJustificationRequests => {
715				self.strategy.clear_justification_requests()
716			},
717			ToServiceCommand::BlocksProcessed(imported, count, results) => {
718				self.strategy.on_blocks_processed(imported, count, results);
719			},
720			ToServiceCommand::JustificationImported(peer_id, hash, number, import_result) => {
721				let success =
722					matches!(import_result, sc_consensus::JustificationImportResult::Success);
723				self.strategy.on_justification_import(hash, number, success);
724
725				match import_result {
726					sc_consensus::JustificationImportResult::OutdatedJustification => {
727						log::info!(
728							target: LOG_TARGET,
729							"๐Ÿ’” Outdated justification provided by {peer_id} for #{hash}",
730						);
731					},
732					sc_consensus::JustificationImportResult::Failure => {
733						log::info!(
734							target: LOG_TARGET,
735							"๐Ÿ’” Invalid justification provided by {peer_id} for #{hash}",
736						);
737						self.network_service
738							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
739						self.network_service.report_peer(
740							peer_id,
741							ReputationChange::new_fatal("Invalid justification"),
742						);
743					},
744					sc_consensus::JustificationImportResult::Success => {
745						log::debug!(
746							target: LOG_TARGET,
747							"Justification for block #{hash} ({number}) imported from {peer_id} successfully",
748						);
749					},
750				}
751			},
752			ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
753			ToServiceCommand::NewBestBlockImported(hash, number) => {
754				log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
755
756				self.strategy.update_chain_info(&hash, number);
757				let _ = self.notification_service.try_set_handshake(
758					BlockAnnouncesHandshake::<B>::build(
759						self.roles,
760						number,
761						hash,
762						self.genesis_hash,
763					)
764					.encode(),
765				);
766			},
767			ToServiceCommand::Status(tx) => {
768				let _ = tx.send(self.strategy.status());
769			},
770			ToServiceCommand::NumActivePeers(tx) => {
771				let _ = tx.send(self.num_active_peers());
772			},
773			ToServiceCommand::NumDownloadedBlocks(tx) => {
774				let _ = tx.send(self.strategy.num_downloaded_blocks());
775			},
776			ToServiceCommand::NumSyncRequests(tx) => {
777				let _ = tx.send(self.strategy.num_sync_requests());
778			},
779			ToServiceCommand::PeersInfo(tx) => {
780				let peers_info =
781					self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
782				let _ = tx.send(peers_info);
783			},
784			ToServiceCommand::SetNoSlotPeers(peers) => self.apply_no_slot_set(peers),
785			ToServiceCommand::OnBlockFinalized(hash, header) => {
786				self.strategy.on_block_finalized(&hash, *header.number())
787			},
788		}
789	}
790
791	fn process_notification_event(&mut self, event: NotificationEvent) {
792		match event {
793			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
794				let validation_result = self
795					.validate_connection(&peer, handshake, Direction::Inbound)
796					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
797
798				let _ = result_tx.send(validation_result);
799			},
800			NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
801				log::debug!(
802					target: LOG_TARGET,
803					"Substream opened for {peer}, handshake {handshake:?}"
804				);
805
806				match self.validate_connection(&peer, handshake, direction) {
807					Ok(handshake) => {
808						if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
809							log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
810							self.network_service
811								.disconnect_peer(peer, self.block_announce_protocol_name.clone());
812						}
813					},
814					Err(wrong_genesis) => {
815						log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
816
817						if wrong_genesis {
818							self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
819						}
820
821						self.network_service
822							.disconnect_peer(peer, self.block_announce_protocol_name.clone());
823					},
824				}
825			},
826			NotificationEvent::NotificationStreamClosed { peer } => {
827				self.on_sync_peer_disconnected(peer);
828			},
829			NotificationEvent::NotificationReceived { peer, notification } => {
830				if !self.peers.contains_key(&peer) {
831					log::error!(
832						target: LOG_TARGET,
833						"received notification from {peer} who had been earlier refused by `SyncingEngine`",
834					);
835					return;
836				}
837
838				let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
839					log::warn!(target: LOG_TARGET, "failed to decode block announce");
840					return;
841				};
842
843				self.push_block_announce_validation(peer, announce);
844			},
845		}
846	}
847
848	fn is_no_slot_peer(&self, peer_id: &PeerId) -> bool {
849		self.default_peers_set_no_slot_peers.contains(peer_id) ||
850			self.dynamic_no_slot_peers.contains(peer_id)
851	}
852
853	/// Called by peer when it is disconnecting.
854	///
855	/// Returns a result if the handshake of this peer was indeed accepted.
856	fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
857		let Some(info) = self.peers.remove(&peer_id) else {
858			log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
859			return;
860		};
861		if let Some(metrics) = &self.metrics {
862			metrics.peers.dec();
863		}
864		self.num_connected.fetch_sub(1, Ordering::AcqRel);
865
866		if self.important_peers.contains(&peer_id) {
867			log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
868		} else {
869			log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
870		}
871
872		if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
873			info.inbound &&
874			info.info.roles.is_full()
875		{
876			match self.num_in_peers.checked_sub(1) {
877				Some(value) => {
878					self.num_in_peers = value;
879				},
880				None => {
881					log::error!(
882						target: LOG_TARGET,
883						"trying to disconnect an inbound node which is not counted as inbound"
884					);
885					debug_assert!(false);
886				},
887			}
888		}
889
890		self.strategy.remove_peer(&peer_id);
891		self.pending_responses.remove_all(&peer_id);
892		self.event_streams
893			.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
894	}
895
896	/// Validate received handshake.
897	fn validate_handshake(
898		&mut self,
899		peer_id: &PeerId,
900		handshake: Vec<u8>,
901	) -> Result<BlockAnnouncesHandshake<B>, bool> {
902		log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
903
904		let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
905			.map_err(|error| {
906				log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
907				false
908			})?;
909
910		if handshake.genesis_hash != self.genesis_hash {
911			if self.important_peers.contains(&peer_id) {
912				log::error!(
913					target: LOG_TARGET,
914					"Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
915					self.genesis_hash,
916					handshake.genesis_hash,
917				);
918			} else if self.boot_node_ids.contains(&peer_id) {
919				log::error!(
920					target: LOG_TARGET,
921					"Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
922					self.genesis_hash,
923					handshake.genesis_hash,
924				);
925			} else {
926				log::debug!(
927					target: LOG_TARGET,
928					"Peer is on different chain (our genesis: {} theirs: {})",
929					self.genesis_hash,
930					handshake.genesis_hash
931				);
932			}
933
934			return Err(true);
935		}
936
937		Ok(handshake)
938	}
939
940	/// Validate connection.
941	// NOTE Returning `Err(bool)` is a really ugly hack to work around the issue
942	// that `ProtocolController` thinks the peer is connected when in fact it can
943	// still be under validation. If the peer has different genesis than the
944	// local node the validation fails but the peer cannot be reported in
945	// `validate_connection()` as that is also called by
946	// `ValidateInboundSubstream` which means that the peer is still being
947	// validated and banning the peer when handling that event would
948	// result in peer getting dropped twice.
949	//
950	// The proper way to fix this is to integrate `ProtocolController` more
951	// tightly with `NotificationService` or add an additional API call for
952	// banning pre-accepted peers (which is not desirable)
953	fn validate_connection(
954		&mut self,
955		peer_id: &PeerId,
956		handshake: Vec<u8>,
957		direction: Direction,
958	) -> Result<BlockAnnouncesHandshake<B>, bool> {
959		log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
960
961		let handshake = self.validate_handshake(peer_id, handshake)?;
962
963		if self.peers.contains_key(&peer_id) {
964			log::error!(
965				target: LOG_TARGET,
966				"Called `validate_connection()` with already connected peer {peer_id}",
967			);
968			debug_assert!(false);
969			return Err(false);
970		}
971
972		let no_slot_peer = self.is_no_slot_peer(&peer_id);
973		let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
974
975		if handshake.roles.is_full() &&
976			self.strategy.num_peers() >=
977				self.default_peers_set_num_full +
978					self.default_peers_set_no_slot_connected_peers.len() +
979					this_peer_reserved_slot
980		{
981			log::debug!(
982				target: LOG_TARGET,
983				"Too many full nodes, rejecting {peer_id} (no_slot_peer={no_slot_peer}, num_peers={}, full_cap={}, no_slot_connected={}, this_reserved={})",
984				self.strategy.num_peers(),
985				self.default_peers_set_num_full,
986				self.default_peers_set_no_slot_connected_peers.len(),
987				this_peer_reserved_slot,
988			);
989			return Err(false);
990		}
991
992		// make sure to accept no more than `--in-peers` many full nodes
993		if !no_slot_peer &&
994			handshake.roles.is_full() &&
995			direction.is_inbound() &&
996			self.num_in_peers >= self.max_in_peers
997		{
998			if self.num_in_peers > self.max_in_peers {
999				log::warn!(
1000					target: LOG_TARGET,
1001					"num_in_peers ({}) exceeds max_in_peers ({}), this is a slot accounting bug ",
1002					self.num_in_peers,
1003					self.max_in_peers,
1004				);
1005				debug_assert!(false);
1006			}
1007			log::debug!(
1008				target: LOG_TARGET,
1009				"All inbound slots have been consumed, rejecting {peer_id} (no_slot_peer={no_slot_peer}, num_in_peers={}, max_in_peers={})",
1010				self.num_in_peers,
1011				self.max_in_peers,
1012			);
1013			return Err(false);
1014		}
1015
1016		// make sure that all slots are not occupied by light peers
1017		//
1018		// `ChainSync` only accepts full peers whereas `SyncingEngine` accepts both full and light
1019		// peers. Verify that there is a slot in `SyncingEngine` for the inbound light peer
1020		if handshake.roles.is_light() &&
1021			(self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
1022		{
1023			log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
1024			return Err(false);
1025		}
1026
1027		Ok(handshake)
1028	}
1029
1030	/// Called on the first connection between two peers on the default set, after their exchange
1031	/// of handshake.
1032	///
1033	/// Returns `Ok` if the handshake is accepted and the peer added to the list of peers we sync
1034	/// from.
1035	fn on_sync_peer_connected(
1036		&mut self,
1037		peer_id: PeerId,
1038		status: &BlockAnnouncesHandshake<B>,
1039		direction: Direction,
1040	) -> Result<(), ()> {
1041		log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
1042
1043		let peer = Peer {
1044			info: ExtendedPeerInfo {
1045				roles: status.roles,
1046				best_hash: status.best_hash,
1047				best_number: status.best_number,
1048			},
1049			known_blocks: LruHashSet::new(
1050				NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
1051			),
1052			inbound: direction.is_inbound(),
1053		};
1054
1055		// Only forward full peers to syncing strategy.
1056		if status.roles.is_full() {
1057			self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
1058		}
1059
1060		log::debug!(target: LOG_TARGET, "Connected {peer_id}");
1061
1062		if self.peers.insert(peer_id, peer).is_none() {
1063			if let Some(metrics) = &self.metrics {
1064				metrics.peers.inc();
1065			}
1066			self.num_connected.fetch_add(1, Ordering::AcqRel);
1067		}
1068		self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
1069
1070		if self.is_no_slot_peer(&peer_id) {
1071			self.default_peers_set_no_slot_connected_peers.insert(peer_id);
1072		} else if direction.is_inbound() && status.roles.is_full() {
1073			self.num_in_peers += 1;
1074		}
1075
1076		self.event_streams
1077			.retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
1078
1079		Ok(())
1080	}
1081
1082	fn process_response_event(&mut self, response_event: ResponseEvent) {
1083		let ResponseEvent { peer_id, key, response: response_result } = response_event;
1084
1085		match response_result {
1086			Ok(Ok((response, protocol_name))) => {
1087				self.strategy.on_generic_response(&peer_id, key, protocol_name, response);
1088			},
1089			Ok(Err(e)) => {
1090				debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
1091
1092				match e {
1093					RequestFailure::Network(OutboundFailure::Timeout) => {
1094						self.network_service.report_peer(peer_id, rep::TIMEOUT);
1095						self.network_service
1096							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1097					},
1098					RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1099						self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
1100						self.network_service
1101							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1102					},
1103					RequestFailure::Network(OutboundFailure::DialFailure) => {
1104						self.network_service
1105							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1106					},
1107					RequestFailure::Refused => {
1108						self.network_service.report_peer(peer_id, rep::REFUSED);
1109						self.network_service
1110							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1111					},
1112					RequestFailure::Network(OutboundFailure::ConnectionClosed) |
1113					RequestFailure::NotConnected => {
1114						self.network_service
1115							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1116					},
1117					RequestFailure::UnknownProtocol => {
1118						debug_assert!(false, "Block request protocol should always be known.");
1119					},
1120					RequestFailure::InvalidRequest => {
1121						debug_assert!(false, "Block request payload should always be valid.");
1122					},
1123					RequestFailure::Obsolete => {
1124						debug_assert!(
1125							false,
1126							"Can not receive `RequestFailure::Obsolete` after dropping the \
1127							response receiver.",
1128						);
1129					},
1130					RequestFailure::Network(OutboundFailure::Io(_)) => {
1131						self.network_service.report_peer(peer_id, rep::IO);
1132						self.network_service
1133							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1134					},
1135				}
1136			},
1137			Err(oneshot::Canceled) => {
1138				trace!(
1139					target: LOG_TARGET,
1140					"Request to peer {peer_id:?} failed due to oneshot being canceled.",
1141				);
1142				self.network_service
1143					.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1144			},
1145		}
1146	}
1147
1148	/// Returns the number of peers we're connected to and that are being queried.
1149	fn num_active_peers(&self) -> usize {
1150		self.pending_responses.len()
1151	}
1152
1153	/// Get config for the block announcement protocol
1154	fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1155		protocol_id: ProtocolId,
1156		fork_id: Option<&str>,
1157		roles: Roles,
1158		best_number: NumberFor<B>,
1159		best_hash: B::Hash,
1160		genesis_hash: B::Hash,
1161		set_config: &SetConfig,
1162		metrics: NotificationMetrics,
1163		peer_store_handle: Arc<dyn PeerStoreProvider>,
1164	) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1165		let block_announces_protocol = block_announces_protocol_name(genesis_hash, fork_id);
1166
1167		N::notification_config(
1168			block_announces_protocol.into(),
1169			iter::once(block_announces_legacy_protocol_name(&protocol_id).into()).collect(),
1170			MAX_BLOCK_ANNOUNCE_SIZE,
1171			Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1172				roles,
1173				best_number,
1174				best_hash,
1175				genesis_hash,
1176			))),
1177			set_config.clone(),
1178			metrics,
1179			peer_store_handle,
1180		)
1181	}
1182
1183	/// Import blocks.
1184	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1185		if let Some(metrics) = &self.metrics {
1186			metrics.import_queue_blocks_submitted.inc();
1187		}
1188
1189		self.import_queue.import_blocks(origin, blocks);
1190	}
1191
1192	/// Import justifications.
1193	fn import_justifications(
1194		&mut self,
1195		peer_id: PeerId,
1196		hash: B::Hash,
1197		number: NumberFor<B>,
1198		justifications: Justifications,
1199	) {
1200		if let Some(metrics) = &self.metrics {
1201			metrics.import_queue_justifications_submitted.inc();
1202		}
1203
1204		self.import_queue.import_justifications(peer_id, hash, number, justifications);
1205	}
1206}
1207
1208/// Update per-peer slot tracking for changes in the dynamic no-slot set.
1209/// Promotes newly added peers, demotes removed ones, ignoring static no-slot peers.
1210///
1211/// `peer_inbound_full(peer_id)` returns `true` if `peer_id` is inbound and full.
1212///  Returns `None` if the peer is not connected.
1213///
1214/// If removing a peer from no-slot would make `num_in_peers` exceed `max_in_peers`,
1215/// disconnect the peer instead and keep it in `connected_no_slot` until the async disconnect
1216/// handler will update `num_in_peers`..
1217/// Caller needs to update `dynamic_no_slot_peers` after calling this function.
1218fn apply_no_slot_set_inner(
1219	peer_inbound_full: impl Fn(&PeerId) -> Option<bool>,
1220	static_no_slot: &HashSet<PeerId>,
1221	old_dynamic_no_slot: &HashSet<PeerId>,
1222	new_dynamic_no_slot: &HashSet<PeerId>,
1223	connected_no_slot: &mut HashSet<PeerId>,
1224	num_in_peers: &mut usize,
1225	max_in_peers: usize,
1226	network_service: &service::network::NetworkServiceHandle,
1227	protocol: &ProtocolName,
1228) {
1229	// Skip static-set and disconnected peers and return the slot-affecting flag for the rest.
1230	let slot_impact = |peer_id: &PeerId| -> Option<bool> {
1231		if static_no_slot.contains(peer_id) {
1232			return None;
1233		}
1234		peer_inbound_full(peer_id)
1235	};
1236
1237	let mut promoted = 0;
1238	let mut demoted = 0;
1239	let mut disconnected = 0;
1240
1241	for peer_id in new_dynamic_no_slot.difference(old_dynamic_no_slot) {
1242		let Some(affects_slots) = slot_impact(peer_id) else { continue };
1243		// Defensive check, should never happen as we filter above.
1244		if !connected_no_slot.insert(*peer_id) {
1245			log::error!(
1246				target: LOG_TARGET,
1247				"{peer_id} promoted to no-slot but was already in connected_no_slot",
1248			);
1249			debug_assert!(false);
1250			continue;
1251		}
1252		if affects_slots {
1253			if let Some(n) = num_in_peers.checked_sub(1) {
1254				*num_in_peers = n;
1255			} else {
1256				log::error!(
1257					target: LOG_TARGET,
1258					"num_in_peers underflow promoting {peer_id} to no-slot",
1259				);
1260				debug_assert!(false);
1261			}
1262			promoted += 1;
1263		}
1264	}
1265
1266	for peer_id in old_dynamic_no_slot.difference(new_dynamic_no_slot) {
1267		let Some(affects_slots) = slot_impact(peer_id) else { continue };
1268		if !connected_no_slot.contains(peer_id) {
1269			continue;
1270		}
1271		if affects_slots && *num_in_peers >= max_in_peers {
1272			log::debug!(
1273				target: LOG_TARGET,
1274				"Demoting {peer_id} would exceed max_in_peers ({max_in_peers}); disconnecting",
1275			);
1276			network_service.disconnect_peer(*peer_id, protocol.clone());
1277			disconnected += 1;
1278			continue;
1279		}
1280		connected_no_slot.remove(peer_id);
1281		if affects_slots {
1282			*num_in_peers += 1;
1283			demoted += 1;
1284		}
1285	}
1286
1287	log::debug!(
1288		target: LOG_TARGET,
1289		"Dynamic no-slot peer set updated: {} peers: +{} in, -{} out, {} disconnected",
1290		new_dynamic_no_slot.len(),
1291		promoted,
1292		demoted,
1293		disconnected,
1294	);
1295}
1296
#[cfg(test)]
mod tests {
	use super::*;

	/// Create `N` random peer ids.
	fn fresh_peers<const N: usize>() -> [PeerId; N] {
		[(); N].map(|()| PeerId::random())
	}

	/// Collect an array of peer ids into a set.
	fn set_of<const N: usize>(peers: [PeerId; N]) -> HashSet<PeerId> {
		peers.iter().copied().collect()
	}

	/// Run [`apply_no_slot_set_inner`] with the given initial state. Uses `usize::MAX` for
	/// `max_in_peers` so demotion never trips the disconnect path. Returns the final
	/// `connected_no_slot` set and `num_in_peers`.
	#[track_caller]
	fn run_apply(
		connected: Vec<(PeerId, bool)>,
		static_no_slot: HashSet<PeerId>,
		old_dynamic: HashSet<PeerId>,
		new_dynamic: HashSet<PeerId>,
		initial_connected_no_slot: HashSet<PeerId>,
		initial_num_in_peers: usize,
	) -> (HashSet<PeerId>, usize) {
		let unlimited = usize::MAX;
		let outcome = run_apply_with_cap(
			connected,
			static_no_slot,
			old_dynamic,
			new_dynamic,
			initial_connected_no_slot,
			initial_num_in_peers,
			unlimited,
		);
		let (connected_no_slot, num_in, disconnects) = outcome;
		assert!(disconnects.is_empty(), "unexpected disconnects: {disconnects:?}");
		(connected_no_slot, num_in)
	}

	/// Variant of [`run_apply`] that exposes `max_in_peers` and the list of peers the
	/// function asked the network to disconnect (drained from the `NetworkServiceHandle`'s
	/// command channel).
	#[track_caller]
	fn run_apply_with_cap(
		connected: Vec<(PeerId, bool)>,
		static_no_slot: HashSet<PeerId>,
		old_dynamic: HashSet<PeerId>,
		new_dynamic: HashSet<PeerId>,
		initial_connected_no_slot: HashSet<PeerId>,
		initial_num_in_peers: usize,
		max_in_peers: usize,
	) -> (HashSet<PeerId>, usize, Vec<PeerId>) {
		use crate::service::network::{NetworkServiceHandle, ToServiceCommand as NetCmd};

		// Connected peers and whether each one is "inbound and full".
		let inbound_full_by_peer: HashMap<PeerId, bool> = connected.into_iter().collect();

		let (tx, mut rx) = tracing_unbounded::<NetCmd>("test_apply_no_slot_set_disconnects", 100);
		let network_service = NetworkServiceHandle::new(tx);
		let protocol = ProtocolName::from("/test/block-announces/1");

		let mut connected_no_slot = initial_connected_no_slot;
		let mut num_in_peers = initial_num_in_peers;
		apply_no_slot_set_inner(
			|peer_id| inbound_full_by_peer.get(peer_id).copied(),
			&static_no_slot,
			&old_dynamic,
			&new_dynamic,
			&mut connected_no_slot,
			&mut num_in_peers,
			max_in_peers,
			&network_service,
			&protocol,
		);
		drop(network_service);

		// Drain the command channel and keep only the disconnect requests.
		let disconnects: Vec<PeerId> = std::iter::from_fn(|| rx.try_recv().ok())
			.filter_map(|cmd| match cmd {
				NetCmd::DisconnectPeer(peer, _) => Some(peer),
				_ => None,
			})
			.collect();

		(connected_no_slot, num_in_peers, disconnects)
	}

	#[test]
	fn apply_promotes_multiple_inbound_full_peers() {
		// `already` is in both old and new dynamic — it must stay in `connected_no_slot`
		// without releasing another slot.
		let [a, b, c, already] = fresh_peers();
		let connected = vec![(a, true), (b, true), (c, true), (already, true)];
		let old_dynamic = set_of([already]);
		let new_dynamic = set_of([a, b, c, already]);

		let (connected_no_slot, num_in) = run_apply(
			connected,
			HashSet::new(),
			old_dynamic.clone(),
			new_dynamic.clone(),
			old_dynamic,
			10,
		);

		assert_eq!(connected_no_slot, new_dynamic);
		assert_eq!(num_in, 7);
	}

	#[test]
	fn apply_demotes_multiple_inbound_full_peers() {
		let [a, b, c, stays] = fresh_peers();
		let connected = vec![(a, true), (b, true), (c, true), (stays, true)];
		let old_dynamic = set_of([a, b, c, stays]);
		let new_dynamic = set_of([stays]);

		let (connected_no_slot, num_in) = run_apply(
			connected,
			HashSet::new(),
			old_dynamic.clone(),
			new_dynamic.clone(),
			old_dynamic,
			2,
		);

		assert_eq!(connected_no_slot, new_dynamic);
		assert_eq!(num_in, 5);
	}

	#[test]
	fn apply_ignores_non_slot_consuming_peers() {
		// Outbound peers and inbound light peers both yield `affects_slots = false`. Either
		// kind transitioning must update `connected_no_slot` but not move `num_in_peers`.
		let [outbound, light] = fresh_peers();
		let connected = vec![(outbound, false), (light, false)];
		let new_dynamic = set_of([outbound, light]);

		let (connected_no_slot, num_in) = run_apply(
			connected,
			HashSet::new(),
			HashSet::new(),
			new_dynamic.clone(),
			HashSet::new(),
			5,
		);

		assert_eq!(connected_no_slot, new_dynamic);
		assert_eq!(num_in, 5);
	}

	#[test]
	fn apply_static_peers_stay_no_slot_when_removed_from_dynamic() {
		// `control` (dynamic-only) IS demoted, proving the wiring is live — the static peers
		// must NOT be demoted because the static set takes precedence over the dynamic one.
		let [s1, s2, control] = fresh_peers();
		let connected = vec![(s1, true), (s2, true), (control, true)];
		let static_no_slot = set_of([s1, s2]);
		let old_dynamic = set_of([s1, s2, control]);

		let (connected_no_slot, num_in) = run_apply(
			connected,
			static_no_slot.clone(),
			old_dynamic.clone(),
			HashSet::new(),
			old_dynamic,
			2,
		);

		assert_eq!(connected_no_slot, static_no_slot);
		assert_eq!(num_in, 3);
	}

	#[test]
	fn apply_static_peers_added_to_dynamic_are_unchanged() {
		let [s1, s2] = fresh_peers();
		let connected = vec![(s1, true), (s2, true)];
		let statics = set_of([s1, s2]);

		let (connected_no_slot, num_in) = run_apply(
			connected,
			statics.clone(),
			HashSet::new(),
			statics.clone(),
			statics.clone(),
			4,
		);

		assert_eq!(connected_no_slot, statics);
		assert_eq!(num_in, 4);
	}

	#[test]
	fn apply_unconnected_peers_in_new_set_are_ignored() {
		// Unconnected peers go into `dynamic_no_slot_peers` (caller-installed) and take effect
		// on connect; they must not appear in `connected_no_slot` here.
		let [connected_a, connected_b] = fresh_peers();
		let [unconnected_a, unconnected_b] = fresh_peers();
		let connected = vec![(connected_a, true), (connected_b, true)];
		let new_dynamic = set_of([unconnected_a, unconnected_b]);

		let (connected_no_slot, num_in) = run_apply(
			connected,
			HashSet::new(),
			HashSet::new(),
			new_dynamic,
			HashSet::new(),
			3,
		);

		assert!(connected_no_slot.is_empty());
		assert_eq!(num_in, 3);
	}

	#[test]
	fn apply_idempotent_same_set() {
		let [in_full, out_full, light] = fresh_peers();
		let connected = vec![(in_full, true), (out_full, false), (light, false)];
		let target = set_of([in_full, out_full, light]);

		let (connected_no_slot, num_in) = run_apply(
			connected,
			HashSet::new(),
			target.clone(),
			target.clone(),
			target.clone(),
			2,
		);

		assert_eq!(connected_no_slot, target);
		assert_eq!(num_in, 2);
	}

	#[test]
	fn apply_empty_set_clears_dynamic_only_peers() {
		let [in1, in2, out, light] = fresh_peers();
		let [static_peer] = fresh_peers();
		let connected =
			vec![(in1, true), (in2, true), (out, false), (light, false), (static_peer, true)];
		let statics = set_of([static_peer]);
		let old = set_of([in1, in2, out, light, static_peer]);

		let (connected_no_slot, num_in) =
			run_apply(connected, statics.clone(), old.clone(), HashSet::new(), old, 0);

		assert_eq!(connected_no_slot, statics);
		assert_eq!(num_in, 2);
	}

	#[test]
	fn apply_mixed_promote_and_demote() {
		let [p1, p2] = fresh_peers();
		let [d1, d2] = fresh_peers();
		let connected = vec![(p1, true), (p2, true), (d1, true), (d2, true)];
		let demoted_set = set_of([d1, d2]);
		let promoted_set = set_of([p1, p2]);

		let (connected_no_slot, num_in) = run_apply(
			connected,
			HashSet::new(),
			demoted_set.clone(),
			promoted_set.clone(),
			demoted_set,
			5,
		);

		assert_eq!(connected_no_slot, promoted_set);
		assert_eq!(num_in, 5);
	}

	#[test]
	fn apply_demote_at_capacity_disconnects_peer() {
		// Scenario: PeerX was promoted (freeing a slot), then a regular PeerY filled that slot,
		// bringing `num_in_peers` back to capacity. Now PeerX is demoted out of the dynamic set
		// — incrementing `num_in_peers` would push it strictly above `max_in_peers`. The peer
		// must be disconnected instead, and left in `connected_no_slot` so the async disconnect
		// handler is the sole updater of `num_in_peers`.
		let [px] = fresh_peers();
		let only_px = set_of([px]);

		let (connected_no_slot, num_in, disconnects) = run_apply_with_cap(
			vec![(px, true)],
			HashSet::new(),
			only_px.clone(),
			HashSet::new(),
			only_px.clone(),
			8,
			8,
		);

		assert_eq!(connected_no_slot, only_px);
		assert_eq!(num_in, 8);
		assert_eq!(disconnects, vec![px]);
	}

	#[test]
	fn apply_demote_below_capacity_increments_normally() {
		// Same shape as the over-capacity test but with `num_in_peers < max_in_peers`: the
		// peer is regularly demoted and `num_in_peers` is incremented.
		let [px] = fresh_peers();
		let only_px = set_of([px]);

		let (connected_no_slot, num_in, disconnects) = run_apply_with_cap(
			vec![(px, true)],
			HashSet::new(),
			only_px.clone(),
			HashSet::new(),
			only_px,
			7,
			8,
		);

		assert!(connected_no_slot.is_empty());
		assert_eq!(num_in, 8);
		assert!(disconnects.is_empty());
	}
}