sc_network/litep2p/shim/notification/peerset.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! [`Peerset`] implementation for `litep2p`.
//!
//! [`Peerset`] is a separate but related component running alongside the notification protocol,
//! responsible for maintaining connectivity to remote peers. [`Peerset`] has an imperfect view of
//! the network as the notification protocol is behind an asynchronous channel. Based on this
//! imperfect view, it tries to connect to remote peers and disconnect peers that should be
//! disconnected from.
//!
//! [`Peerset`] knows of two types of peers:
//!  - normal peers
//!  - reserved peers
//!
//! Reserved peers are those which the [`Peerset`] should be connected to at all times, and it
//! will make an effort to do so by constantly checking that there are no disconnected reserved
//! peers (except banned ones) and, if there are, opening substreams to them.
//!
//! [`Peerset`] may also contain "slots", both inbound and outbound, which mark how many incoming
//! and outgoing connections it should maintain at all times. Peers for the inbound slots are
//! filled by remote peers opening inbound substreams towards the local node and peers for the
//! outbound slots are filled by querying the `Peerstore`, which contains all peers known to
//! `sc-network`. Peers for outbound slots are selected in decreasing order of reputation.
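//!
//! # Example (sketch)
//!
//! A minimal sketch of how a notification protocol might drive [`Peerset`]; the
//! `peerstore_handle`, `reserved_peers` and `connected_peers` values passed to
//! [`Peerset::new()`] are assumed to exist and are illustrative only:
//!
//! ```ignore
//! let (mut peerset, peerset_tx) = Peerset::new(
//! 	ProtocolName::from("/block-announces/1"),
//! 	25,    // maximum number of outbound peers
//! 	25,    // maximum number of inbound peers
//! 	false, // not in reserved-only mode
//! 	reserved_peers,
//! 	connected_peers,
//! 	peerstore_handle,
//! );
//!
//! // Other subsystems modify the peer set through the command channel.
//! let _ = peerset_tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only: true });
//!
//! // The notification protocol polls `Peerset` as a stream and executes the commands it
//! // emits against litep2p, reporting substream events back through the `report_*` methods.
//! while let Some(command) = peerset.next().await {
//! 	match command {
//! 		PeersetNotificationCommand::OpenSubstream { peers } => { /* open substreams */ },
//! 		PeersetNotificationCommand::CloseSubstream { peers } => { /* close substreams */ },
//! 	}
//! }
//! ```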

use crate::{
	peer_store::{PeerStoreProvider, ProtocolHandle},
	service::traits::{self, ValidationResult},
	ProtocolName, ReputationChange as Reputation,
};

use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use futures_timer::Delay;
use litep2p::protocol::notification::NotificationError;

use sc_network_types::PeerId;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};

use std::{
	collections::{HashMap, HashSet},
	future::Future,
	pin::Pin,
	sync::{
		atomic::{AtomicUsize, Ordering},
		Arc,
	},
	task::{Context, Poll},
	time::Duration,
};

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::peerset";

/// Default backoff for connection re-attempts.
const DEFAULT_BACKOFF: Duration = Duration::from_secs(5);

/// Open failure backoff.
const OPEN_FAILURE_BACKOFF: Duration = Duration::from_secs(5);

/// Slot allocation frequency.
///
/// How often should [`Peerset`] attempt to establish outbound connections.
const SLOT_ALLOCATION_FREQUENCY: Duration = Duration::from_secs(1);

/// Reputation adjustment when a peer gets disconnected.
///
/// Lessens the likelihood of the peer getting selected for an outbound connection soon.
const DISCONNECT_ADJUSTMENT: Reputation = Reputation::new(-256, "Peer disconnected");

/// Reputation adjustment when a substream fails to open.
///
/// Lessens the likelihood of the peer getting selected for an outbound connection soon.
const OPEN_FAILURE_ADJUSTMENT: Reputation = Reputation::new(-1024, "Open failure");

/// Is the peer reserved?
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Reserved {
	Yes,
	No,
}

impl From<bool> for Reserved {
	fn from(value: bool) -> Reserved {
		match value {
			true => Reserved::Yes,
			false => Reserved::No,
		}
	}
}

impl From<Reserved> for bool {
	fn from(value: Reserved) -> bool {
		std::matches!(value, Reserved::Yes)
	}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
	/// Inbound substream.
	Inbound(Reserved),

	/// Outbound substream.
	Outbound(Reserved),
}

impl From<Direction> for traits::Direction {
	fn from(direction: Direction) -> traits::Direction {
		match direction {
			Direction::Inbound(_) => traits::Direction::Inbound,
			Direction::Outbound(_) => traits::Direction::Outbound,
		}
	}
}

/// Open result for a fully-opened connection.
#[derive(PartialEq, Eq)]
pub enum OpenResult {
	/// Accept the connection.
	Accept {
		/// Direction which [`Peerset`] considers to be correct.
		direction: traits::Direction,
	},

	/// Reject the connection because it was canceled while it was opening.
	Reject,
}

/// Commands emitted by other subsystems of the blockchain to [`Peerset`].
#[derive(Debug)]
pub enum PeersetCommand {
	/// Set current reserved peer set.
	///
	/// This command removes all reserved peers that are not in `peers`.
	SetReservedPeers {
		/// New reserved peer set.
		peers: HashSet<PeerId>,
	},

	/// Add one or more reserved peers.
	///
	/// This command doesn't remove any reserved peers but only adds new peers.
	AddReservedPeers {
		/// Reserved peers to add.
		peers: HashSet<PeerId>,
	},

	/// Remove reserved peers.
	RemoveReservedPeers {
		/// Reserved peers to remove.
		peers: HashSet<PeerId>,
	},

	/// Set reserved-only mode to true/false.
	SetReservedOnly {
		/// Should the protocol only accept/establish connections to reserved peers.
		reserved_only: bool,
	},

	/// Disconnect peer.
	DisconnectPeer {
		/// Peer ID.
		peer: PeerId,
	},

	/// Get reserved peers.
	GetReservedPeers {
		/// `oneshot::Sender` for sending the current set of reserved peers.
		tx: oneshot::Sender<Vec<PeerId>>,
	},
}

/// Commands emitted by [`Peerset`] to the notification protocol.
#[derive(Debug)]
pub enum PeersetNotificationCommand {
	/// Open substreams to one or more peers.
	OpenSubstream {
		/// Peer IDs.
		peers: Vec<PeerId>,
	},

	/// Close substreams to one or more peers.
	CloseSubstream {
		/// Peer IDs.
		peers: Vec<PeerId>,
	},
}

/// Peer state.
///
/// A peer can be in 6 different states:
///  - disconnected
///  - connected
///  - connection is opening
///  - connection is closing
///  - connection is backed-off
///  - connection is canceled
///
/// Opening and closing are separate states as litep2p guarantees to report when the substream is
/// either fully open or fully closed and the slot allocation for opening a substream is tied to a
/// state transition which moves the peer to [`PeerState::Opening`]. This allows reserving a slot
/// for the peer to prevent infinite outbound substreams. If the substream is opened successfully,
/// the peer is moved to state [`PeerState::Connected`] but there is no modification to the slot
/// count as an outbound slot was already allocated for the peer. If the substream fails to open,
/// the event is reported by litep2p and [`Peerset::report_substream_open_failure()`] is called,
/// which will decrease the outbound slot count. Similarly for inbound streams, the slot is
/// allocated in [`Peerset::report_inbound_substream()`] which will prevent `Peerset` from
/// accepting infinite inbound substreams. If the inbound substream fails to open, litep2p will
/// report the open failure (since [`Peerset`] was notified of the substream) and the inbound slot
/// count is once again decreased in [`Peerset::report_substream_open_failure()`]. If the
/// substream is opened successfully, the slot count is not modified.
///
/// Since closing a substream is not instantaneous, there is a separate [`PeerState::Closing`]
/// state which indicates that the substream is being closed but hasn't been closed by litep2p yet.
/// This state is used to prevent invalid state transitions where, for example, [`Peerset`] would
/// close a substream and then try to reopen it immediately.
///
/// Irrespective of which side closed the substream (local/remote), the substream is chilled for a
/// small amount of time ([`DEFAULT_BACKOFF`]) and during this time no inbound or outbound
/// substreams are accepted/established. Any request to open an outbound substream while the peer
/// is backed-off is ignored. If the peer is a reserved peer, an outbound substream is not opened
/// for them immediately but after the back-off has expired, `Peerset` will attempt to open a
/// substream to the peer if it's still counted as a reserved peer.
///
/// Disconnections and open failures will contribute negatively to the peer score to prevent it
/// from being selected for another outbound substream request soon after the
/// failure/disconnection. The reputation decays towards zero over time and eventually the peer
/// will be as likely to be selected for an outbound substream as any other freshly added peer.
///
/// [`Peerset`] must also be able to handle the case where an outbound substream was opened to a
/// peer and, while it was opening, an inbound substream was received from that same peer. Since
/// `litep2p` is the source of truth of the actual state of the connection, [`Peerset`] must
/// compensate for this and if an inbound substream is opened for a peer that was marked outbound,
/// it will attempt to allocate an inbound slot for the peer. If it fails to do so, the inbound
/// substream is rejected and the peer is marked as canceled.
///
/// Since a substream is not opened immediately, a peer can be disconnected even if the substream
/// was not yet open. This can happen, for example, when a peer has connected over the syncing
/// protocol and it was added to, e.g., GRANDPA's reserved peers, an outbound substream was opened
/// ([`PeerState::Opening`]) and then the peer disconnected. This state transition is handled by
/// the [`Peerset`] with [`PeerState::Canceled`] which indicates that should the substream open
/// successfully, it should be closed immediately, and if the connection is opened successfully
/// while the peer was marked as canceled, the substream will be closed without notifying the
/// protocol about the substream.
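///
/// A simplified summary of the transitions described above (a sketch; note that an inbound
/// substream may still be accepted from a backed-off peer, see [`PeerState::Backoff`]):
///
/// ```text
/// Disconnected -> Opening              slot allocated, substream requested/accepted
/// Opening      -> Connected            litep2p reports open success
/// Opening      -> Canceled             local disconnect/reserved-set change while opening
/// Opening      -> Backoff              litep2p reports open failure
/// Connected    -> Closing              local node asks litep2p to close the substream
/// Connected    -> Backoff              remote closed the substream
/// Canceled     -> Closing | Backoff    open success / open failure finally reported
/// Closing      -> Backoff              close confirmed by litep2p
/// Backoff      -> Disconnected         back-off expired
/// ```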
#[derive(Debug, PartialEq, Eq)]
pub enum PeerState {
	/// No active connection to peer.
	Disconnected,

	/// Substream to peer was recently closed and the peer is currently backed off.
	///
	/// Backoff only applies to outbound substreams. Inbound substreams will not experience any
	/// sort of "banning" even if the peer is backed off and an inbound substream for the peer is
	/// received.
	Backoff,

	/// Connection to peer is pending.
	Opening {
		/// Direction of the connection.
		direction: Direction,
	},

	/// Connected to peer.
	Connected {
		/// Is the peer inbound or outbound.
		direction: Direction,
	},

	/// Substream was opened and while it was opening (no response had been heard from litep2p),
	/// the substream was canceled by either calling `disconnect_peer()` or by removing the peer
	/// from the reserved set.
	///
	/// After the opened substream is acknowledged by litep2p (open success/failure), the peer is
	/// moved to [`PeerState::Backoff`] from which it will then be moved to
	/// [`PeerState::Disconnected`].
	Canceled {
		/// Is the peer inbound or outbound.
		direction: Direction,
	},

	/// Connection to peer is closing.
	///
	/// State implies that the substream was asked to be closed by the local node and litep2p is
	/// closing the substream. No command modifying the connection state is accepted until the
	/// state has been set to [`PeerState::Disconnected`].
	Closing {
		/// Is the peer inbound or outbound.
		direction: Direction,
	},
}

/// `Peerset` implementation.
///
/// `Peerset` allows other subsystems of the blockchain to modify the connection state
/// of the notification protocol by adding and removing reserved peers.
///
/// `Peerset` is also responsible for maintaining the desired amount of peers the protocol is
/// connected to by establishing outbound connections and accepting/rejecting inbound connections.
#[derive(Debug)]
pub struct Peerset {
	/// Protocol name.
	protocol: ProtocolName,

	/// RX channel for receiving commands.
	cmd_rx: TracingUnboundedReceiver<PeersetCommand>,

	/// Maximum number of outbound peers.
	max_out: usize,

	/// Current number of outbound peers.
	num_out: usize,

	/// Maximum number of inbound peers.
	max_in: usize,

	/// Current number of inbound peers.
	num_in: usize,

	/// Only connect to/accept connections from reserved peers.
	reserved_only: bool,

	/// Current reserved peer set.
	reserved_peers: HashSet<PeerId>,

	/// Handle to `Peerstore`.
	peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Peers.
	peers: HashMap<PeerId, PeerState>,

	/// Count of connected peers.
	connected_peers: Arc<AtomicUsize>,

	/// Pending backoffs for peers who recently disconnected.
	pending_backoffs: FuturesUnordered<BoxFuture<'static, (PeerId, Reputation)>>,

	/// Next time when [`Peerset`] should perform slot allocation.
	next_slot_allocation: Delay,
}

macro_rules! decrement_or_warn {
	($slot:expr, $protocol:expr, $peer:expr, $direction:expr) => {{
		match $slot.checked_sub(1) {
			Some(value) => {
				$slot = value;
			}
			None => {
				log::warn!(
					target: LOG_TARGET,
					"{}: state mismatch, {:?} is not counted as part of {:?} slots",
					$protocol, $peer, $direction
				);
				debug_assert!(false);
			}
		}
	}};
}

/// Handle to [`Peerset`], given to `Peerstore`.
#[derive(Debug)]
struct PeersetHandle {
	/// TX channel for sending commands to [`Peerset`].
	tx: TracingUnboundedSender<PeersetCommand>,
}

impl ProtocolHandle for PeersetHandle {
	/// Disconnect peer, as a result of a ban.
	fn disconnect_peer(&self, peer: PeerId) {
		let _ = self.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
	}
}

impl Peerset {
	/// Create new [`Peerset`].
	pub fn new(
		protocol: ProtocolName,
		max_out: usize,
		max_in: usize,
		reserved_only: bool,
		reserved_peers: HashSet<PeerId>,
		connected_peers: Arc<AtomicUsize>,
		peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self, TracingUnboundedSender<PeersetCommand>) {
		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc-peerset-protocol", 100_000);
		let peers = reserved_peers
			.iter()
			.map(|peer| (*peer, PeerState::Disconnected))
			.collect::<HashMap<_, _>>();

		// register protocol's command channel to `Peerstore` so it can issue disconnect commands
		// if some connected peer gets banned.
		peerstore_handle.register_protocol(Arc::new(PeersetHandle { tx: cmd_tx.clone() }));

		(
			Self {
				protocol,
				max_out,
				num_out: 0usize,
				max_in,
				num_in: 0usize,
				reserved_peers,
				cmd_rx,
				peerstore_handle,
				reserved_only,
				peers,
				connected_peers,
				pending_backoffs: FuturesUnordered::new(),
				next_slot_allocation: Delay::new(SLOT_ALLOCATION_FREQUENCY),
			},
			cmd_tx,
		)
	}

	/// Report to [`Peerset`] that a substream was opened.
	///
	/// Slot for the stream was "preallocated" when it was initiated (outbound) or accepted
	/// (inbound) by the local node which is why this function doesn't allocate a slot for the peer.
	///
	/// Returns [`OpenResult::Accept`] if the substream should be kept open and
	/// [`OpenResult::Reject`] if the substream had been canceled while it was opening and litep2p
	/// should close the substream.
	pub fn report_substream_opened(
		&mut self,
		peer: PeerId,
		direction: traits::Direction,
	) -> OpenResult {
		log::trace!(
			target: LOG_TARGET,
			"{}: substream opened to {peer:?}, direction {direction:?}, reserved peer {}",
			self.protocol,
			self.reserved_peers.contains(&peer),
		);

		let Some(state) = self.peers.get_mut(&peer) else {
			log::warn!(target: LOG_TARGET, "{}: substream opened for unknown peer {peer:?}", self.protocol);
			debug_assert!(false);
			return OpenResult::Reject
		};

		match state {
			PeerState::Opening { direction: substream_direction } => {
				let real_direction: traits::Direction = (*substream_direction).into();

				*state = PeerState::Connected { direction: *substream_direction };
				self.connected_peers.fetch_add(1usize, Ordering::Relaxed);

				return OpenResult::Accept { direction: real_direction }
			},
			// litep2p doesn't support the ability to cancel an opening substream so if the
			// substream was closed while it was opening, it was marked as canceled and if the
			// substream opens successfully, it will be closed
			PeerState::Canceled { direction: substream_direction } => {
				log::trace!(
					target: LOG_TARGET,
					"{}: substream to {peer:?} is canceled, issue disconnection request",
					self.protocol,
				);

				self.connected_peers.fetch_add(1usize, Ordering::Relaxed);
				*state = PeerState::Closing { direction: *substream_direction };

				return OpenResult::Reject
			},
			state => {
				panic!("{}: invalid state for open substream {peer:?} {state:?}", self.protocol);
			},
		}
	}

	/// Report to [`Peerset`] that a substream was closed.
	///
	/// If the peer was not a reserved peer, the inbound/outbound slot count is adjusted to account
	/// for the disconnected peer. After the connection is closed, the peer is chilled for a
	/// duration of [`DEFAULT_BACKOFF`] which prevents [`Peerset`] from establishing/accepting new
	/// connections for that time period.
	pub fn report_substream_closed(&mut self, peer: PeerId) {
		log::trace!(target: LOG_TARGET, "{}: substream closed to {peer:?}", self.protocol);

		let Some(state) = self.peers.get_mut(&peer) else {
			log::warn!(target: LOG_TARGET, "{}: substream closed for unknown peer {peer:?}", self.protocol);
			debug_assert!(false);
			return
		};

		match &state {
			// close was initiated either by remote ([`PeerState::Connected`]) or local node
			// ([`PeerState::Closing`]) and it was a non-reserved peer
			PeerState::Connected { direction: Direction::Inbound(Reserved::No) } |
			PeerState::Closing { direction: Direction::Inbound(Reserved::No) } => {
				log::trace!(
					target: LOG_TARGET,
					"{}: inbound substream closed to non-reserved peer {peer:?}: {state:?}",
					self.protocol,
				);

				decrement_or_warn!(
					self.num_in,
					self.protocol,
					peer,
					Direction::Inbound(Reserved::No)
				);
			},
			// close was initiated either by remote ([`PeerState::Connected`]) or local node
			// ([`PeerState::Closing`]) and it was a non-reserved peer
			PeerState::Connected { direction: Direction::Outbound(Reserved::No) } |
			PeerState::Closing { direction: Direction::Outbound(Reserved::No) } => {
				log::trace!(
					target: LOG_TARGET,
					"{}: outbound substream closed to non-reserved peer {peer:?} {state:?}",
					self.protocol,
				);

				decrement_or_warn!(
					self.num_out,
					self.protocol,
					peer,
					Direction::Outbound(Reserved::No)
				);
			},
			// reserved peers don't require adjustments to slot counts
			PeerState::Closing { .. } | PeerState::Connected { .. } => {
				log::debug!(target: LOG_TARGET, "{}: reserved peer {peer:?} disconnected", self.protocol);
			},
			state => {
				log::warn!(target: LOG_TARGET, "{}: invalid state for disconnected peer {peer:?}: {state:?}", self.protocol);
				debug_assert!(false);
			},
		}
		*state = PeerState::Backoff;

		self.connected_peers.fetch_sub(1usize, Ordering::Relaxed);
		self.pending_backoffs.push(Box::pin(async move {
			Delay::new(DEFAULT_BACKOFF).await;
			(peer, DISCONNECT_ADJUSTMENT)
		}));
	}

	/// Report to [`Peerset`] that an inbound substream was opened and that it should validate it.
	pub fn report_inbound_substream(&mut self, peer: PeerId) -> ValidationResult {
		log::trace!(target: LOG_TARGET, "{}: inbound substream from {peer:?}", self.protocol);

		if self.peerstore_handle.is_banned(&peer) {
			log::debug!(
				target: LOG_TARGET,
				"{}: rejecting banned peer {peer:?}",
				self.protocol,
			);

			return ValidationResult::Reject;
		}

		let state = self.peers.entry(peer).or_insert(PeerState::Disconnected);
		let is_reserved_peer = self.reserved_peers.contains(&peer);

		match state {
			// disconnected peers proceed directly to inbound slot allocation
			PeerState::Disconnected => {},
			// peer is backed off but if it can be accepted (either a reserved peer or inbound slot
			// available), accept the peer and then just ignore the back-off timer when it expires
			PeerState::Backoff =>
				if !is_reserved_peer && self.num_in == self.max_in {
					log::trace!(
						target: LOG_TARGET,
						"{}: ({peer:?}) is backed-off and cannot accept, reject inbound substream",
						self.protocol,
					);

					return ValidationResult::Reject
				},
			// `Peerset` had initiated an outbound substream but litep2p had received an inbound
			// substream before the command to open the substream was received, meaning local and
			// remote desired to open a connection at the same time. Since outbound substreams
			// cannot be canceled with litep2p and the command has already been registered, accept
			// the inbound peer since the local node had wished a connection to be opened either way
			// but keep the direction of the substream as it was (outbound).
			//
			// litep2p doesn't care what `Peerset` considers the substream direction to be and since
			// it's used for bookkeeping for substream counts, keeping the substream direction
			// unmodified simplifies the implementation a lot. The direction would otherwise be
			// irrelevant for protocols but because `SyncingEngine` has a hack to reject excess
			// inbound substreams, that system has to be kept working for the time being. Once that
			// issue is fixed, this approach can be re-evaluated if need be.
			PeerState::Opening { direction: Direction::Outbound(reserved) } => {
				log::trace!(
					target: LOG_TARGET,
					"{}: inbound substream received for {peer:?} ({reserved:?}) that was marked outbound",
					self.protocol,
				);

				return ValidationResult::Accept;
			},
			PeerState::Canceled { direction } => {
				log::trace!(
					target: LOG_TARGET,
					"{}: {peer:?} is canceled, rejecting substream",
					self.protocol,
				);

				*state = PeerState::Canceled { direction: *direction };
				return ValidationResult::Reject
			},
			state => {
				log::warn!(
					target: LOG_TARGET,
					"{}: invalid state ({state:?}) for inbound substream, peer {peer:?}",
					self.protocol
				);
				debug_assert!(false);
				return ValidationResult::Reject
			},
		}

		if is_reserved_peer {
			log::trace!(
				target: LOG_TARGET,
				"{}: {peer:?} accepting peer as reserved peer",
				self.protocol,
			);

			*state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
			return ValidationResult::Accept
		}

		if self.num_in < self.max_in {
			log::trace!(
				target: LOG_TARGET,
				"{}: {peer:?} accepting peer as regular peer",
				self.protocol,
			);

			self.num_in += 1;

			*state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
			return ValidationResult::Accept
		}

		log::trace!(
			target: LOG_TARGET,
			"{}: reject {peer:?}, not a reserved peer and no free inbound slots",
			self.protocol,
		);

		*state = PeerState::Disconnected;
		return ValidationResult::Reject
	}

	/// Report to [`Peerset`] that there was an error opening a substream.
	pub fn report_substream_open_failure(&mut self, peer: PeerId, error: NotificationError) {
		log::trace!(
			target: LOG_TARGET,
			"{}: failed to open substream to {peer:?}: {error:?}",
			self.protocol,
		);

		match self.peers.get(&peer) {
			Some(PeerState::Opening { direction: Direction::Outbound(Reserved::No) }) => {
				decrement_or_warn!(
					self.num_out,
					self.protocol,
					peer,
					Direction::Outbound(Reserved::No)
				);
			},
			Some(PeerState::Opening { direction: Direction::Inbound(Reserved::No) }) => {
				decrement_or_warn!(
					self.num_in,
					self.protocol,
					peer,
					Direction::Inbound(Reserved::No)
				);
			},
			Some(PeerState::Canceled { direction }) => match direction {
				Direction::Inbound(Reserved::No) => {
					decrement_or_warn!(
						self.num_in,
						self.protocol,
						peer,
						Direction::Inbound(Reserved::No)
					);
				},
				Direction::Outbound(Reserved::No) => {
					decrement_or_warn!(
						self.num_out,
						self.protocol,
						peer,
						Direction::Outbound(Reserved::No)
					);
				},
				_ => {},
			},
			// reserved peers do not require change in the slot counts
			Some(PeerState::Opening { direction: Direction::Inbound(Reserved::Yes) }) |
			Some(PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }) => {
				log::debug!(
					target: LOG_TARGET,
					"{}: substream open failure for reserved peer {peer:?}",
					self.protocol,
				);
			},
			state => {
				log::debug!(
					target: LOG_TARGET,
					"{}: substream open failure for an unknown state: {state:?}",
					self.protocol,
				);

				return;
			},
		}

		self.peers.insert(peer, PeerState::Backoff);
		self.pending_backoffs.push(Box::pin(async move {
			Delay::new(OPEN_FAILURE_BACKOFF).await;
			(peer, OPEN_FAILURE_ADJUSTMENT)
		}));
	}

	/// [`Peerset`] had accepted a peer but it was then rejected by the protocol.
	pub fn report_substream_rejected(&mut self, peer: PeerId) {
		log::trace!(target: LOG_TARGET, "{}: {peer:?} rejected by the protocol", self.protocol);

		match self.peers.remove(&peer) {
			Some(PeerState::Opening { direction }) => match direction {
				Direction::Inbound(Reserved::Yes) | Direction::Outbound(Reserved::Yes) => {
					log::warn!(
						target: LOG_TARGET,
						"{}: reserved peer {peer:?} rejected by the protocol",
						self.protocol,
					);
					self.peers.insert(peer, PeerState::Disconnected);
				},
				Direction::Inbound(Reserved::No) => {
					decrement_or_warn!(
						self.num_in,
						self.protocol,
						peer,
						Direction::Inbound(Reserved::No)
					);
					self.peers.insert(peer, PeerState::Disconnected);
				},
				Direction::Outbound(Reserved::No) => {
					decrement_or_warn!(
						self.num_out,
						self.protocol,
						peer,
						Direction::Outbound(Reserved::No)
					);
					self.peers.insert(peer, PeerState::Disconnected);
				},
			},
			Some(state @ PeerState::Canceled { .. }) => {
				log::debug!(
					target: LOG_TARGET,
					"{}: substream to {peer:?} rejected by protocol but already canceled",
					self.protocol,
				);

				self.peers.insert(peer, state);
			},
			Some(state) => {
				log::debug!(
					target: LOG_TARGET,
					"{}: {peer:?} rejected by the protocol but not opening anymore: {state:?}",
					self.protocol,
				);

				self.peers.insert(peer, state);
			},
			None => {},
		}
	}

	/// Calculate how many of the connected peers were counted as normal inbound/outbound peers,
	/// which is needed to adjust slot counts when new reserved peers are added.
	fn calculate_slot_adjustment<'a>(
		&'a mut self,
		peers: impl Iterator<Item = &'a PeerId>,
	) -> (usize, usize) {
		peers.fold((0, 0), |(mut inbound, mut outbound), peer| {
			match self.peers.get_mut(peer) {
				Some(PeerState::Disconnected | PeerState::Backoff) => {},
				Some(
					PeerState::Opening { ref mut direction } |
					PeerState::Connected { ref mut direction } |
					PeerState::Canceled { ref mut direction } |
					PeerState::Closing { ref mut direction },
				) => {
					*direction = match direction {
						Direction::Inbound(Reserved::No) => {
							inbound += 1;
							Direction::Inbound(Reserved::Yes)
						},
						Direction::Outbound(Reserved::No) => {
							outbound += 1;
							Direction::Outbound(Reserved::Yes)
						},
						ref direction => **direction,
					};
				},
				None => {
					self.peers.insert(*peer, PeerState::Disconnected);
				},
			}

			(inbound, outbound)
		})
	}

	/// Get the number of inbound peers.
	#[cfg(test)]
	pub fn num_in(&self) -> usize {
		self.num_in
	}

	/// Get the number of outbound peers.
	#[cfg(test)]
	pub fn num_out(&self) -> usize {
		self.num_out
	}

	/// Get reference to known peers.
	#[cfg(test)]
	pub fn peers(&self) -> &HashMap<PeerId, PeerState> {
		&self.peers
	}

	/// Get reference to reserved peers.
	#[cfg(test)]
	pub fn reserved_peers(&self) -> &HashSet<PeerId> {
		&self.reserved_peers
	}
}

impl Stream for Peerset {
	type Item = PeersetNotificationCommand;

	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
		while let Poll::Ready(Some((peer, reputation))) = self.pending_backoffs.poll_next_unpin(cx)
		{
			log::trace!(target: LOG_TARGET, "{}: backoff expired for {peer:?}", self.protocol);

			if std::matches!(self.peers.get(&peer), None | Some(PeerState::Backoff)) {
				self.peers.insert(peer, PeerState::Disconnected);
			}

			self.peerstore_handle.report_peer(peer, reputation);
		}

		if let Poll::Ready(Some(action)) = Pin::new(&mut self.cmd_rx).poll_next(cx) {
			match action {
				PeersetCommand::DisconnectPeer { peer } if !self.reserved_peers.contains(&peer) =>
					match self.peers.remove(&peer) {
						Some(PeerState::Connected { direction }) => {
							log::trace!(
								target: LOG_TARGET,
								"{}: close connection to {peer:?}, direction {direction:?}",
								self.protocol,
							);

							self.peers.insert(peer, PeerState::Closing { direction });
							return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
								peers: vec![peer],
							}))
						},
						Some(PeerState::Backoff) => {
							log::trace!(
								target: LOG_TARGET,
								"{}: cannot disconnect {peer:?}, already backed-off",
								self.protocol,
							);

							self.peers.insert(peer, PeerState::Backoff);
						},
						// substream might have been opening but not yet fully open when the
						// protocol or `Peerstore` requests the connection to be closed
						//
						// if the substream opens successfully, close it immediately and mark the
						// peer as `Disconnected`
						Some(PeerState::Opening { direction }) => {
							log::trace!(
								target: LOG_TARGET,
								"{}: canceling substream to disconnect peer {peer:?}",
								self.protocol,
							);

							self.peers.insert(peer, PeerState::Canceled { direction });
						},
						// protocol had issued two disconnection requests in rapid succession and
						// the substream hadn't closed before the second disconnection request was
						// received; this is harmless and can be ignored.
						Some(state @ PeerState::Closing { .. }) => {
							log::trace!(
								target: LOG_TARGET,
								"{}: cannot disconnect {peer:?}, already closing ({state:?})",
								self.protocol,
							);

							self.peers.insert(peer, state);
						},
						// if a peer is banned, e.g. due to genesis mismatch, `Peerstore` will issue
						// a global disconnection request to all protocols, irrespective of the
						// connectivity state. The peer isn't necessarily connected to all protocols
						// at all times so this is a harmless state to be in if a disconnection
						// request is received.
						Some(state @ PeerState::Disconnected) => {
							self.peers.insert(peer, state);
						},
						// peer had an opening substream earlier which was canceled and then,
						// e.g., the peer was banned which caused it to be disconnected again
						Some(state @ PeerState::Canceled { .. }) => {
							log::debug!(
								target: LOG_TARGET,
								"{}: cannot disconnect {peer:?}, already canceled ({state:?})",
								self.protocol,
							);

							self.peers.insert(peer, state);
						},
						// peer doesn't exist
						//
						// this can happen, for example, when a peer connects over
						// `/block-announces/1` with a wrong genesis hash, which initiates a ban
						// for that peer. The ban is reported to all protocols but the peer
						// might not have been registered to GRANDPA or transactions yet, so the
						// peer doesn't exist in their `Peerset`s and the error can just be ignored.
						None => {
							log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
						},
					},
				PeersetCommand::DisconnectPeer { peer } => {
					log::debug!(
						target: LOG_TARGET,
						"{}: ignoring disconnection request for reserved peer {peer}",
						self.protocol,
					);
				},
				// set new reserved peers for the protocol
				//
				// current reserved peers not in the new set are disconnected and the new reserved
				// peers are scheduled for outbound substreams
				PeersetCommand::SetReservedPeers { peers } => {
					log::debug!(target: LOG_TARGET, "{}: set reserved peers {peers:?}", self.protocol);

					// reserved peers don't consume any slots so if there are any regular connected
					// peers, inbound/outbound slot count must be adjusted to not account for these
					// peers anymore
					//
					// calculate how many of the previously connected peers were counted as regular
					// peers and subtract these counts from `num_out`/`num_in`
					let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
					self.num_out -= out_peers;
					self.num_in -= in_peers;

					// add all unknown peers to `self.peers`
					peers.iter().for_each(|peer| {
						if !self.peers.contains_key(peer) {
							self.peers.insert(*peer, PeerState::Disconnected);
						}
					});

					// collect all peers who are not in the new reserved set
					let peers_to_remove = self
						.peers
						.iter()
						.filter_map(|(peer, _)| (!peers.contains(peer)).then_some(*peer))
						.collect::<HashSet<_>>();

					self.reserved_peers = peers;

					let peers = peers_to_remove
						.into_iter()
						.filter(|peer| {
							match self.peers.remove(peer) {
								Some(PeerState::Connected { direction }) => {
									log::trace!(
										target: LOG_TARGET,
										"{}: close connection to {peer:?}, direction {direction:?}",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Closing { direction });
									true
								},
								// substream might have been opening but not yet fully open when
								// the protocol requested the reserved set to be changed
								Some(PeerState::Opening { direction }) => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cancel substream to {peer:?}, direction {direction:?}",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Canceled { direction });
									false
								},
								Some(state) => {
									self.peers.insert(*peer, state);
									false
								},
								None => {
									log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
									debug_assert!(false);
									false
								},
							}
						})
						.collect();

					log::trace!(
						target: LOG_TARGET,
						"{}: close substreams to {peers:?}",
						self.protocol,
					);

					return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream { peers }))
				},
				PeersetCommand::AddReservedPeers { peers } => {
					log::debug!(target: LOG_TARGET, "{}: add reserved peers {peers:?}", self.protocol);

					// reserved peers don't consume any slots so if there are any regular connected
					// peers, inbound/outbound slot count must be adjusted to not account for these
					// peers anymore
					//
					// calculate how many of the previously connected peers were counted as regular
					// peers and subtract these counts from `num_out`/`num_in`
					let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
					self.num_out -= out_peers;
					self.num_in -= in_peers;

					let peers = peers
						.iter()
						.filter_map(|peer| {
							if !self.reserved_peers.insert(*peer) {
								log::warn!(
									target: LOG_TARGET,
									"{}: {peer:?} is already a reserved peer",
									self.protocol,
								);
								return None
							}

							std::matches!(
								self.peers.get_mut(peer),
								None | Some(PeerState::Disconnected)
							)
							.then(|| {
								self.peers.insert(
									*peer,
									PeerState::Opening {
										direction: Direction::Outbound(Reserved::Yes),
									},
								);
								*peer
							})
						})
						.collect();

					log::debug!(target: LOG_TARGET, "{}: start connecting to {peers:?}", self.protocol);

					return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream { peers }))
				},
				PeersetCommand::RemoveReservedPeers { peers } => {
					log::debug!(target: LOG_TARGET, "{}: remove reserved peers {peers:?}", self.protocol);

					let peers_to_remove = peers
						.iter()
						.filter_map(|peer| {
							if !self.reserved_peers.remove(peer) {
								log::debug!(
									target: LOG_TARGET,
									"{}: {peer} is not a reserved peer",
									self.protocol,
								);
								return None
							}

							match self.peers.remove(peer)? {
								// peer might have already disconnected by the time the request
								// to disconnect them was received and the peer was backed off,
								// but the back-off had not expired by the time the request was
								// received
								PeerState::Backoff => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cannot disconnect removed reserved peer {peer:?}, already backed-off",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Backoff);
									None
								},
								// if there is a rapid change in substream state, the peer may
								// be canceled when the substream is asked to be closed.
								//
								// this can happen if the substream is first opened and then very
								// soon after canceled. The substream may not have had time to
								// open yet and a second open is ignored. If the substream is now
								// closed again before it has had time to open, it will be in the
								// canceled state since `Peerset` is still waiting to hear
								// either success/failure for the original substream it tried to
								// cancel.
								PeerState::Canceled { direction } => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cannot disconnect removed reserved peer {peer:?}, already canceled",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Canceled { direction });
									None
								},
								// substream to the peer might have failed to open which caused
								// the peer to be backed off
								//
								// the back-off might've expired by the time the removal was
								// processed, at which point the peer is already disconnected
								// when the protocol asks for it to be disconnected
								PeerState::Disconnected => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cannot disconnect removed reserved peer {peer:?}, already disconnected",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Disconnected);
									None
								},
								// if a node disconnects, it's put into `PeerState::Closing`
								// which indicates that `Peerset` wants the substream closed and
								// has asked litep2p to close it but it hasn't yet received a
								// confirmation. If the peer is added as a reserved peer while
								// the substream is closing, the peer will remain in the closing
								// state as `Peerset` can't do anything with the peer until it
								// has heard from litep2p. It's possible that the peer is then
								// removed from the reserved set before the substream close event
								// has been reported to `Peerset` (which the code below is
								// handling) and it will once again be ignored until the close
								// event is heard from litep2p.
								PeerState::Closing { direction } => {
									log::trace!(
										target: LOG_TARGET,
										"{}: cannot disconnect removed reserved peer {peer:?}, already closing",
										self.protocol,
									);

									self.peers.insert(*peer, PeerState::Closing { direction });
									None
								},
								// peer is currently connected as a reserved peer
								//
								// check if the peer can be accepted as a regular peer based on its
								// substream direction and available slots
								//
								// if there are enough slots, the peer is just converted to a
								// regular peer and the used slot count is increased; if the peer
								// cannot be accepted, litep2p is asked to close the substream.
								PeerState::Connected { direction } => match direction {
									Direction::Inbound(_) => match self.num_in < self.max_in {
										true => {
											log::trace!(
												target: LOG_TARGET,
												"{}: {peer:?} converted to regular inbound peer (inbound open)",
												self.protocol,
											);

											self.num_in += 1;
											self.peers.insert(
												*peer,
												PeerState::Connected {
													direction: Direction::Inbound(Reserved::No),
												},
											);

											None
										},
										false => {
											self.peers.insert(
												*peer,
												PeerState::Closing {
													direction: Direction::Inbound(Reserved::Yes),
												},
											);

											Some(*peer)
										},
									},
									Direction::Outbound(_) => match self.num_out < self.max_out {
										true => {
											log::trace!(
												target: LOG_TARGET,
												"{}: {peer:?} converted to regular outbound peer (outbound open)",
												self.protocol,
											);

											self.num_out += 1;
											self.peers.insert(
												*peer,
												PeerState::Connected {
													direction: Direction::Outbound(Reserved::No),
												},
											);

											None
										},
										false => {
											self.peers.insert(
												*peer,
												PeerState::Closing {
													direction: Direction::Outbound(Reserved::Yes),
												},
											);

											Some(*peer)
										},
									},
								},
								PeerState::Opening { direction } => match direction {
									Direction::Inbound(_) => match self.num_in < self.max_in {
										true => {
											log::trace!(
												target: LOG_TARGET,
												"{}: {peer:?} converted to regular inbound peer (inbound opening)",
												self.protocol,
											);

											self.num_in += 1;
											self.peers.insert(
												*peer,
												PeerState::Opening {
													direction: Direction::Inbound(Reserved::No),
												},
											);

											None
										},
										false => {
											self.peers.insert(
												*peer,
												PeerState::Canceled {
													direction: Direction::Inbound(Reserved::Yes),
												},
											);

											None
										},
									},
									Direction::Outbound(_) => match self.num_out < self.max_out {
										true => {
											log::trace!(
												target: LOG_TARGET,
												"{}: {peer:?} converted to regular outbound peer (outbound opening)",
												self.protocol,
											);

											self.num_out += 1;
											self.peers.insert(
												*peer,
												PeerState::Opening {
													direction: Direction::Outbound(Reserved::No),
												},
											);

											None
										},
										false => {
											self.peers.insert(
												*peer,
												PeerState::Canceled {
													direction: Direction::Outbound(Reserved::Yes),
												},
											);

											None
										},
									},
								},
							}
						})
						.collect();

					log::debug!(
						target: LOG_TARGET,
						"{}: close substreams to {peers_to_remove:?}",
						self.protocol,
					);

					return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
						peers: peers_to_remove,
					}))
				},
				PeersetCommand::SetReservedOnly { reserved_only } => {
					log::debug!(target: LOG_TARGET, "{}: set reserved only mode to {reserved_only}", self.protocol);

					// update mode and if it's set to true, disconnect all non-reserved peers
					self.reserved_only = reserved_only;

					if reserved_only {
						let peers_to_remove = self
							.peers
							.iter()
							.filter_map(|(peer, state)| {
								(!self.reserved_peers.contains(peer) &&
									std::matches!(state, PeerState::Connected { .. }))
								.then_some(*peer)
							})
							.collect::<Vec<_>>();

						// set non-reserved peers to correct states
						//
						// non-reserved peers who are connected are moved to [`PeerState::Closing`]
						// and non-reserved peers who are still opening are moved to
						// [`PeerState::Canceled`]; if the substream for them opens, it will be
						// closed right after. Reserved peers are left untouched since the
						// reserved-only mode only restricts non-reserved peers.
						let reserved_peers = &self.reserved_peers;
						self.peers.iter_mut().for_each(|(peer, state)| {
							if reserved_peers.contains(peer) {
								return
							}

							match state {
								PeerState::Connected { direction } => {
									*state = PeerState::Closing { direction: *direction };
								},
								PeerState::Opening { direction } => {
									*state = PeerState::Canceled { direction: *direction };
								},
								_ => {},
							}
						});

						return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
							peers: peers_to_remove,
						}))
					}
				},
				PeersetCommand::GetReservedPeers { tx } => {
					let _ = tx.send(self.reserved_peers.iter().cloned().collect());
				},
			}
		}

		// periodically check if `Peerset` is currently not connected to some reserved peers
		// it should be connected to
		//
		// also check if there are free outbound slots and if so, fetch peers with highest
		// reputations from `Peerstore` and start opening substreams to these peers
		if let Poll::Ready(()) = Pin::new(&mut self.next_slot_allocation).poll(cx) {
			let mut connect_to = self
				.peers
				.iter()
				.filter_map(|(peer, state)| {
					(self.reserved_peers.contains(peer) &&
						std::matches!(state, PeerState::Disconnected) &&
						!self.peerstore_handle.is_banned(peer))
					.then_some(*peer)
				})
				.collect::<Vec<_>>();

			connect_to.iter().for_each(|peer| {
				self.peers.insert(
					*peer,
					PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) },
				);
			});

			// if the number of outbound peers is lower than the desired amount of outbound peers,
			// query `PeerStore` and try to get new outbound candidates.
			if self.num_out < self.max_out && !self.reserved_only {
				let ignore: HashSet<PeerId> = self
					.peers
					.iter()
					.filter_map(|(peer, state)| {
						(!std::matches!(state, PeerState::Disconnected)).then_some(*peer)
					})
					.collect();

				let peers: Vec<_> =
					self.peerstore_handle.outgoing_candidates(self.max_out - self.num_out, ignore);

				if !peers.is_empty() {
					peers.iter().for_each(|peer| {
						self.peers.insert(
							*peer,
							PeerState::Opening { direction: Direction::Outbound(Reserved::No) },
						);
					});

					self.num_out += peers.len();
					connect_to.extend(peers);
				}
			}

			// start timer for the next allocation and if there were peers which the `Peerset`
			// wasn't connected to but should be, send command to litep2p to start opening
			// substreams.
			self.next_slot_allocation = Delay::new(SLOT_ALLOCATION_FREQUENCY);

			if !connect_to.is_empty() {
				log::trace!(
					target: LOG_TARGET,
					"{}: start connecting to peers {connect_to:?}",
					self.protocol,
				);

				return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream {
					peers: connect_to,
				}))
			}
		}

		Poll::Pending
	}
}