
sc_network/litep2p/shim/notification/peerset.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! [`Peerset`] implementation for `litep2p`.
//!
//! [`Peerset`] is a separate but related component running alongside the notification protocol,
//! responsible for maintaining connectivity to remote peers. [`Peerset`] has an imperfect view of
//! the network as the notification protocol is behind an asynchronous channel. Based on this
//! imperfect view, it tries to connect to remote peers and to disconnect peers that should no
//! longer be connected.
//!
//! [`Peerset`] knows of two types of peers:
//!  - normal peers
//!  - reserved peers
//!
//! Reserved peers are those to which the [`Peerset`] should be connected at all times, and it
//! makes an effort to do so by constantly checking that there are no disconnected reserved peers
//! (except banned ones) and, if there are, opening substreams to them.
//!
//! [`Peerset`] may also contain "slots", both inbound and outbound, which mark how many incoming
//! and outgoing connections it should maintain at all times. Peers for the inbound slots are
//! filled by remote peers opening inbound substreams towards the local node, and peers for the
//! outbound slots are filled by querying the `Peerstore`, which contains all peers known to
//! `sc-network`. Peers for outbound slots are selected in decreasing order of reputation.
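//!
//! # Example: driving the [`Peerset`] (sketch)
//!
//! A minimal, non-normative sketch of how another subsystem could interact with the [`Peerset`]
//! through the command channel returned by [`Peerset::new()`]. The surrounding setup (`cmd_tx`,
//! `peer_id`) is assumed to exist; the real wiring lives in the notification shim.
//!
//! ```ignore
//! // Add a peer to the reserved set; `Peerset` will try to open a substream to it
//! // on the next slot allocation.
//! let _ = cmd_tx.unbounded_send(PeersetCommand::AddReservedPeers {
//!     peers: std::iter::once(peer_id).collect(),
//! });
//!
//! // Query the current reserved set over a oneshot channel.
//! let (tx, rx) = futures::channel::oneshot::channel();
//! let _ = cmd_tx.unbounded_send(PeersetCommand::GetReservedPeers { tx });
//! let reserved_peers = rx.await.expect("`Peerset` to be alive");
//! ```
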
use crate::{
	peer_store::{PeerStoreProvider, ProtocolHandle},
	service::traits::{self, ValidationResult},
	ProtocolName, ReputationChange as Reputation,
};

use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use futures_timer::Delay;
use litep2p::protocol::notification::NotificationError;

use sc_network_types::PeerId;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};

use std::{
	collections::{HashMap, HashSet},
	future::Future,
	pin::Pin,
	sync::{
		atomic::{AtomicUsize, Ordering},
		Arc,
	},
	task::{Context, Poll},
	time::Duration,
};

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::peerset";

/// Default backoff for connection re-attempts.
const DEFAULT_BACKOFF: Duration = Duration::from_secs(5);

/// Open failure backoff.
const OPEN_FAILURE_BACKOFF: Duration = Duration::from_secs(5);

/// Slot allocation frequency.
///
/// How often should [`Peerset`] attempt to establish outbound connections.
const SLOT_ALLOCATION_FREQUENCY: Duration = Duration::from_secs(1);

/// Reputation adjustment when a peer gets disconnected.
///
/// Lessens the likelihood of the peer getting selected for an outbound connection soon.
const DISCONNECT_ADJUSTMENT: Reputation = Reputation::new(-256, "Peer disconnected");

/// Reputation adjustment when a substream fails to open.
///
/// Lessens the likelihood of the peer getting selected for an outbound connection soon.
const OPEN_FAILURE_ADJUSTMENT: Reputation = Reputation::new(-1024, "Open failure");

/// Is the peer reserved?
///
/// Regular peers count towards slot allocation.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Reserved {
	Yes,
	No,
}

impl From<bool> for Reserved {
	fn from(value: bool) -> Reserved {
		match value {
			true => Reserved::Yes,
			false => Reserved::No,
		}
	}
}

impl From<Reserved> for bool {
	fn from(value: Reserved) -> bool {
		std::matches!(value, Reserved::Yes)
	}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
	/// Inbound substream.
	Inbound(Reserved),

	/// Outbound substream.
	Outbound(Reserved),
}

impl Direction {
	fn set_reserved(&mut self, new_reserved: Reserved) {
		match self {
			Direction::Inbound(ref mut reserved) | Direction::Outbound(ref mut reserved) =>
				*reserved = new_reserved,
		}
	}
}

impl From<Direction> for traits::Direction {
	fn from(direction: Direction) -> traits::Direction {
		match direction {
			Direction::Inbound(_) => traits::Direction::Inbound,
			Direction::Outbound(_) => traits::Direction::Outbound,
		}
	}
}

/// Open result for a fully-opened connection.
#[derive(PartialEq, Eq, Debug)]
pub enum OpenResult {
	/// Accept the connection.
	Accept {
		/// Direction which [`Peerset`] considers to be correct.
		direction: traits::Direction,
	},

	/// Reject the connection because it was canceled while it was opening.
	Reject,
}

/// Commands emitted by other subsystems of the blockchain to [`Peerset`].
#[derive(Debug)]
pub enum PeersetCommand {
	/// Set current reserved peer set.
	///
	/// This command removes all reserved peers that are not in `peers`.
	SetReservedPeers {
		/// New reserved peer set.
		peers: HashSet<PeerId>,
	},

	/// Add one or more reserved peers.
	///
	/// This command doesn't remove any reserved peers but only adds new ones.
	AddReservedPeers {
		/// Reserved peers to add.
		peers: HashSet<PeerId>,
	},

	/// Remove reserved peers.
	RemoveReservedPeers {
		/// Reserved peers to remove.
		peers: HashSet<PeerId>,
	},

	/// Set reserved-only mode to true/false.
	SetReservedOnly {
		/// Should the protocol only accept/establish connections to reserved peers.
		reserved_only: bool,
	},

	/// Disconnect peer.
	DisconnectPeer {
		/// Peer ID.
		peer: PeerId,
	},

	/// Get reserved peers.
	GetReservedPeers {
		/// `oneshot::Sender` for sending the current set of reserved peers.
		tx: oneshot::Sender<Vec<PeerId>>,
	},
}

/// Commands emitted by [`Peerset`] to the notification protocol.
#[derive(Debug)]
pub enum PeersetNotificationCommand {
	/// Open substreams to one or more peers.
	OpenSubstream {
		/// Peer IDs.
		peers: Vec<PeerId>,
	},

	/// Close substream to one or more peers.
	CloseSubstream {
		/// Peer IDs.
		peers: Vec<PeerId>,
	},
}

/// Peer state.
///
/// A peer can be in six different states:
///  - disconnected
///  - connected
///  - connection is opening
///  - connection is closing
///  - connection is backed-off
///  - connection is canceled
///
/// Opening and closing are separate states as litep2p guarantees to report when the substream is
/// either fully open or fully closed, and the slot allocation for opening a substream is tied to a
/// state transition which moves the peer to [`PeerState::Opening`]. This allows reserving a slot
/// for the peer and prevents an unbounded number of outbound substreams. If the substream is
/// opened successfully, the peer is moved to [`PeerState::Connected`] but the slot count is not
/// modified as an outbound slot was already allocated for the peer. If the substream fails to
/// open, the event is reported by litep2p and [`Peerset::report_substream_open_failure()`] is
/// called, which will decrease the outbound slot count. Similarly for inbound streams, the slot is
/// allocated in [`Peerset::report_inbound_substream()`], which prevents `Peerset` from accepting
/// an unbounded number of inbound substreams. If the inbound substream fails to open, litep2p
/// reports the open failure (since [`Peerset`] was notified of the substream) and the inbound slot
/// count is once again decreased in [`Peerset::report_substream_open_failure()`]. If the substream
/// is opened successfully, the slot count is not modified.
///
/// Since closing a substream is not instantaneous, there is a separate [`PeerState::Closing`]
/// state which indicates that the substream is being closed but hasn't been closed by litep2p yet.
/// This state is used to prevent invalid state transitions where, for example, [`Peerset`] would
/// close a substream and then try to reopen it immediately.
///
/// Irrespective of which side closed the substream (local/remote), the substream is chilled for a
/// small amount of time ([`DEFAULT_BACKOFF`]) and during this time no inbound or outbound
/// substreams are accepted/established. Any request to open an outbound substream while the peer
/// is backed-off is ignored. If the peer is a reserved peer, an outbound substream is not opened
/// for them immediately; after the back-off has expired, `Peerset` will attempt to open a
/// substream to the peer if it's still counted as a reserved peer.
///
/// Disconnections and open failures contribute negatively to the peer score to prevent it from
/// being selected for another outbound substream request soon after the failure/disconnection. The
/// reputation decays towards zero over time and eventually the peer will be as likely to be
/// selected for an outbound substream as any other freshly added peer.
///
/// [`Peerset`] must also be able to handle the case where an outbound substream was opened to a
/// peer and, while it was opening, an inbound substream was received from that same peer. Since
/// `litep2p` is the source of truth for the actual state of the connection, [`Peerset`] must
/// compensate for this: if an inbound substream is opened for a peer that was marked outbound, it
/// will attempt to allocate an inbound slot for the peer. If it fails to do so, the inbound
/// substream is rejected and the peer is marked as canceled.
///
/// Since a substream is not opened immediately, a peer can be disconnected even if the substream
/// was not yet open. This can happen, for example, when a peer has connected over the syncing
/// protocol and was added to, e.g., GRANDPA's reserved peers, an outbound substream was opened
/// ([`PeerState::Opening`]) and then the peer disconnected. This state transition is handled by
/// the [`Peerset`] with `PeerState::Canceled`, which indicates that should the substream open
/// successfully, it should be closed immediately; if the connection is opened successfully while
/// the peer was marked as canceled, the substream will be closed without notifying the protocol
/// about the substream.
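///
/// As an illustration only (not taken from the real call sites in the shim), the happy path of an
/// inbound, non-reserved peer walks through the states roughly like this, driven by the reporting
/// API below; `peerset` and `peer` are assumed to exist:
///
/// ```ignore
/// // Remote opens an inbound substream: an inbound slot is allocated and the peer
/// // moves to `PeerState::Opening`.
/// let _accepted = peerset.report_inbound_substream(peer);
///
/// // litep2p reports the substream as fully open: the peer moves to `PeerState::Connected`.
/// let _result = peerset.report_substream_opened(peer, traits::Direction::Inbound);
///
/// // Either side closes the substream: the slot is released and the peer moves to
/// // `PeerState::Backoff`, later decaying back to `PeerState::Disconnected`.
/// peerset.report_substream_closed(peer);
/// ```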
#[derive(Debug, PartialEq, Eq)]
pub enum PeerState {
	/// No active connection to peer.
	Disconnected,

	/// Substream to peer was recently closed and the peer is currently backed off.
	///
	/// Backoff only applies to outbound substreams. Inbound substreams will not experience any
	/// sort of "banning" even if the peer is backed off and an inbound substream for the peer is
	/// received.
	Backoff,

	/// Connection to peer is pending.
	Opening {
		/// Direction of the connection.
		direction: Direction,
	},

	/// Connected to peer.
	Connected {
		/// Is the peer inbound or outbound.
		direction: Direction,
	},

	/// Substream was opened and while it was opening (no response had been heard from litep2p),
	/// the substream was canceled by either calling `disconnect_peer()` or by removing the peer
	/// from the reserved set.
	///
	/// After the opened substream is acknowledged by litep2p (open success/failure), the peer is
	/// moved to [`PeerState::Backoff`] from which it will then be moved to
	/// [`PeerState::Disconnected`].
	Canceled {
		/// Is the peer inbound or outbound.
		direction: Direction,
	},

	/// Connection to peer is closing.
	///
	/// State implies that the substream was asked to be closed by the local node and litep2p is
	/// closing the substream. No command modifying the connection state is accepted until the
	/// state has been set to [`PeerState::Disconnected`].
	Closing {
		/// Is the peer inbound or outbound.
		direction: Direction,
	},
}

/// `Peerset` implementation.
///
/// `Peerset` allows other subsystems of the blockchain to modify the connection state
/// of the notification protocol by adding and removing reserved peers.
///
/// `Peerset` is also responsible for maintaining the desired number of peers the protocol is
/// connected to by establishing outbound connections and accepting/rejecting inbound connections.
#[derive(Debug)]
pub struct Peerset {
	/// Protocol name.
	protocol: ProtocolName,

	/// RX channel for receiving commands.
	cmd_rx: TracingUnboundedReceiver<PeersetCommand>,

	/// Maximum number of outbound peers.
	max_out: usize,

	/// Current number of outbound peers.
	num_out: usize,

	/// Maximum number of inbound peers.
	max_in: usize,

	/// Current number of inbound peers.
	num_in: usize,

	/// Only connect to/accept connections from reserved peers.
	reserved_only: bool,

	/// Current reserved peer set.
	reserved_peers: HashSet<PeerId>,

	/// Handle to `Peerstore`.
	peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Peers.
	peers: HashMap<PeerId, PeerState>,

	/// Count of connected peers.
	connected_peers: Arc<AtomicUsize>,

	/// Pending backoffs for peers who recently disconnected.
	pending_backoffs: FuturesUnordered<BoxFuture<'static, (PeerId, Reputation)>>,

	/// Next time when [`Peerset`] should perform slot allocation.
	next_slot_allocation: Delay,
}

macro_rules! decrement_or_warn {
	($slot:expr, $protocol:expr, $peer:expr, $direction:expr) => {{
		match $slot.checked_sub(1) {
			Some(value) => {
				$slot = value;
			}
			None => {
				log::warn!(
					target: LOG_TARGET,
					"{}: state mismatch, {:?} is not counted as part of {:?} slots",
					$protocol, $peer, $direction
				);
				debug_assert!(false);
			}
		}
	}};
}

/// Handle to [`Peerset`], given to `Peerstore`.
#[derive(Debug)]
struct PeersetHandle {
	/// TX channel for sending commands to [`Peerset`].
	tx: TracingUnboundedSender<PeersetCommand>,
}

impl ProtocolHandle for PeersetHandle {
	/// Disconnect peer, as a result of a ban.
	fn disconnect_peer(&self, peer: PeerId) {
		let _ = self.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
	}
}

impl Peerset {
	/// Create new [`Peerset`].
	pub fn new(
		protocol: ProtocolName,
		max_out: usize,
		max_in: usize,
		reserved_only: bool,
		reserved_peers: HashSet<PeerId>,
		connected_peers: Arc<AtomicUsize>,
		peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self, TracingUnboundedSender<PeersetCommand>) {
		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc-peerset-protocol", 100_000);
		let peers = reserved_peers
			.iter()
			.map(|peer| (*peer, PeerState::Disconnected))
			.collect::<HashMap<_, _>>();

		// register protocol's command channel to `Peerstore` so it can issue disconnect commands
		// if some connected peer gets banned.
		peerstore_handle.register_protocol(Arc::new(PeersetHandle { tx: cmd_tx.clone() }));

		log::debug!(
			target: LOG_TARGET,
			"{}: creating new peerset with max_outbound {} and max_inbound {} and reserved_only {}",
			protocol,
			max_out,
			max_in,
			reserved_only,
		);

		(
			Self {
				protocol,
				max_out,
				num_out: 0usize,
				max_in,
				num_in: 0usize,
				reserved_peers,
				cmd_rx,
				peerstore_handle,
				reserved_only,
				peers,
				connected_peers,
				pending_backoffs: FuturesUnordered::new(),
				next_slot_allocation: Delay::new(SLOT_ALLOCATION_FREQUENCY),
			},
			cmd_tx,
		)
	}

	/// Report to [`Peerset`] that a substream was opened.
	///
	/// The slot for the stream was "preallocated" when it was initiated (outbound) or accepted
	/// (inbound) by the local node, which is why this function doesn't allocate a slot for the
	/// peer.
	///
	/// Returns [`OpenResult::Accept`] if the substream should be kept open and
	/// [`OpenResult::Reject`] if the substream had been canceled while it was opening and litep2p
	/// should close it.
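	///
	/// A hypothetical caller (the real one lives in the notification shim; `handle_open` and
	/// `close_substream` are placeholders) would translate the result into litep2p actions roughly
	/// as follows:
	///
	/// ```ignore
	/// match peerset.report_substream_opened(peer, traits::Direction::Inbound) {
	///     // Keep the substream and use the direction `Peerset` decided on for bookkeeping.
	///     OpenResult::Accept { direction } => handle_open(peer, direction),
	///     // The substream was canceled while opening; ask litep2p to close it.
	///     OpenResult::Reject => close_substream(peer),
	/// }
	/// ```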
	pub fn report_substream_opened(
		&mut self,
		peer: PeerId,
		direction: traits::Direction,
	) -> OpenResult {
		log::trace!(
			target: LOG_TARGET,
			"{}: substream opened to {peer:?}, direction {direction:?}, reserved peer {}",
			self.protocol,
			self.reserved_peers.contains(&peer),
		);

		let Some(state) = self.peers.get_mut(&peer) else {
			log::warn!(target: LOG_TARGET, "{}: substream opened for unknown peer {peer:?}", self.protocol);
			debug_assert!(false);
			return OpenResult::Reject
		};

		match state {
			PeerState::Opening { direction: substream_direction } => {
				let real_direction: traits::Direction = (*substream_direction).into();

				*state = PeerState::Connected { direction: *substream_direction };
				self.connected_peers.fetch_add(1usize, Ordering::Relaxed);

				return OpenResult::Accept { direction: real_direction }
			},
			// litep2p doesn't support the ability to cancel an opening substream so if the
			// substream was closed while it was opening, it was marked as canceled and if the
			// substream opens successfully, it will be closed
			PeerState::Canceled { direction: substream_direction } => {
				log::trace!(
					target: LOG_TARGET,
					"{}: substream to {peer:?} is canceled, issue disconnection request",
					self.protocol,
				);

				self.connected_peers.fetch_add(1usize, Ordering::Relaxed);
				*state = PeerState::Closing { direction: *substream_direction };

				return OpenResult::Reject
			},
			// The peer was already rejected by the `report_inbound_substream` call and this
			// should never happen. However, this code path is exercised by our fuzzer.
			PeerState::Disconnected => {
				log::debug!(
					target: LOG_TARGET,
					"{}: substream opened for a peer that was previously rejected {peer:?}",
					self.protocol,
				);
				return OpenResult::Reject
			},
			state => {
				log::error!(
					target: LOG_TARGET,
					"{}: substream opened for a peer in invalid state {peer:?}: {state:?}",
					self.protocol,
				);

				debug_assert!(false);
				return OpenResult::Reject;
			},
		}
	}

	/// Report to [`Peerset`] that a substream was closed.
	///
	/// If the peer was not a reserved peer, the inbound/outbound slot count is adjusted to account
	/// for the disconnected peer. After the connection is closed, the peer is chilled for a
	/// duration of [`DEFAULT_BACKOFF`] which prevents [`Peerset`] from establishing/accepting new
	/// connections for that time period.
	pub fn report_substream_closed(&mut self, peer: PeerId) {
		log::trace!(target: LOG_TARGET, "{}: substream closed to {peer:?}", self.protocol);

		let Some(state) = self.peers.get_mut(&peer) else {
			log::warn!(target: LOG_TARGET, "{}: substream closed for unknown peer {peer:?}", self.protocol);
			debug_assert!(false);
			return
		};

		match &state {
			// close was initiated either by remote ([`PeerState::Connected`]) or local node
			// ([`PeerState::Closing`]) and it was a non-reserved peer
			PeerState::Connected { direction: Direction::Inbound(Reserved::No) } |
			PeerState::Closing { direction: Direction::Inbound(Reserved::No) } => {
				log::trace!(
					target: LOG_TARGET,
					"{}: inbound substream closed to non-reserved peer {peer:?}: {state:?}",
					self.protocol,
				);

				decrement_or_warn!(
					self.num_in,
					self.protocol,
					peer,
					Direction::Inbound(Reserved::No)
				);
			},
			// close was initiated either by remote ([`PeerState::Connected`]) or local node
			// ([`PeerState::Closing`]) and it was a non-reserved peer
			PeerState::Connected { direction: Direction::Outbound(Reserved::No) } |
			PeerState::Closing { direction: Direction::Outbound(Reserved::No) } => {
				log::trace!(
					target: LOG_TARGET,
					"{}: outbound substream closed to non-reserved peer {peer:?} {state:?}",
					self.protocol,
				);

				decrement_or_warn!(
					self.num_out,
					self.protocol,
					peer,
					Direction::Outbound(Reserved::No)
				);
			},
			// reserved peers don't require adjustments to slot counts
			PeerState::Closing { .. } | PeerState::Connected { .. } => {
				log::debug!(target: LOG_TARGET, "{}: reserved peer {peer:?} disconnected", self.protocol);
			},
			// The peer was already rejected by the `report_inbound_substream` call and this
			// should never happen. However, this code path is exercised by our fuzzer.
			PeerState::Disconnected => {
				log::debug!(
					target: LOG_TARGET,
					"{}: substream closed for a peer that was previously rejected {peer:?}",
					self.protocol,
				);
			},
			state => {
				log::warn!(target: LOG_TARGET, "{}: invalid state for disconnected peer {peer:?}: {state:?}", self.protocol);
				debug_assert!(false);
			},
		}

		// Rejected peers do not count towards slot allocation.
		if !matches!(state, PeerState::Disconnected) {
			self.connected_peers.fetch_sub(1usize, Ordering::Relaxed);
		}

		*state = PeerState::Backoff;
		self.pending_backoffs.push(Box::pin(async move {
			Delay::new(DEFAULT_BACKOFF).await;
			(peer, DISCONNECT_ADJUSTMENT)
		}));
	}

	/// Report to [`Peerset`] that an inbound substream was opened and that it should validate it.
	pub fn report_inbound_substream(&mut self, peer: PeerId) -> ValidationResult {
		log::trace!(target: LOG_TARGET, "{}: inbound substream from {peer:?}", self.protocol);

		if self.peerstore_handle.is_banned(&peer) {
			log::debug!(
				target: LOG_TARGET,
				"{}: rejecting banned peer {peer:?}",
				self.protocol,
			);

			return ValidationResult::Reject;
		}

		let state = self.peers.entry(peer).or_insert(PeerState::Disconnected);
		let is_reserved_peer = self.reserved_peers.contains(&peer);

		// Check if this is a non-reserved peer and if the protocol is in reserved-only mode.
		let should_reject = self.reserved_only && !is_reserved_peer;

		match state {
			// disconnected non-reserved peers are rejected in reserved-only mode
			PeerState::Disconnected if should_reject => {
				log::trace!(
					target: LOG_TARGET,
					"{}: rejecting non-reserved peer {peer:?} in reserved-only mode (prev state: {state:?})",
					self.protocol,
				);

				return ValidationResult::Reject
			},
			// disconnected peers proceed directly to inbound slot allocation
			PeerState::Disconnected => {},
			// peer is backed off but if it can be accepted (either a reserved peer or inbound slot
			// available), accept the peer and then just ignore the back-off timer when it expires
			PeerState::Backoff => {
				if !is_reserved_peer && self.num_in == self.max_in {
					log::trace!(
						target: LOG_TARGET,
						"{}: ({peer:?}) is backed-off and cannot be accepted, reject inbound substream",
						self.protocol,
					);

					return ValidationResult::Reject
				}

				// The peer remains in the `PeerState::Backoff` state until the current timer
				// expires. Then, the peer will be in the disconnected state, subject to further
				// rejection if the peer is not reserved by then.
				if should_reject {
					return ValidationResult::Reject
				}
			},

			// `Peerset` had initiated an outbound substream but litep2p had received an inbound
			// substream before the command to open the substream was received, meaning the local
			// and remote nodes desired to open a connection at the same time. Since outbound
			// substreams cannot be canceled with litep2p and the command has already been
			// registered, accept the inbound peer since the local node had wished a connection to
			// be opened either way, but keep the direction of the substream as it was (outbound).
			//
			// litep2p doesn't care what `Peerset` considers the substream direction to be and since
			// it's used for bookkeeping for substream counts, keeping the substream direction
			// unmodified simplifies the implementation a lot. The direction would otherwise be
			// irrelevant for protocols but because `SyncingEngine` has a hack to reject excess
			// inbound substreams, that system has to be kept working for the time being. Once that
			// issue is fixed, this approach can be re-evaluated if need be.
			PeerState::Opening { direction: Direction::Outbound(reserved) } => {
				if should_reject {
					log::trace!(
						target: LOG_TARGET,
						"{}: rejecting inbound substream from {peer:?} ({reserved:?}) in reserved-only mode that was marked outbound",
						self.protocol,
					);

					*state = PeerState::Canceled { direction: Direction::Outbound(*reserved) };
					return ValidationResult::Reject
				}

				log::trace!(
					target: LOG_TARGET,
					"{}: inbound substream received for {peer:?} ({reserved:?}) that was marked outbound",
					self.protocol,
				);

				return ValidationResult::Accept;
			},
			PeerState::Canceled { direction } => {
				log::trace!(
					target: LOG_TARGET,
					"{}: {peer:?} is canceled, rejecting substream should_reject={should_reject}",
					self.protocol,
				);

				*state = PeerState::Canceled { direction: *direction };
				return ValidationResult::Reject
			},
			state => {
				log::warn!(
					target: LOG_TARGET,
					"{}: invalid state ({state:?}) for inbound substream, peer {peer:?}",
					self.protocol
				);
				debug_assert!(false);
				return ValidationResult::Reject
			},
		}

		if is_reserved_peer {
			log::trace!(
				target: LOG_TARGET,
				"{}: {peer:?} accepting peer as reserved peer",
				self.protocol,
			);

			*state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
			return ValidationResult::Accept
		}

		if self.num_in < self.max_in {
			log::trace!(
				target: LOG_TARGET,
				"{}: {peer:?} accepting peer as regular peer",
				self.protocol,
			);

			self.num_in += 1;

			*state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
			return ValidationResult::Accept
		}

		log::trace!(
			target: LOG_TARGET,
			"{}: reject {peer:?}, not a reserved peer and no free inbound slots",
			self.protocol,
		);

		*state = PeerState::Disconnected;
		return ValidationResult::Reject
	}

	/// Report to [`Peerset`] that there was an error opening a substream.
	pub fn report_substream_open_failure(&mut self, peer: PeerId, error: NotificationError) {
		log::trace!(
			target: LOG_TARGET,
			"{}: failed to open substream to {peer:?}: {error:?}",
			self.protocol,
		);

		match self.peers.get(&peer) {
			Some(PeerState::Opening { direction: Direction::Outbound(Reserved::No) }) => {
				decrement_or_warn!(
					self.num_out,
					self.protocol,
					peer,
					Direction::Outbound(Reserved::No)
				);
			},
			Some(PeerState::Opening { direction: Direction::Inbound(Reserved::No) }) => {
				decrement_or_warn!(
					self.num_in,
					self.protocol,
					peer,
					Direction::Inbound(Reserved::No)
				);
			},
			Some(PeerState::Canceled { direction }) => match direction {
				Direction::Inbound(Reserved::No) => {
					decrement_or_warn!(
						self.num_in,
						self.protocol,
						peer,
						Direction::Inbound(Reserved::No)
					);
				},
				Direction::Outbound(Reserved::No) => {
					decrement_or_warn!(
						self.num_out,
						self.protocol,
						peer,
						Direction::Outbound(Reserved::No)
					);
				},
				_ => {},
			},
			// reserved peers do not require change in the slot counts
			Some(PeerState::Opening { direction: Direction::Inbound(Reserved::Yes) }) |
			Some(PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }) => {
				log::debug!(
					target: LOG_TARGET,
					"{}: substream open failure for reserved peer {peer:?}",
					self.protocol,
				);
			},
			state => {
				log::debug!(
					target: LOG_TARGET,
					"{}: substream open failure for an unknown state: {state:?}",
					self.protocol,
				);

				return;
			},
		}

		self.peers.insert(peer, PeerState::Backoff);
		self.pending_backoffs.push(Box::pin(async move {
			Delay::new(OPEN_FAILURE_BACKOFF).await;
			(peer, OPEN_FAILURE_ADJUSTMENT)
		}));
	}

	/// [`Peerset`] had accepted a peer but it was then rejected by the protocol.
	pub fn report_substream_rejected(&mut self, peer: PeerId) {
		log::trace!(target: LOG_TARGET, "{}: {peer:?} rejected by the protocol", self.protocol);

		match self.peers.remove(&peer) {
			Some(PeerState::Opening { direction }) => match direction {
				Direction::Inbound(Reserved::Yes) | Direction::Outbound(Reserved::Yes) => {
					log::warn!(
						target: LOG_TARGET,
						"{}: reserved peer {peer:?} rejected by the protocol",
						self.protocol,
					);
					self.peers.insert(peer, PeerState::Disconnected);
				},
				Direction::Inbound(Reserved::No) => {
					decrement_or_warn!(
						self.num_in,
						self.protocol,
						peer,
						Direction::Inbound(Reserved::No)
					);
					self.peers.insert(peer, PeerState::Disconnected);
				},
				Direction::Outbound(Reserved::No) => {
					decrement_or_warn!(
						self.num_out,
						self.protocol,
						peer,
						Direction::Outbound(Reserved::No)
					);
					self.peers.insert(peer, PeerState::Disconnected);
				},
			},
			Some(state @ PeerState::Canceled { .. }) => {
				log::debug!(
					target: LOG_TARGET,
					"{}: substream to {peer:?} rejected by protocol but already canceled",
					self.protocol,
				);

				self.peers.insert(peer, state);
			},
			Some(state) => {
				log::debug!(
					target: LOG_TARGET,
					"{}: {peer:?} rejected by the protocol but not opening anymore: {state:?}",
					self.protocol,
				);

				self.peers.insert(peer, state);
			},
			None => {},
		}
	}

	/// Calculate how many of the connected peers were counted as normal inbound/outbound peers
	/// which is needed to adjust slot counts when new reserved peers are added.
	///
	/// If the peer is not already in the [`Peerset`], it is added as a disconnected peer.
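	///
	/// A small illustrative sketch of the arithmetic, mirroring the call sites in `poll_next`
	/// (the numbers in the comments are made up):
	///
	/// ```ignore
	/// // Suppose one regular inbound and two regular outbound peers become reserved.
	/// let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
	/// self.num_in -= in_peers;   // e.g. 3 - 1 = 2
	/// self.num_out -= out_peers; // e.g. 5 - 2 = 3
	/// ```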
	fn calculate_slot_adjustment<'a>(
		&'a mut self,
		peers: impl Iterator<Item = &'a PeerId>,
	) -> (usize, usize) {
		peers.fold((0, 0), |(mut inbound, mut outbound), peer| {
			match self.peers.get_mut(peer) {
				Some(PeerState::Disconnected | PeerState::Backoff) => {},
				Some(
					PeerState::Opening { ref mut direction } |
					PeerState::Connected { ref mut direction } |
					PeerState::Canceled { ref mut direction } |
					PeerState::Closing { ref mut direction },
				) => {
					*direction = match direction {
						Direction::Inbound(Reserved::No) => {
							inbound += 1;
							Direction::Inbound(Reserved::Yes)
						},
						Direction::Outbound(Reserved::No) => {
							outbound += 1;
							Direction::Outbound(Reserved::Yes)
						},
						ref direction => **direction,
					};
				},
				None => {
					self.peers.insert(*peer, PeerState::Disconnected);
				},
			}

			(inbound, outbound)
		})
	}

	/// Checks if the peer should be disconnected based on the current state of the [`Peerset`]
	/// and the provided direction.
	///
	/// Note: The role of the peer is not checked.
	fn should_disconnect(&self, direction: Direction) -> bool {
		match direction {
			Direction::Inbound(_) => self.num_in >= self.max_in,
			Direction::Outbound(_) => self.num_out >= self.max_out,
		}
	}

	/// Increment the slot count for given peer.
	fn increment_slot(&mut self, direction: Direction) {
		match direction {
			Direction::Inbound(Reserved::No) => self.num_in += 1,
			Direction::Outbound(Reserved::No) => self.num_out += 1,
			_ => {},
		}
	}

	/// Get the number of inbound peers.
	#[cfg(test)]
	pub fn num_in(&self) -> usize {
		self.num_in
	}

	/// Get the number of outbound peers.
	#[cfg(test)]
	pub fn num_out(&self) -> usize {
		self.num_out
	}

	/// Get reference to known peers.
	#[cfg(test)]
	pub fn peers(&self) -> &HashMap<PeerId, PeerState> {
		&self.peers
	}

	/// Get mutable reference to known peers.
	#[cfg(test)]
	pub fn peers_mut(&mut self) -> &mut HashMap<PeerId, PeerState> {
		&mut self.peers
	}

	/// Get reference to reserved peers.
	#[cfg(test)]
	pub fn reserved_peers(&self) -> &HashSet<PeerId> {
		&self.reserved_peers
	}
}

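// `Peerset` is driven as a `Stream` by the notification protocol: each item it yields is a
// `PeersetNotificationCommand` telling litep2p which substreams to open or close. A minimal,
// hypothetical driver loop could look like the sketch below (`open_substreams` and
// `close_substreams` are placeholders; the real loop lives in the notification shim):
//
// ```ignore
// while let Some(command) = peerset.next().await {
//     match command {
//         PeersetNotificationCommand::OpenSubstream { peers } => open_substreams(peers),
//         PeersetNotificationCommand::CloseSubstream { peers } => close_substreams(peers),
//     }
// }
// ```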
impl Stream for Peerset {
	type Item = PeersetNotificationCommand;

	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
		while let Poll::Ready(Some((peer, reputation))) = self.pending_backoffs.poll_next_unpin(cx)
		{
			log::trace!(target: LOG_TARGET, "{}: backoff expired for {peer:?}", self.protocol);

			if std::matches!(self.peers.get(&peer), None | Some(PeerState::Backoff)) {
				self.peers.insert(peer, PeerState::Disconnected);
			}

			self.peerstore_handle.report_peer(peer, reputation);
		}

		if let Poll::Ready(Some(action)) = Pin::new(&mut self.cmd_rx).poll_next(cx) {
			log::trace!(target: LOG_TARGET, "{}: received command {action:?}", self.protocol);

			match action {
				PeersetCommand::DisconnectPeer { peer } if !self.reserved_peers.contains(&peer) =>
					match self.peers.remove(&peer) {
						Some(PeerState::Connected { direction }) => {
							log::trace!(
								target: LOG_TARGET,
								"{}: close connection to {peer:?}, direction {direction:?}",
								self.protocol,
							);

							self.peers.insert(peer, PeerState::Closing { direction });
							return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
								peers: vec![peer],
							}))
						},
						Some(PeerState::Backoff) => {
							log::trace!(
								target: LOG_TARGET,
								"{}: cannot disconnect {peer:?}, already backed-off",
								self.protocol,
							);

							self.peers.insert(peer, PeerState::Backoff);
						},
						// the substream might have been opening but not yet fully open when the
						// protocol or `Peerstore` requested the connection to be closed
						//
						// if the substream opens successfully, close it immediately and mark the
						// peer as `Disconnected`
						Some(PeerState::Opening { direction }) => {
							log::trace!(
								target: LOG_TARGET,
								"{}: canceling substream to disconnect peer {peer:?}",
								self.protocol,
							);

							self.peers.insert(peer, PeerState::Canceled { direction });
						},
						// the protocol had issued two disconnection requests in rapid succession
						// and the substream hadn't closed before the second disconnection request
						// was received; this is harmless and can be ignored.
						Some(state @ PeerState::Closing { .. }) => {
							log::trace!(
								target: LOG_TARGET,
								"{}: cannot disconnect {peer:?}, already closing ({state:?})",
								self.protocol,
							);

							self.peers.insert(peer, state);
						},
						// if a peer is banned, e.g. due to genesis mismatch, `Peerstore` will issue
						// a global disconnection request to all protocols, irrespective of the
						// connectivity state. A peer isn't necessarily connected to all protocols
						// at all times so this is a harmless state to be in if a disconnection
						// request is received.
						Some(state @ PeerState::Disconnected) => {
							self.peers.insert(peer, state);
						},
						// the peer had an opening substream earlier which was canceled and then,
						// e.g., the peer was banned which caused it to be disconnected again
						Some(state @ PeerState::Canceled { .. }) => {
							log::debug!(
								target: LOG_TARGET,
								"{}: cannot disconnect {peer:?}, already canceled ({state:?})",
								self.protocol,
							);

							self.peers.insert(peer, state);
						},
						// peer doesn't exist
						//
						// this can happen, for example, when a peer connects over
						// `/block-announces/1` with the wrong genesis hash, which initiates a ban
						// for that peer. Since the ban is reported to all protocols but the peer
						// might not have been registered to GRANDPA or transactions yet, the peer
						// doesn't exist in their `Peerset`s and the error can just be ignored.
						None => {
							log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
						},
					},
				PeersetCommand::DisconnectPeer { peer } => {
					log::debug!(
						target: LOG_TARGET,
						"{}: ignoring disconnection request for reserved peer {peer}",
						self.protocol,
					);
				},
				// set new reserved peers for the protocol
				//
				// Current reserved peers not in the new set are moved to the regular set of peers
				// or disconnected (if there are no slots available). The new reserved peers are
				// scheduled for outbound substreams
				PeersetCommand::SetReservedPeers { peers } => {
					log::debug!(target: LOG_TARGET, "{}: set reserved peers {peers:?}", self.protocol);

					// reserved peers don't consume any slots so if there are any regular connected
					// peers, inbound/outbound slot count must be adjusted to not account for these
					// peers anymore
					//
					// calculate how many of the previously connected peers were counted as regular
					// peers and subtract these counts from `num_out`/`num_in`
					//
					// If a reserved peer is not already tracked, it is added as disconnected by
					// `calculate_slot_adjustment`. This ensures at the next slot allocation (1sec)
					// that we'll try to establish a connection with the reserved peer.
					let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
					self.num_out -= out_peers;
					self.num_in -= in_peers;

					// collect all *reserved* peers who are not in the new reserved set
					let reserved_peers_maybe_remove =
						self.reserved_peers.difference(&peers).cloned().collect::<Vec<_>>();

					self.reserved_peers = peers;

					let peers_to_remove = reserved_peers_maybe_remove
						.into_iter()
						.filter(|peer| {
							match self.peers.remove(&peer) {
								Some(PeerState::Connected { mut direction }) => {
									// The direction contains a `Reserved::Yes` flag, because this
									// is a reserved peer that we want to close.
									// The `Reserved::Yes` ensures we don't adjust the slot count
									// when the substream is closed.

									let disconnect =
										self.reserved_only || self.should_disconnect(direction);

									if disconnect {
										log::trace!(
											target: LOG_TARGET,
											"{}: close connection to previously reserved {peer:?}, direction {direction:?}",
											self.protocol,
										);

										self.peers.insert(*peer, PeerState::Closing { direction });
										true
									} else {
										log::trace!(
											target: LOG_TARGET,
											"{}: {peer:?} is no longer reserved, move to regular peers, direction {direction:?}",
											self.protocol,
										);

										// The peer is kept connected as non-reserved and will now
										// count towards the slot count.
										direction.set_reserved(Reserved::No);
										self.increment_slot(direction);

										self.peers
											.insert(*peer, PeerState::Connected { direction });
										false
									}
								},
								// the substream might have been opening but not yet fully open
								// when the protocol requested the reserved set to be changed
								Some(PeerState::Opening { direction }) => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cancel substream to {peer:?}, direction {direction:?}",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Canceled { direction });
									false
								},
								Some(state) => {
									self.peers.insert(*peer, state);
									false
								},
								None => {
									log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
									debug_assert!(false);
									false
								},
							}
						})
						.collect();

					log::trace!(
						target: LOG_TARGET,
						"{}: close substreams to {peers_to_remove:?}",
						self.protocol,
					);

					return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
						peers: peers_to_remove,
					}))
				},
				PeersetCommand::AddReservedPeers { peers } => {
					log::debug!(target: LOG_TARGET, "{}: add reserved peers {peers:?}", self.protocol);

					// reserved peers don't consume any slots so if there are any regular connected
					// peers, inbound/outbound slot count must be adjusted to not account for these
					// peers anymore
					//
					// calculate how many of the previously connected peers were counted as regular
					// peers and subtract these counts from `num_out`/`num_in`
					let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
					self.num_out -= out_peers;
					self.num_in -= in_peers;

					let peers = peers
						.iter()
						.filter_map(|peer| {
							if !self.reserved_peers.insert(*peer) {
								log::warn!(
									target: LOG_TARGET,
									"{}: {peer:?} is already a reserved peer",
									self.protocol,
								);
								return None
							}

							std::matches!(
								self.peers.get_mut(peer),
								None | Some(PeerState::Disconnected)
							)
							.then(|| {
								self.peers.insert(
									*peer,
									PeerState::Opening {
										direction: Direction::Outbound(Reserved::Yes),
									},
								);
								*peer
							})
						})
						.collect();

					log::debug!(target: LOG_TARGET, "{}: start connecting to {peers:?}", self.protocol);

					return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream { peers }))
				},
				PeersetCommand::RemoveReservedPeers { peers } => {
					log::debug!(target: LOG_TARGET, "{}: remove reserved peers {peers:?}", self.protocol);

					let peers_to_remove = peers
						.iter()
						.filter_map(|peer| {
							if !self.reserved_peers.remove(peer) {
								log::debug!(
									target: LOG_TARGET,
									"{}: {peer} is not a reserved peer",
									self.protocol,
								);
								return None
							}

							match self.peers.remove(peer)? {
								// the peer might have already disconnected by the time the removal
								// request was received and is currently backed off; the back-off
								// hadn't expired by the time the request arrived
								PeerState::Backoff => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cannot disconnect removed reserved peer {peer:?}, already backed-off",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Backoff);
									None
								},

								// if there is a rapid change in substream state, the peer may
								// be canceled when the substream is asked to be closed.
								//
								// this can happen if a substream is first opened and then canceled
								// very soon after. The substream may not have had time to open yet
								// and the second open is ignored. If the substream is now closed
								// again before it has had time to open, it will be in the canceled
								// state since `Peerset` is still waiting to hear either
								// success/failure for the original substream it tried to cancel.
								PeerState::Canceled { direction } => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cannot disconnect removed reserved peer {peer:?}, already canceled",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Canceled { direction });
									None
								},

								// the substream to the peer might have failed to open, which
								// caused the peer to be backed off
								//
								// the back-off might've expired by the time the peer was
								// disconnected, at which point the peer is already disconnected
								// when the protocol asked for the peer to be disconnected
								PeerState::Disconnected => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cannot disconnect removed reserved peer {peer:?}, already disconnected",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Disconnected);
									None
								},

								// if a node disconnects, it's put into `PeerState::Closing`
								// which indicates that `Peerset` wants the substream closed and
								// has asked litep2p to close it but it hasn't yet received a
								// confirmation. If the peer is added as a reserved peer while
								// the substream is closing, the peer will remain in the closing
								// state as `Peerset` can't do anything with the peer until it
								// has heard from litep2p. It's possible that the peer is then
								// removed from the reserved set before the substream close event
								// has been reported to `Peerset` (which the code below is
								// handling) and it will once again be ignored until the close
								// event is heard from litep2p.
								PeerState::Closing { direction } => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cannot disconnect removed reserved peer {peer:?}, already closing",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Closing { direction });
									None
								},
								// the peer is currently connected as a reserved peer
								//
								// check if the peer can be accepted as a regular peer based on its
								// substream direction and available slots
								//
								// if there are enough slots, the peer is just converted to
								// a regular peer and the used slot count is increased and if the
								// peer cannot be accepted, litep2p is asked to close the substream.
								PeerState::Connected { mut direction } => {
									let disconnect = self.should_disconnect(direction);

									if disconnect {
										log::trace!(
											target: LOG_TARGET,
											"{}: close connection to removed reserved {peer:?}, direction {direction:?}",
											self.protocol,
										);

										self.peers.insert(*peer, PeerState::Closing { direction });
										Some(*peer)
									} else {
										log::trace!(
											target: LOG_TARGET,
											"{}: {peer:?} converted to regular peer, direction {direction:?}",
											self.protocol,
										);

										// The peer is kept connected as non-reserved and will now
										// count towards the slot count.
										direction.set_reserved(Reserved::No);
										self.increment_slot(direction);

										self.peers
											.insert(*peer, PeerState::Connected { direction });

										None
									}
								},

								PeerState::Opening { mut direction } => {
									let disconnect = self.should_disconnect(direction);

									if disconnect {
										log::trace!(
											target: LOG_TARGET,
											"{}: cancel substream to disconnect removed reserved peer {peer:?}, direction {direction:?}",
											self.protocol,
										);

										self.peers.insert(
											*peer,
											PeerState::Canceled {
												direction
											},
										);
									} else {
										log::trace!(
											target: LOG_TARGET,
											"{}: {peer:?} converted to regular peer, direction {direction:?}",
											self.protocol,
										);

										// The peer is kept connected as non-reserved and will now
										// count towards the slot count.
										direction.set_reserved(Reserved::No);
										self.increment_slot(direction);

										self.peers
											.insert(*peer, PeerState::Opening { direction });
									}

									None
								},
							}
						})
						.collect();

					log::debug!(
						target: LOG_TARGET,
						"{}: close substreams to {peers_to_remove:?}",
						self.protocol,
					);

					return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
						peers: peers_to_remove,
					}))
				},
				PeersetCommand::SetReservedOnly { reserved_only } => {
					log::debug!(target: LOG_TARGET, "{}: set reserved only mode to {reserved_only}", self.protocol);

					// update mode and if it's set to true, disconnect all non-reserved peers
					self.reserved_only = reserved_only;

					if reserved_only {
						let peers_to_remove = self
							.peers
							.iter()
							.filter_map(|(peer, state)| {
								(!self.reserved_peers.contains(peer) &&
									std::matches!(state, PeerState::Connected { .. }))
								.then_some(*peer)
							})
							.collect::<Vec<_>>();

						// set peers to correct states

						// peers who are connected are moved to [`PeerState::Closing`]
						// and peers who are still opening are moved to [`PeerState::Canceled`];
						// if the substream for them opens, it will be closed right after.
						self.peers.iter_mut().for_each(|(_, state)| match state {
							PeerState::Connected { direction } => {
								*state = PeerState::Closing { direction: *direction };
							},
							// peers for whom a substream was opening are canceled and if the
							// substream opens successfully, it will be closed immediately
							PeerState::Opening { direction } => {
								*state = PeerState::Canceled { direction: *direction };
							},
							_ => {},
						});

						return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
							peers: peers_to_remove,
						}))
					}
				},
				PeersetCommand::GetReservedPeers { tx } => {
					let _ = tx.send(self.reserved_peers.iter().cloned().collect());
				},
			}
		}

		// periodically check if `Peerset` is currently not connected to some reserved peers
		// it should be connected to
		//
		// also check if there are free outbound slots and if so, fetch peers with the highest
		// reputations from `Peerstore` and start opening substreams to these peers
		if let Poll::Ready(()) = Pin::new(&mut self.next_slot_allocation).poll(cx) {
			let mut connect_to = self
				.peers
				.iter()
				.filter_map(|(peer, state)| {
					(self.reserved_peers.contains(peer) &&
						std::matches!(state, PeerState::Disconnected) &&
						!self.peerstore_handle.is_banned(peer))
					.then_some(*peer)
				})
				.collect::<Vec<_>>();

			connect_to.iter().for_each(|peer| {
				self.peers.insert(
					*peer,
					PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) },
				);
			});

			// if the number of outbound peers is lower than the desired amount of outbound peers,
			// query `PeerStore` and try to get new outbound candidates.
			if self.num_out < self.max_out && !self.reserved_only {
				// From the candidates offered by the peerstore we need to ignore:
				// - all peers that are not in the `PeerState::Disconnected` state (ie they are
				//   connected / closing)
				// - reserved peers since we initiated a connection to them in the previous step
				let ignore: HashSet<PeerId> = self
					.peers
					.iter()
					.filter_map(|(peer, state)| {
						(!std::matches!(state, PeerState::Disconnected)).then_some(*peer)
					})
					.chain(self.reserved_peers.iter().cloned())
					.collect();

				let peers: Vec<_> =
					self.peerstore_handle.outgoing_candidates(self.max_out - self.num_out, ignore);

				if !peers.is_empty() {
					peers.iter().for_each(|peer| {
						self.peers.insert(
							*peer,
							PeerState::Opening { direction: Direction::Outbound(Reserved::No) },
						);
					});

					self.num_out += peers.len();
					connect_to.extend(peers);
				}
			}

			// start the timer for the next allocation and, if there were peers which the `Peerset`
			// wasn't connected to but should be, send a command to litep2p to start opening
			// substreams.
			self.next_slot_allocation = Delay::new(SLOT_ALLOCATION_FREQUENCY);

			if !connect_to.is_empty() {
				log::trace!(
					target: LOG_TARGET,
					"{}: start connecting to peers {connect_to:?}",
					self.protocol,
				);

				return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream {
					peers: connect_to,
				}))
			}
		}

		Poll::Pending
	}
}