sc_network/protocol/notifications/
behaviour.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	protocol::notifications::{
21		handler::{self, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut},
22		service::{NotificationCommand, ProtocolHandle, ValidationCallResult},
23	},
24	protocol_controller::{self, IncomingIndex, Message, SetId},
25	service::{
26		metrics::NotificationMetrics,
27		traits::{Direction, ValidationResult},
28	},
29	types::ProtocolName,
30};
31
32use bytes::BytesMut;
33use fnv::FnvHashMap;
34use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
35use libp2p::{
36	core::{Endpoint, Multiaddr},
37	swarm::{
38		behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
39		ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, PollParameters,
40		THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
41	},
42	PeerId,
43};
44use log::{debug, error, trace, warn};
45use parking_lot::RwLock;
46use rand::distributions::{Distribution as _, Uniform};
47use sc_utils::mpsc::TracingUnboundedReceiver;
48use smallvec::SmallVec;
49use tokio::sync::oneshot::error::RecvError;
50use tokio_stream::StreamMap;
51
52use std::{
53	cmp,
54	collections::{hash_map::Entry, VecDeque},
55	mem,
56	pin::Pin,
57	sync::Arc,
58	task::{Context, Poll},
59	time::{Duration, Instant},
60};
61
/// Type representing a pending substream validation.
///
/// The future resolves to a pair made of the validation outcome (or the [`RecvError`] of the
/// `tokio` oneshot channel) and an [`IncomingIndex`]. The index is what allows the caller to
/// tell which incoming request a completed validation belongs to.
type PendingInboundValidation =
	BoxFuture<'static, (Result<ValidationResult, RecvError>, IncomingIndex)>;

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p";
68
/// Network behaviour that handles opening substreams for custom protocols with other peers.
///
/// # How it works
///
/// The role of the `Notifications` is to synchronize the following components:
///
/// - The libp2p swarm that opens new connections and reports disconnects.
/// - The connection handler (see the `handler` module) that handles individual connections.
/// - The peerset manager (PSM) that requests links to peers to be established or broken.
/// - The external API, that requires knowledge of the links that have been established.
///
/// In the state machine below, each `PeerId` is attributed one of these states:
///
/// - [`PeerState::Requested`]: No open connection, but requested by the peerset. Currently dialing.
/// - [`PeerState::Disabled`]: Has open TCP connection(s) unbeknownst to the peerset. No substream
///   is open.
/// - [`PeerState::Enabled`]: Has open TCP connection(s), acknowledged by the peerset.
///   - Notifications substreams are open on at least one connection, and external API has been
///     notified.
///   - Notifications substreams aren't open.
/// - [`PeerState::Incoming`]: Has open TCP connection(s) and remote would like to open substreams.
///   Peerset has been asked to attribute an inbound slot.
///
/// In addition to these states, there also exists a "banning" system. If we fail to dial a peer,
/// we back-off for a few seconds. If the PSM requests connecting to a peer that is currently
/// backed-off, the next dialing attempt is delayed until after the ban expires. However, the PSM
/// will still consider the peer to be connected. This "ban" is thus not a ban in a strict sense:
/// if a backed-off peer tries to connect, the connection is accepted. A ban only delays dialing
/// attempts.
///
/// There may be multiple connections to a peer. The status of a peer on
/// the API of this behaviour and towards the peerset manager is aggregated in
/// the following way:
///
///   1. The enabled/disabled status is the same across all connections, as decided by the peerset
///      manager.
///   2. `send_packet` and `write_notification` always send all data over the same connection to
///      preserve the ordering provided by the transport, as long as that connection is open. If it
///      closes, a second open connection may take over, if one exists, but that case should be no
///      different than a single connection failing and being re-established in terms of potential
///      reordering and dropped messages. Messages can be received on any connection.
///   3. The behaviour reports `NotificationsOut::CustomProtocolOpen` when the first connection
///      reports `NotifsHandlerOut::OpenResultOk`.
///   4. The behaviour reports `NotificationsOut::CustomProtocolClosed` when the last connection
///      reports `NotifsHandlerOut::CloseResult`.
///
/// In this way, the number of actual established connections to the peer is
/// an implementation detail of this behaviour. Note that, in practice and at
/// the time of this writing, there may be at most two connections to a peer
/// and only as a result of simultaneous dialing. However, the implementation
/// accommodates for any number of connections.
pub struct Notifications {
	/// Notification protocols. Entries never change after initialization.
	notif_protocols: Vec<handler::ProtocolConfig>,

	/// Protocol handles.
	protocol_handles: Vec<ProtocolHandle>,

	/// Command streams, keyed by the position of the protocol they were registered for.
	command_streams: StreamMap<usize, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>>,

	/// Protocol controllers are responsible for peer connections management.
	protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,

	/// Receiver for instructions about who to connect to or disconnect from.
	from_protocol_controllers: TracingUnboundedReceiver<Message>,

	/// List of peers in our state.
	peers: FnvHashMap<(PeerId, SetId), PeerState>,

	/// The elements in `peers` occasionally contain `Delay` objects that we would normally have
	/// to be polled one by one. In order to avoid doing so, as an optimization, every `Delay` is
	/// instead put inside of `delays` and referenced by a [`DelayId`]. This stream
	/// yields the `(DelayId, PeerId, SetId)` of delays that are potentially ready.
	///
	/// By design, we never remove elements from this list. Elements are removed only when the
	/// `Delay` triggers. As such, this stream may produce obsolete elements.
	delays:
		stream::FuturesUnordered<Pin<Box<dyn Future<Output = (DelayId, PeerId, SetId)> + Send>>>,

	/// [`DelayId`] to assign to the next delay.
	next_delay_id: DelayId,

	/// List of incoming messages we have sent to the peer set manager and that are waiting for an
	/// answer.
	incoming: SmallVec<[IncomingPeer; 6]>,

	/// We generate indices to identify incoming connections. This is the next value for the index
	/// to use when a connection is incoming.
	next_incoming_index: IncomingIndex,

	/// Events to produce from `poll()`.
	events: VecDeque<ToSwarm<NotificationsOut, NotifsHandlerIn>>,

	/// Pending inbound substream validations.
	//
	// NOTE: it's possible to read a stale response from `pending_inbound_validations`
	// as the substream may get closed by the remote peer before the protocol has had
	// a chance to validate it. [`Notifications`] must compare the [`IncomingIndex`]
	// returned by the completed future against the [`IncomingIndex`] stored in
	// `PeerState::Incoming` to check whether the completed future is stale or not.
	pending_inbound_validations: FuturesUnordered<PendingInboundValidation>,

	/// Metrics for notifications.
	metrics: NotificationMetrics,
}
175
/// Configuration for a notifications protocol.
///
/// `Notifications::new` turns each value of this type into a `handler::ProtocolConfig`,
/// wrapping the handshake so that it can be replaced afterwards.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
	/// Name of the protocol.
	pub name: ProtocolName,
	/// Names of the protocol to use if the main one isn't available.
	pub fallback_names: Vec<ProtocolName>,
	/// Handshake of the protocol.
	pub handshake: Vec<u8>,
	/// Maximum allowed size for a notification.
	// NOTE(review): presumably expressed in bytes; confirm against the handler.
	pub max_notification_size: u64,
}
188
/// Identifier for a delay firing.
///
/// Identifiers are handed out from `Notifications::next_delay_id`, which is incremented every
/// time a new delay is scheduled.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DelayId(u64);
192
/// State of a peer we're connected to.
///
/// The variants correspond to the state of the peer w.r.t. the peerset.
#[derive(Debug)]
enum PeerState {
	/// State is poisoned. This is a temporary placeholder that is written while the real state
	/// is being processed, and it must always be replaced with a proper state again afterwards.
	/// If it is found in the wild, that means there was either a panic or a bug in the state
	/// machine code.
	Poisoned,

	/// The peer misbehaved. If the PSM wants us to connect to this peer, we will add an artificial
	/// delay to the connection.
	Backoff {
		/// When the ban expires. For clean-up purposes. References an entry in `delays`.
		timer: DelayId,
		/// Until when the peer is backed-off.
		timer_deadline: Instant,
	},

	/// The peerset requested that we connect to this peer. We are currently not connected.
	PendingRequest {
		/// When to actually start dialing. References an entry in `delays`.
		timer: DelayId,
		/// When the `timer` will trigger.
		timer_deadline: Instant,
	},

	/// The peerset requested that we connect to this peer. We are currently dialing this peer.
	Requested,

	/// We are connected to this peer but the peerset hasn't requested it or has denied it.
	///
	/// The handler is either in the closed state, or a `Close` message has been sent to it and
	/// hasn't been answered yet.
	Disabled {
		/// If `Some`, any connection request from the peerset to this peer is delayed until the
		/// given `Instant`.
		backoff_until: Option<Instant>,

		/// List of connections with this peer, and their state.
		connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
	},

	/// We are connected to this peer. The peerset has requested a connection to this peer, but
	/// it is currently in a "backed-off" phase. The state will switch to `Enabled` once the timer
	/// expires.
	///
	/// The handler is either in the closed state, or a `Close` message has been sent to it and
	/// hasn't been answered yet.
	///
	/// The handler will be opened when `timer` fires.
	DisabledPendingEnable {
		/// When to enable this remote. References an entry in `delays`.
		timer: DelayId,
		/// When the `timer` will trigger.
		timer_deadline: Instant,

		/// List of connections with this peer, and their state.
		connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
	},

	/// We are connected to this peer and the peerset has accepted it.
	Enabled {
		/// List of connections with this peer, and their state.
		connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
	},

	/// We are connected to this peer. We have received an `OpenDesiredByRemote` from one of the
	/// handlers and forwarded that request to the peerset. The connection handlers are waiting for
	/// a response, i.e. to be opened or closed based on whether the peerset accepts or rejects
	/// the peer.
	Incoming {
		/// If `Some`, any dial attempts to this peer are delayed until the given `Instant`.
		backoff_until: Option<Instant>,

		/// Incoming index tracking this connection.
		incoming_index: IncomingIndex,

		/// Peerset has signaled it wants the substream closed.
		peerset_rejected: bool,

		/// List of connections with this peer, and their state.
		connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
	},
}
278
279impl PeerState {
280	/// True if there exists an established connection to the peer
281	/// that is open for custom protocol traffic.
282	fn is_open(&self) -> bool {
283		self.get_open().is_some()
284	}
285
286	/// Returns the [`NotificationsSink`] of the first established connection
287	/// that is open for custom protocol traffic.
288	fn get_open(&self) -> Option<&NotificationsSink> {
289		match self {
290			Self::Enabled { connections, .. } => connections.iter().find_map(|(_, s)| match s {
291				ConnectionState::Open(s) => Some(s),
292				_ => None,
293			}),
294			_ => None,
295		}
296	}
297}
298
/// State of the handler of a single connection visible from this state machine.
#[derive(Debug)]
enum ConnectionState {
	/// Connection is in the `Closed` state, meaning that the remote hasn't requested anything.
	Closed,

	/// Connection is either in the `Open` or the `Closed` state, but a
	/// [`NotifsHandlerIn::Close`] message has been sent. Waiting for this message to be
	/// acknowledged through a [`NotifsHandlerOut::CloseResult`].
	Closing,

	/// Connection is in the `Closed` state but a [`NotifsHandlerIn::Open`] message has been sent.
	/// An `OpenResultOk`/`OpenResultErr` message is expected.
	Opening,

	/// Connection is in the `Closed` state but a [`NotifsHandlerIn::Open`] message then a
	/// [`NotifsHandlerIn::Close`] message has been sent. An `OpenResultOk`/`OpenResultErr` message
	/// followed with a `CloseResult` message are expected.
	OpeningThenClosing,

	/// Connection is in the `Closed` state, but a [`NotifsHandlerOut::OpenDesiredByRemote`]
	/// message has been received, meaning that the remote wants to open a substream.
	OpenDesiredByRemote,

	/// Connection is in the `Open` state. Carries the [`NotificationsSink`] of that connection.
	///
	/// The external API is notified of a channel with this peer if any of its connections is in
	/// this state.
	Open(NotificationsSink),
}
329
/// State of an "incoming" message sent to the peer set manager.
#[derive(Debug)]
struct IncomingPeer {
	/// Id of the remote peer of the incoming substream.
	peer_id: PeerId,
	/// Id of the set the incoming substream would belong to.
	set_id: SetId,
	/// If true, this "incoming" still corresponds to an actual connection. If false, then the
	/// connection corresponding to it has been closed or replaced already.
	alive: bool,
	/// Id that we sent to the peerset.
	incoming_id: IncomingIndex,
	/// Received handshake.
	handshake: Vec<u8>,
}
345
/// Event that can be emitted by the `Notifications`.
#[derive(Debug)]
pub enum NotificationsOut {
	/// Opened a custom protocol with the remote.
	CustomProtocolOpen {
		/// Id of the peer we are connected to.
		peer_id: PeerId,
		/// Peerset set ID the substream is tied to.
		set_id: SetId,
		/// Direction of the stream.
		direction: Direction,
		/// If `Some`, a fallback protocol name has been used rather than the main protocol name.
		/// Always matches one of the fallback names passed at initialization.
		negotiated_fallback: Option<ProtocolName>,
		/// Handshake that was sent to us.
		/// This is normally a "Status" message, but this is out of the concern of this code.
		received_handshake: Vec<u8>,
		/// Object that permits sending notifications to the peer.
		notifications_sink: NotificationsSink,
	},

	/// The [`NotificationsSink`] object used to send notifications with the given peer must be
	/// replaced with a new one.
	///
	/// This event is typically emitted when a transport-level connection is closed and we fall
	/// back to a secondary connection.
	CustomProtocolReplaced {
		/// Id of the peer we are connected to.
		peer_id: PeerId,
		/// Peerset set ID the substream is tied to.
		set_id: SetId,
		/// Replacement for the previous [`NotificationsSink`].
		notifications_sink: NotificationsSink,
	},

	/// Closed a custom protocol with the remote. The existing [`NotificationsSink`] should
	/// be dropped.
	CustomProtocolClosed {
		/// Id of the peer we were connected to.
		peer_id: PeerId,
		/// Peerset set ID the substream was tied to.
		set_id: SetId,
	},

	/// Receives a message on a custom protocol substream.
	///
	/// Also concerns received notifications for the notifications API.
	Notification {
		/// Id of the peer the message came from.
		peer_id: PeerId,
		/// Peerset set ID the substream is tied to.
		set_id: SetId,
		/// Message that has been received.
		message: BytesMut,
	},
}
402
403impl Notifications {
404	/// Creates a `CustomProtos`.
405	pub(crate) fn new(
406		protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
407		from_protocol_controllers: TracingUnboundedReceiver<Message>,
408		metrics: NotificationMetrics,
409		notif_protocols: impl Iterator<
410			Item = (
411				ProtocolConfig,
412				ProtocolHandle,
413				Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>,
414			),
415		>,
416	) -> Self {
417		let (notif_protocols, protocol_handles): (Vec<_>, Vec<_>) = notif_protocols
418			.map(|(cfg, protocol_handle, command_stream)| {
419				(
420					handler::ProtocolConfig {
421						name: cfg.name,
422						fallback_names: cfg.fallback_names,
423						handshake: Arc::new(RwLock::new(cfg.handshake)),
424						max_notification_size: cfg.max_notification_size,
425					},
426					(protocol_handle, command_stream),
427				)
428			})
429			.unzip();
430		assert!(!notif_protocols.is_empty());
431
432		let (mut protocol_handles, command_streams): (Vec<_>, Vec<_>) = protocol_handles
433			.into_iter()
434			.enumerate()
435			.map(|(set_id, (mut protocol_handle, command_stream))| {
436				protocol_handle.set_metrics(metrics.clone());
437
438				(protocol_handle, (set_id, command_stream))
439			})
440			.unzip();
441
442		protocol_handles.iter_mut().skip(1).for_each(|handle| {
443			handle.delegate_to_peerset(true);
444		});
445
446		Self {
447			notif_protocols,
448			protocol_handles,
449			command_streams: StreamMap::from_iter(command_streams.into_iter()),
450			protocol_controller_handles,
451			from_protocol_controllers,
452			peers: FnvHashMap::default(),
453			delays: Default::default(),
454			next_delay_id: DelayId(0),
455			incoming: SmallVec::new(),
456			next_incoming_index: IncomingIndex(0),
457			events: VecDeque::new(),
458			pending_inbound_validations: FuturesUnordered::new(),
459			metrics,
460		}
461	}
462
463	/// Modifies the handshake of the given notifications protocol.
464	pub fn set_notif_protocol_handshake(
465		&mut self,
466		set_id: SetId,
467		handshake_message: impl Into<Vec<u8>>,
468	) {
469		if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) {
470			*p.handshake.write() = handshake_message.into();
471		} else {
472			log::error!(target: "sub-libp2p", "Unknown handshake change set: {:?}", set_id);
473			debug_assert!(false);
474		}
475	}
476
477	/// Returns the list of all the peers we have an open channel to.
478	pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
479		self.peers.iter().filter(|(_, state)| state.is_open()).map(|((id, _), _)| id)
480	}
481
482	/// Returns true if we have an open substream to the given peer.
483	pub fn is_open(&self, peer_id: &PeerId, set_id: SetId) -> bool {
484		self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false)
485	}
486
487	/// Disconnects the given peer if we are connected to it.
488	pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: SetId) {
489		trace!(target: "sub-libp2p", "External API => Disconnect({}, {:?})", peer_id, set_id);
490		self.disconnect_peer_inner(peer_id, set_id);
491	}
492
493	/// Inner implementation of `disconnect_peer`.
494	fn disconnect_peer_inner(&mut self, peer_id: &PeerId, set_id: SetId) {
495		let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
496			entry
497		} else {
498			return
499		};
500
501		match mem::replace(entry.get_mut(), PeerState::Poisoned) {
502			// We're not connected anyway.
503			st @ PeerState::Disabled { .. } => *entry.into_mut() = st,
504			st @ PeerState::Requested => *entry.into_mut() = st,
505			st @ PeerState::PendingRequest { .. } => *entry.into_mut() = st,
506			st @ PeerState::Backoff { .. } => *entry.into_mut() = st,
507
508			// DisabledPendingEnable => Disabled.
509			PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
510				trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
511				self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);
512				*entry.into_mut() =
513					PeerState::Disabled { connections, backoff_until: Some(timer_deadline) }
514			},
515
516			// Enabled => Disabled.
517			// All open or opening connections are sent a `Close` message.
518			// If relevant, the external API is instantly notified.
519			PeerState::Enabled { mut connections } => {
520				trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
521				self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);
522
523				if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
524					trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id);
525					let event =
526						NotificationsOut::CustomProtocolClosed { peer_id: *peer_id, set_id };
527					self.events.push_back(ToSwarm::GenerateEvent(event));
528				}
529
530				for (connec_id, connec_state) in
531					connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
532				{
533					trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
534					self.events.push_back(ToSwarm::NotifyHandler {
535						peer_id: *peer_id,
536						handler: NotifyHandler::One(*connec_id),
537						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
538					});
539					*connec_state = ConnectionState::Closing;
540				}
541
542				for (connec_id, connec_state) in
543					connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
544				{
545					trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
546					self.events.push_back(ToSwarm::NotifyHandler {
547						peer_id: *peer_id,
548						handler: NotifyHandler::One(*connec_id),
549						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
550					});
551					*connec_state = ConnectionState::OpeningThenClosing;
552				}
553
554				debug_assert!(!connections
555					.iter()
556					.any(|(_, s)| matches!(s, ConnectionState::Open(_))));
557				debug_assert!(!connections
558					.iter()
559					.any(|(_, s)| matches!(s, ConnectionState::Opening)));
560
561				*entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
562			},
563
564			// Incoming => Disabled.
565			// Ongoing opening requests from the remote are rejected.
566			PeerState::Incoming { mut connections, backoff_until, .. } => {
567				let inc = if let Some(inc) = self
568					.incoming
569					.iter_mut()
570					.find(|i| i.peer_id == entry.key().0 && i.set_id == set_id && i.alive)
571				{
572					inc
573				} else {
574					error!(
575						target: "sub-libp2p",
576						"State mismatch in libp2p: no entry in incoming for incoming peer"
577					);
578					return
579				};
580
581				inc.alive = false;
582
583				for (connec_id, connec_state) in connections
584					.iter_mut()
585					.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
586				{
587					trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
588					self.events.push_back(ToSwarm::NotifyHandler {
589						peer_id: *peer_id,
590						handler: NotifyHandler::One(*connec_id),
591						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
592					});
593					*connec_state = ConnectionState::Closing;
594				}
595
596				debug_assert!(!connections
597					.iter()
598					.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
599				*entry.into_mut() = PeerState::Disabled { connections, backoff_until }
600			},
601
602			PeerState::Poisoned => {
603				error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id)
604			},
605		}
606	}
607
608	/// Function that is called when the peerset wants us to connect to a peer.
609	fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: SetId) {
610		// If `PeerId` is unknown to us, insert an entry, start dialing, and return early.
611		let mut occ_entry = match self.peers.entry((peer_id, set_id)) {
612			Entry::Occupied(entry) => entry,
613			Entry::Vacant(entry) => {
614				// If there's no entry in `self.peers`, start dialing.
615				trace!(
616					target: "sub-libp2p",
617					"PSM => Connect({}, {:?}): Starting to connect",
618					entry.key().0,
619					set_id,
620				);
621				trace!(target: "sub-libp2p", "Libp2p <= Dial {}", entry.key().0);
622				self.events.push_back(ToSwarm::Dial { opts: entry.key().0.into() });
623				entry.insert(PeerState::Requested);
624				return
625			},
626		};
627
628		let now = Instant::now();
629
630		match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
631			// Backoff (not expired) => PendingRequest
632			PeerState::Backoff { ref timer, ref timer_deadline } if *timer_deadline > now => {
633				let peer_id = occ_entry.key().0;
634				trace!(
635					target: "sub-libp2p",
636					"PSM => Connect({}, {:?}): Will start to connect at until {:?}",
637					peer_id,
638					set_id,
639					timer_deadline,
640				);
641				*occ_entry.into_mut() =
642					PeerState::PendingRequest { timer: *timer, timer_deadline: *timer_deadline };
643			},
644
645			// Backoff (expired) => Requested
646			PeerState::Backoff { .. } => {
647				trace!(
648					target: "sub-libp2p",
649					"PSM => Connect({}, {:?}): Starting to connect",
650					occ_entry.key().0,
651					set_id,
652				);
653				trace!(target: "sub-libp2p", "Libp2p <= Dial {:?}", occ_entry.key());
654				self.events.push_back(ToSwarm::Dial { opts: occ_entry.key().0.into() });
655				*occ_entry.into_mut() = PeerState::Requested;
656			},
657
658			// Disabled (with non-expired ban) => DisabledPendingEnable
659			PeerState::Disabled { connections, backoff_until: Some(ref backoff) }
660				if *backoff > now =>
661			{
662				let peer_id = occ_entry.key().0;
663				trace!(
664					target: "sub-libp2p",
665					"PSM => Connect({}, {:?}): But peer is backed-off until {:?}",
666					peer_id,
667					set_id,
668					backoff,
669				);
670
671				let delay_id = self.next_delay_id;
672				self.next_delay_id.0 += 1;
673				let delay = futures_timer::Delay::new(*backoff - now);
674				self.delays.push(
675					async move {
676						delay.await;
677						(delay_id, peer_id, set_id)
678					}
679					.boxed(),
680				);
681
682				*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
683					connections,
684					timer: delay_id,
685					timer_deadline: *backoff,
686				};
687			},
688
689			// Disabled => Enabled
690			PeerState::Disabled { mut connections, backoff_until } => {
691				debug_assert!(!connections
692					.iter()
693					.any(|(_, s)| { matches!(s, ConnectionState::Open(_)) }));
694
695				// The first element of `closed` is chosen to open the notifications substream.
696				if let Some((connec_id, connec_state)) =
697					connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
698				{
699					trace!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Enabling connections.",
700						occ_entry.key().0, set_id);
701					trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, *connec_id, set_id);
702					self.events.push_back(ToSwarm::NotifyHandler {
703						peer_id,
704						handler: NotifyHandler::One(*connec_id),
705						event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
706					});
707					*connec_state = ConnectionState::Opening;
708					*occ_entry.into_mut() = PeerState::Enabled { connections };
709				} else {
710					// If no connection is available, switch to `DisabledPendingEnable` in order
711					// to try again later.
712					debug_assert!(connections.iter().any(|(_, s)| {
713						matches!(s, ConnectionState::OpeningThenClosing | ConnectionState::Closing)
714					}));
715					trace!(
716						target: "sub-libp2p",
717						"PSM => Connect({}, {:?}): No connection in proper state. Delaying.",
718						occ_entry.key().0, set_id
719					);
720
721					let timer_deadline = {
722						let base = now + Duration::from_secs(5);
723						if let Some(backoff_until) = backoff_until {
724							cmp::max(base, backoff_until)
725						} else {
726							base
727						}
728					};
729
730					let delay_id = self.next_delay_id;
731					self.next_delay_id.0 += 1;
732					debug_assert!(timer_deadline > now);
733					let delay = futures_timer::Delay::new(timer_deadline - now);
734					self.delays.push(
735						async move {
736							delay.await;
737							(delay_id, peer_id, set_id)
738						}
739						.boxed(),
740					);
741
742					*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
743						connections,
744						timer: delay_id,
745						timer_deadline,
746					};
747				}
748			},
749			// Incoming => Incoming
750			st @ PeerState::Incoming { .. } => {
751				debug!(
752					target: "sub-libp2p",
753					"PSM => Connect({}, {:?}): Ignoring obsolete connect, we are awaiting accept/reject.",
754					occ_entry.key().0, set_id
755				);
756				*occ_entry.into_mut() = st;
757			},
758
759			// Other states are kept as-is.
760			st @ PeerState::Enabled { .. } => {
761				debug!(target: "sub-libp2p",
762					"PSM => Connect({}, {:?}): Already connected.",
763					occ_entry.key().0, set_id);
764				*occ_entry.into_mut() = st;
765			},
766			st @ PeerState::DisabledPendingEnable { .. } => {
767				debug!(target: "sub-libp2p",
768					"PSM => Connect({}, {:?}): Already pending enabling.",
769					occ_entry.key().0, set_id);
770				*occ_entry.into_mut() = st;
771			},
772			st @ PeerState::Requested { .. } | st @ PeerState::PendingRequest { .. } => {
773				debug!(target: "sub-libp2p",
774					"PSM => Connect({}, {:?}): Duplicate request.",
775					occ_entry.key().0, set_id);
776				*occ_entry.into_mut() = st;
777			},
778
779			PeerState::Poisoned => {
780				error!(target: "sub-libp2p", "State of {:?} is poisoned", occ_entry.key());
781				debug_assert!(false);
782			},
783		}
784	}
785
786	/// Function that is called when the peerset wants us to disconnect from a peer.
787	fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: SetId) {
788		let mut entry = match self.peers.entry((peer_id, set_id)) {
789			Entry::Occupied(entry) => entry,
790			Entry::Vacant(entry) => {
791				trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Already disabled.",
792					entry.key().0, set_id);
793				return
794			},
795		};
796
797		match mem::replace(entry.get_mut(), PeerState::Poisoned) {
798			st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
799				trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Already disabled.",
800					entry.key().0, set_id);
801				*entry.into_mut() = st;
802			},
803
804			// DisabledPendingEnable => Disabled
805			PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
806				debug_assert!(!connections.is_empty());
807				trace!(target: "sub-libp2p",
808					"PSM => Drop({}, {:?}): Interrupting pending enabling.",
809					entry.key().0, set_id);
810				*entry.into_mut() =
811					PeerState::Disabled { connections, backoff_until: Some(timer_deadline) };
812			},
813
814			// Enabled => Disabled
815			PeerState::Enabled { mut connections } => {
816				trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Disabling connections.",
817					entry.key().0, set_id);
818
819				debug_assert!(connections.iter().any(|(_, s)| matches!(
820					s,
821					ConnectionState::Opening | ConnectionState::Open(_)
822				)));
823
824				if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
825					trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", entry.key().0, set_id);
826					let event =
827						NotificationsOut::CustomProtocolClosed { peer_id: entry.key().0, set_id };
828					self.events.push_back(ToSwarm::GenerateEvent(event));
829				}
830
831				for (connec_id, connec_state) in
832					connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
833				{
834					trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
835						entry.key(), *connec_id, set_id);
836					self.events.push_back(ToSwarm::NotifyHandler {
837						peer_id: entry.key().0,
838						handler: NotifyHandler::One(*connec_id),
839						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
840					});
841					*connec_state = ConnectionState::OpeningThenClosing;
842				}
843
844				for (connec_id, connec_state) in
845					connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
846				{
847					trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
848						entry.key(), *connec_id, set_id);
849					self.events.push_back(ToSwarm::NotifyHandler {
850						peer_id: entry.key().0,
851						handler: NotifyHandler::One(*connec_id),
852						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
853					});
854					*connec_state = ConnectionState::Closing;
855				}
856
857				*entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
858			},
859
860			// Requested => Ø
861			PeerState::Requested => {
862				// We don't cancel dialing. Libp2p doesn't expose that on purpose, as other
863				// sub-systems (such as the discovery mechanism) may require dialing this peer as
864				// well at the same time.
865				trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected.",
866					entry.key().0, set_id);
867				entry.remove();
868			},
869
870			// PendingRequest => Backoff
871			PeerState::PendingRequest { timer, timer_deadline } => {
872				trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected",
873					entry.key().0, set_id);
874				*entry.into_mut() = PeerState::Backoff { timer, timer_deadline }
875			},
876
877			// `ProtocolController` disconnected peer while it was still being validated by the
878			// protocol, mark the connection as rejected and once the validation is received from
879			// the protocol, reject the substream
880			PeerState::Incoming { backoff_until, connections, incoming_index, .. } => {
881				debug!(
882					target: "sub-libp2p",
883					"PSM => Drop({}, {:?}): Ignoring obsolete disconnect, we are awaiting accept/reject.",
884					entry.key().0, set_id,
885				);
886				*entry.into_mut() = PeerState::Incoming {
887					backoff_until,
888					connections,
889					incoming_index,
890					peerset_rejected: true,
891				};
892			},
893			PeerState::Poisoned => {
894				error!(target: "sub-libp2p", "State of {:?} is poisoned", entry.key());
895				debug_assert!(false);
896			},
897		}
898	}
899
900	/// Substream has been accepted by the `ProtocolController` and must now be sent
901	/// to the protocol for validation.
902	fn peerset_report_preaccept(&mut self, index: IncomingIndex) {
903		let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) else {
904			error!(target: LOG_TARGET, "PSM => Preaccept({:?}): Invalid index", index);
905			return
906		};
907
908		trace!(
909			target: LOG_TARGET,
910			"PSM => Preaccept({:?}): Sent to protocol for validation",
911			index
912		);
913		let incoming = &self.incoming[pos];
914
915		match self.protocol_handles[usize::from(incoming.set_id)]
916			.report_incoming_substream(incoming.peer_id, incoming.handshake.clone())
917		{
918			Ok(ValidationCallResult::Delegated) => {
919				self.protocol_report_accept(index);
920			},
921			Ok(ValidationCallResult::WaitForValidation(rx)) => {
922				self.pending_inbound_validations
923					.push(Box::pin(async move { (rx.await, index) }));
924			},
925			Err(err) => {
926				// parachain collators enable the syncing protocol but `NotificationService` for
927				// `SyncingEngine` is not created which causes `report_incoming_substream()` to
928				// fail. This is not a fatal error and should be ignored even though in typical
929				// cases the `NotificationService` not existing is a fatal error and indicates that
930				// the protocol has exited. Until the parachain collator issue is fixed, just report
931				// and error and reject the peer.
932				debug!(target: LOG_TARGET, "protocol has exited: {err:?} {:?}", incoming.set_id);
933
934				self.protocol_report_reject(index);
935			},
936		}
937	}
938
939	/// Function that is called when the peerset wants us to accept a connection
940	/// request from a peer.
941	fn protocol_report_accept(&mut self, index: IncomingIndex) {
942		let (pos, incoming) =
943			if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) {
944				(pos, self.incoming.get(pos))
945			} else {
946				error!(target: "sub-libp2p", "PSM => Accept({:?}): Invalid index", index);
947				return
948			};
949
950		let Some(incoming) = incoming else {
951			error!(target: "sub-libp2p", "Incoming connection ({:?}) doesn't exist", index);
952			debug_assert!(false);
953			return;
954		};
955
956		if !incoming.alive {
957			trace!(
958				target: "sub-libp2p",
959				"PSM => Accept({:?}, {}, {:?}): Obsolete incoming",
960				index,
961				incoming.peer_id,
962				incoming.set_id,
963			);
964
965			match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
966				Some(PeerState::DisabledPendingEnable { .. }) | Some(PeerState::Enabled { .. }) => {
967				},
968				_ => {
969					trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})",
970						incoming.peer_id, incoming.set_id);
971					self.protocol_controller_handles[usize::from(incoming.set_id)]
972						.dropped(incoming.peer_id);
973				},
974			}
975
976			self.incoming.remove(pos);
977			return
978		}
979
980		let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
981			Some(s) => s,
982			None => {
983				log::debug!(
984					target: "sub-libp2p",
985					"Connection to {:?} closed, ({:?} {:?}), ignoring accept",
986					incoming.peer_id,
987					incoming.set_id,
988					index,
989				);
990				self.incoming.remove(pos);
991				return
992			},
993		};
994
995		match mem::replace(state, PeerState::Poisoned) {
996			// Incoming => Enabled
997			PeerState::Incoming {
998				mut connections,
999				incoming_index,
1000				peerset_rejected,
1001				backoff_until,
1002			} => {
1003				if index < incoming_index {
1004					warn!(
1005						target: "sub-libp2p",
1006						"PSM => Accept({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
1007						index, incoming.peer_id, incoming.set_id, incoming_index
1008					);
1009
1010					self.incoming.remove(pos);
1011					return
1012				} else if index > incoming_index {
1013					error!(
1014						target: "sub-libp2p",
1015						"PSM => Accept({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
1016						index, incoming.peer_id, incoming.set_id, incoming_index
1017					);
1018
1019					self.incoming.remove(pos);
1020					debug_assert!(false);
1021					return
1022				}
1023
1024				// while the substream was being validated by the protocol, `Peerset` had request
1025				// for the it to be closed so reject the substream now
1026				if peerset_rejected {
1027					trace!(
1028						target: "sub-libp2p",
1029						"Protocol accepted ({:?} {:?} {:?}) but Peerset had request disconnection, rejecting",
1030						index,
1031						incoming.peer_id,
1032						incoming.set_id
1033					);
1034
1035					*state = PeerState::Incoming {
1036						connections,
1037						backoff_until,
1038						peerset_rejected,
1039						incoming_index,
1040					};
1041					return self.report_reject(index).map_or((), |_| ())
1042				}
1043
1044				trace!(
1045					target: "sub-libp2p",
1046					"PSM => Accept({:?}, {}, {:?}): Enabling connections.",
1047					index,
1048					incoming.peer_id,
1049					incoming.set_id
1050				);
1051
1052				debug_assert!(connections
1053					.iter()
1054					.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1055				for (connec_id, connec_state) in connections
1056					.iter_mut()
1057					.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
1058				{
1059					trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
1060						incoming.peer_id, *connec_id, incoming.set_id);
1061					self.events.push_back(ToSwarm::NotifyHandler {
1062						peer_id: incoming.peer_id,
1063						handler: NotifyHandler::One(*connec_id),
1064						event: NotifsHandlerIn::Open { protocol_index: incoming.set_id.into() },
1065					});
1066					*connec_state = ConnectionState::Opening;
1067				}
1068
1069				self.incoming.remove(pos);
1070				*state = PeerState::Enabled { connections };
1071			},
1072			st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
1073				self.incoming.remove(pos);
1074				*state = st;
1075			},
1076			// Any state other than `Incoming` is invalid.
1077			peer => {
1078				error!(
1079					target: "sub-libp2p",
1080					"State mismatch in libp2p: Expected alive incoming. Got {:?}.",
1081					peer
1082				);
1083
1084				self.incoming.remove(pos);
1085				debug_assert!(false);
1086			},
1087		}
1088	}
1089
1090	/// Function that is called when `ProtocolController` wants us to reject an incoming peer.
1091	fn peerset_report_reject(&mut self, index: IncomingIndex) {
1092		let _ = self.report_reject(index);
1093	}
1094
1095	/// Function that is called when the protocol wants us to reject an incoming peer.
1096	fn protocol_report_reject(&mut self, index: IncomingIndex) {
1097		if let Some((set_id, peer_id)) = self.report_reject(index) {
1098			self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id)
1099		}
1100	}
1101
1102	/// Function that is called when the peerset wants us to reject an incoming peer.
1103	fn report_reject(&mut self, index: IncomingIndex) -> Option<(SetId, PeerId)> {
1104		let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index)
1105		{
1106			self.incoming.remove(pos)
1107		} else {
1108			error!(target: "sub-libp2p", "PSM => Reject({:?}): Invalid index", index);
1109			return None
1110		};
1111
1112		if !incoming.alive {
1113			trace!(
1114				target: "sub-libp2p",
1115				"PSM => Reject({:?}, {}, {:?}): Obsolete incoming, ignoring",
1116				index,
1117				incoming.peer_id,
1118				incoming.set_id,
1119			);
1120
1121			return None
1122		}
1123
1124		let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
1125			Some(s) => s,
1126			None => {
1127				log::debug!(
1128					target: "sub-libp2p",
1129					"Connection to {:?} closed, ({:?} {:?}), ignoring accept",
1130					incoming.peer_id,
1131					incoming.set_id,
1132					index,
1133				);
1134				return None
1135			},
1136		};
1137
1138		match mem::replace(state, PeerState::Poisoned) {
1139			// Incoming => Disabled
1140			PeerState::Incoming { mut connections, backoff_until, incoming_index, .. } => {
1141				if index < incoming_index {
1142					warn!(
1143						target: "sub-libp2p",
1144						"PSM => Reject({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
1145						index, incoming.peer_id, incoming.set_id, incoming_index
1146					);
1147					return None
1148				} else if index > incoming_index {
1149					error!(
1150						target: "sub-libp2p",
1151						"PSM => Reject({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
1152						index, incoming.peer_id, incoming.set_id, incoming_index
1153					);
1154					debug_assert!(false);
1155					return None
1156				}
1157
1158				trace!(target: "sub-libp2p", "PSM => Reject({:?}, {}, {:?}): Rejecting connections.",
1159					index, incoming.peer_id, incoming.set_id);
1160
1161				debug_assert!(connections
1162					.iter()
1163					.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1164				for (connec_id, connec_state) in connections
1165					.iter_mut()
1166					.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
1167				{
1168					trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
1169						incoming.peer_id, connec_id, incoming.set_id);
1170					self.events.push_back(ToSwarm::NotifyHandler {
1171						peer_id: incoming.peer_id,
1172						handler: NotifyHandler::One(*connec_id),
1173						event: NotifsHandlerIn::Close { protocol_index: incoming.set_id.into() },
1174					});
1175					*connec_state = ConnectionState::Closing;
1176				}
1177
1178				*state = PeerState::Disabled { connections, backoff_until };
1179				Some((incoming.set_id, incoming.peer_id))
1180			},
1181			// connection to peer may have been closed already
1182			st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
1183				*state = st;
1184				None
1185			},
1186			peer => {
1187				error!(
1188					target: LOG_TARGET,
1189					"State mismatch in libp2p: Expected alive incoming. Got {peer:?}.",
1190				);
1191				None
1192			},
1193		}
1194	}
1195}
1196
1197impl NetworkBehaviour for Notifications {
1198	type ConnectionHandler = NotifsHandler;
1199	type ToSwarm = NotificationsOut;
1200
1201	fn handle_pending_inbound_connection(
1202		&mut self,
1203		_connection_id: ConnectionId,
1204		_local_addr: &Multiaddr,
1205		_remote_addr: &Multiaddr,
1206	) -> Result<(), ConnectionDenied> {
1207		Ok(())
1208	}
1209
1210	fn handle_pending_outbound_connection(
1211		&mut self,
1212		_connection_id: ConnectionId,
1213		_maybe_peer: Option<PeerId>,
1214		_addresses: &[Multiaddr],
1215		_effective_role: Endpoint,
1216	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
1217		Ok(Vec::new())
1218	}
1219
1220	fn handle_established_inbound_connection(
1221		&mut self,
1222		_connection_id: ConnectionId,
1223		peer: PeerId,
1224		_local_addr: &Multiaddr,
1225		_remote_addr: &Multiaddr,
1226	) -> Result<THandler<Self>, ConnectionDenied> {
1227		Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
1228	}
1229
1230	fn handle_established_outbound_connection(
1231		&mut self,
1232		_connection_id: ConnectionId,
1233		peer: PeerId,
1234		_addr: &Multiaddr,
1235		_role_override: Endpoint,
1236	) -> Result<THandler<Self>, ConnectionDenied> {
1237		Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
1238	}
1239
	/// Applies a swarm-level event to the per-`(peer, set)` state machine stored in
	/// `self.peers`.
	///
	/// Only three events change anything; each is applied to every notification set
	/// (`0..self.notif_protocols.len()`):
	/// - `ConnectionEstablished`: registers the new connection. If the peer was `Requested` or
	///   `PendingRequest` the connection is opened and the peer becomes `Enabled`; otherwise
	///   the connection is recorded as `Closed`.
	/// - `ConnectionClosed`: removes the connection from the peer's list and derives the next
	///   state from what is left (see the per-arm comments).
	/// - `DialFailure`: a peer that was `Requested`/`PendingRequest` is reported as dropped
	///   to the protocol controller and moved to `Backoff`.
	///
	/// All other events are ignored.
	fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
		match event {
			FromSwarm::ConnectionEstablished(ConnectionEstablished {
				peer_id,
				endpoint,
				connection_id,
				..
			}) => {
				for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
					// A missing entry is materialised as `Poisoned` so that it can be handled
					// by the same `match` as existing entries; that arm overwrites it.
					match self.peers.entry((peer_id, set_id)).or_insert(PeerState::Poisoned) {
						// Requested | PendingRequest => Enabled
						st @ &mut PeerState::Requested |
						st @ &mut PeerState::PendingRequest { .. } => {
							trace!(target: "sub-libp2p",
								"Libp2p => Connected({}, {:?}, {:?}): Connection was requested by PSM.",
								peer_id, set_id, endpoint
							);
							trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, connection_id, set_id);
							self.events.push_back(ToSwarm::NotifyHandler {
								peer_id,
								handler: NotifyHandler::One(connection_id),
								event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
							});

							let mut connections = SmallVec::new();
							connections.push((connection_id, ConnectionState::Opening));
							*st = PeerState::Enabled { connections };
						},

						// Poisoned gets inserted above if the entry was missing.
						// Ø | Backoff => Disabled
						st @ &mut PeerState::Poisoned | st @ &mut PeerState::Backoff { .. } => {
							// A running backoff is carried over into the `Disabled` state.
							let backoff_until =
								if let PeerState::Backoff { timer_deadline, .. } = st {
									Some(*timer_deadline)
								} else {
									None
								};
							trace!(target: "sub-libp2p",
								"Libp2p => Connected({}, {:?}, {:?}, {:?}): Not requested by PSM, disabling.",
								peer_id, set_id, endpoint, connection_id);

							let mut connections = SmallVec::new();
							connections.push((connection_id, ConnectionState::Closed));
							*st = PeerState::Disabled { connections, backoff_until };
						},

						// In all other states, add this new connection to the list of closed
						// inactive connections.
						PeerState::Incoming { connections, .. } |
						PeerState::Disabled { connections, .. } |
						PeerState::DisabledPendingEnable { connections, .. } |
						PeerState::Enabled { connections, .. } => {
							trace!(target: "sub-libp2p",
								"Libp2p => Connected({}, {:?}, {:?}, {:?}): Secondary connection. Leaving closed.",
								peer_id, set_id, endpoint, connection_id);
							connections.push((connection_id, ConnectionState::Closed));
						},
					}
				}
			},
			FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => {
				for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
					// Every established connection is expected to have an entry for every set.
					// Note that the `return` below leaves the whole function, so the remaining
					// sets are not processed when an entry is missing.
					let mut entry = if let Entry::Occupied(entry) =
						self.peers.entry((peer_id, set_id))
					{
						entry
					} else {
						error!(target: "sub-libp2p", "inject_connection_closed: State mismatch in the custom protos handler");
						debug_assert!(false);
						return
					};

					// The state is moved out and `Poisoned` left behind; the arms below write
					// the follow-up state back or remove the entry.
					match mem::replace(entry.get_mut(), PeerState::Poisoned) {
						// Disabled => Disabled | Backoff | Ø
						PeerState::Disabled { mut connections, backoff_until } => {
							trace!(target: "sub-libp2p", "Libp2p => Disconnected({}, {:?}, {:?}): Disabled.",
								peer_id, set_id, connection_id);

							if let Some(pos) =
								connections.iter().position(|(c, _)| *c == connection_id)
							{
								connections.remove(pos);
							} else {
								debug_assert!(false);
								error!(target: "sub-libp2p",
									"inject_connection_closed: State mismatch in the custom protos handler");
							}

							if connections.is_empty() {
								// Last connection gone: keep a `Backoff` entry only while the
								// backoff deadline is still in the future, otherwise forget
								// the peer.
								if let Some(until) = backoff_until {
									let now = Instant::now();
									if until > now {
										let delay_id = self.next_delay_id;
										self.next_delay_id.0 += 1;
										let delay = futures_timer::Delay::new(until - now);
										self.delays.push(
											async move {
												delay.await;
												(delay_id, peer_id, set_id)
											}
											.boxed(),
										);

										*entry.get_mut() = PeerState::Backoff {
											timer: delay_id,
											timer_deadline: until,
										};
									} else {
										entry.remove();
									}
								} else {
									entry.remove();
								}
							} else {
								*entry.get_mut() =
									PeerState::Disabled { connections, backoff_until };
							}
						},

						// DisabledPendingEnable => DisabledPendingEnable | Backoff
						PeerState::DisabledPendingEnable {
							mut connections,
							timer_deadline,
							timer,
						} => {
							trace!(
								target: "sub-libp2p",
								"Libp2p => Disconnected({}, {:?}, {:?}): Disabled but pending enable.",
								peer_id, set_id, connection_id
							);

							if let Some(pos) =
								connections.iter().position(|(c, _)| *c == connection_id)
							{
								connections.remove(pos);
							} else {
								error!(target: "sub-libp2p",
									"inject_connection_closed: State mismatch in the custom protos handler");
								debug_assert!(false);
							}

							if connections.is_empty() {
								// No connection left to enable: report the peer as dropped and
								// fall back to `Backoff`, reusing the existing timer.
								trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
								self.protocol_controller_handles[usize::from(set_id)]
									.dropped(peer_id);
								*entry.get_mut() = PeerState::Backoff { timer, timer_deadline };
							} else {
								*entry.get_mut() = PeerState::DisabledPendingEnable {
									connections,
									timer_deadline,
									timer,
								};
							}
						},

						// Incoming => Incoming | Disabled | Backoff | Ø
						PeerState::Incoming {
							mut connections,
							backoff_until,
							incoming_index,
							peerset_rejected,
						} => {
							trace!(
								target: "sub-libp2p",
								"Libp2p => Disconnected({}, {:?}, {:?}): OpenDesiredByRemote.",
								peer_id, set_id, connection_id
							);

							debug_assert!(connections
								.iter()
								.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));

							if let Some(pos) =
								connections.iter().position(|(c, _)| *c == connection_id)
							{
								connections.remove(pos);
							} else {
								error!(target: "sub-libp2p",
									"inject_connection_closed: State mismatch in the custom protos handler");
								debug_assert!(false);
							}

							let no_desired_left = !connections
								.iter()
								.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote));

							// If no connection is `OpenDesiredByRemote` anymore, clean up the
							// peerset incoming request.
							if no_desired_left {
								// In the incoming state, we don't report "Dropped" straight away.
								// Instead we will report "Dropped" if we receive the
								// corresponding "Accept".
								if let Some(state) = self
									.incoming
									.iter_mut()
									.find(|i| i.alive && i.set_id == set_id && i.peer_id == peer_id)
								{
									state.alive = false;
								} else {
									error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in \
										incoming corresponding to an incoming state in peers");
									debug_assert!(false);
								}
							}

							if connections.is_empty() {
								// Same rule as in the `Disabled` arm: keep a `Backoff` entry
								// only while the deadline is still in the future.
								if let Some(until) = backoff_until {
									let now = Instant::now();
									if until > now {
										let delay_id = self.next_delay_id;
										self.next_delay_id.0 += 1;
										let delay = futures_timer::Delay::new(until - now);
										self.delays.push(
											async move {
												delay.await;
												(delay_id, peer_id, set_id)
											}
											.boxed(),
										);

										*entry.get_mut() = PeerState::Backoff {
											timer: delay_id,
											timer_deadline: until,
										};
									} else {
										entry.remove();
									}
								} else {
									entry.remove();
								}
							} else if no_desired_left {
								// If no connection is `OpenDesiredByRemote` anymore, switch to
								// `Disabled`.
								*entry.get_mut() =
									PeerState::Disabled { connections, backoff_until };
							} else {
								*entry.get_mut() = PeerState::Incoming {
									connections,
									backoff_until,
									incoming_index,
									peerset_rejected,
								};
							}
						},

						// Enabled => Enabled | Backoff
						// Peers are always backed-off when disconnecting while Enabled.
						PeerState::Enabled { mut connections } => {
							trace!(
								target: "sub-libp2p",
								"Libp2p => Disconnected({}, {:?}, {:?}): Enabled.",
								peer_id, set_id, connection_id
							);

							debug_assert!(connections.iter().any(|(_, s)| matches!(
								s,
								ConnectionState::Opening | ConnectionState::Open(_)
							)));

							if let Some(pos) =
								connections.iter().position(|(c, _)| *c == connection_id)
							{
								let (_, state) = connections.remove(pos);
								if let ConnectionState::Open(_) = state {
									// An open connection went away: look for the first
									// remaining `Open` connection to take over.
									if let Some((replacement_pos, replacement_sink)) = connections
										.iter()
										.enumerate()
										.find_map(|(num, (_, s))| match s {
											ConnectionState::Open(s) => Some((num, s.clone())),
											_ => None,
										}) {
										// `replacement_pos` indexes the list *after* the
										// removal, so this holds when no `Open` connection
										// preceded the closed one.
										// NOTE(review): presumably the first `Open` connection
										// is the one whose sink is in use externally — confirm.
										if pos <= replacement_pos {
											trace!(
												target: "sub-libp2p",
												"External API <= Sink replaced({}, {:?})",
												peer_id, set_id
											);
											let event = NotificationsOut::CustomProtocolReplaced {
												peer_id,
												set_id,
												notifications_sink: replacement_sink.clone(),
											};
											self.events.push_back(ToSwarm::GenerateEvent(event));
										}
									} else {
										trace!(
											target: "sub-libp2p", "External API <= Closed({}, {:?})",
											peer_id, set_id
										);
										let event = NotificationsOut::CustomProtocolClosed {
											peer_id,
											set_id,
										};
										self.events.push_back(ToSwarm::GenerateEvent(event));
									}
								}
							} else {
								error!(target: "sub-libp2p",
									"inject_connection_closed: State mismatch in the custom protos handler");
								debug_assert!(false);
							}

							if connections.is_empty() {
								trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
								self.protocol_controller_handles[usize::from(set_id)]
									.dropped(peer_id);
								// Backoff duration in seconds, sampled from `Uniform::new(5, 10)`.
								let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());

								let delay_id = self.next_delay_id;
								self.next_delay_id.0 += 1;
								let delay = futures_timer::Delay::new(Duration::from_secs(ban_dur));
								self.delays.push(
									async move {
										delay.await;
										(delay_id, peer_id, set_id)
									}
									.boxed(),
								);

								*entry.get_mut() = PeerState::Backoff {
									timer: delay_id,
									timer_deadline: Instant::now() + Duration::from_secs(ban_dur),
								};
							} else if !connections.iter().any(|(_, s)| {
								matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
							}) {
								// Connections remain, but none is opening or open any more:
								// report the peer as dropped and fall back to `Disabled`.
								trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
								self.protocol_controller_handles[usize::from(set_id)]
									.dropped(peer_id);

								*entry.get_mut() =
									PeerState::Disabled { connections, backoff_until: None };
							} else {
								*entry.get_mut() = PeerState::Enabled { connections };
							}
						},

						// These states hold no connection list, so a connection closing while
						// in one of them is unexpected. The entry is left `Poisoned`.
						PeerState::Requested |
						PeerState::PendingRequest { .. } |
						PeerState::Backoff { .. } => {
							// This is a serious bug either in this state machine or in libp2p.
							error!(target: "sub-libp2p",
								"`inject_connection_closed` called for unknown peer {}",
								peer_id);
							debug_assert!(false);
						},
						PeerState::Poisoned => {
							error!(target: "sub-libp2p", "State of peer {} is poisoned", peer_id);
							debug_assert!(false);
						},
					}
				}
			},
			FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
				// Per-address transport errors are only logged.
				if let DialError::Transport(errors) = error {
					for (addr, error) in errors.iter() {
						trace!(target: "sub-libp2p", "Libp2p => Reach failure for {:?} through {:?}: {:?}", peer_id, addr, error);
					}
				}

				// Nothing more to do when the failed dial has no associated peer id.
				if let Some(peer_id) = peer_id {
					trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);

					for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
						if let Entry::Occupied(mut entry) = self.peers.entry((peer_id, set_id)) {
							match mem::replace(entry.get_mut(), PeerState::Poisoned) {
								// The peer is not in our list.
								st @ PeerState::Backoff { .. } => {
									*entry.into_mut() = st;
								},

								// "Basic" situation: we failed to reach a peer that the peerset
								// requested.
								st @ PeerState::Requested |
								st @ PeerState::PendingRequest { .. } => {
									trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
									self.protocol_controller_handles[usize::from(set_id)]
										.dropped(peer_id);

									// Back off for at least 5 seconds; a pending request whose
									// deadline lies further in the future keeps that longer
									// remaining time.
									let now = Instant::now();
									let ban_duration = match st {
										PeerState::PendingRequest { timer_deadline, .. }
											if timer_deadline > now =>
											cmp::max(timer_deadline - now, Duration::from_secs(5)),
										_ => Duration::from_secs(5),
									};

									let delay_id = self.next_delay_id;
									self.next_delay_id.0 += 1;
									let delay = futures_timer::Delay::new(ban_duration);
									self.delays.push(
										async move {
											delay.await;
											(delay_id, peer_id, set_id)
										}
										.boxed(),
									);

									*entry.into_mut() = PeerState::Backoff {
										timer: delay_id,
										timer_deadline: now + ban_duration,
									};
								},

								// We can still get dial failures even if we are already connected
								// to the peer, as an extra diagnostic for an earlier attempt.
								st @ PeerState::Disabled { .. } |
								st @ PeerState::Enabled { .. } |
								st @ PeerState::DisabledPendingEnable { .. } |
								st @ PeerState::Incoming { .. } => {
									*entry.into_mut() = st;
								},

								PeerState::Poisoned => {
									error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id);
									debug_assert!(false);
								},
							}
						}
					}
				}
			},
			// The remaining swarm events are irrelevant to this behaviour and ignored.
			FromSwarm::ListenerClosed(_) => {},
			FromSwarm::ListenFailure(_) => {},
			FromSwarm::ListenerError(_) => {},
			FromSwarm::ExternalAddrExpired(_) => {},
			FromSwarm::NewListener(_) => {},
			FromSwarm::ExpiredListenAddr(_) => {},
			FromSwarm::NewExternalAddrCandidate(_) => {},
			FromSwarm::ExternalAddrConfirmed(_) => {},
			FromSwarm::AddressChange(_) => {},
			FromSwarm::NewListenAddr(_) => {},
		}
	}
1675
1676	fn on_connection_handler_event(
1677		&mut self,
1678		peer_id: PeerId,
1679		connection_id: ConnectionId,
1680		event: THandlerOutEvent<Self>,
1681	) {
		// Handles one event emitted by the handler of connection `connection_id` to `peer_id`.
		//
		// Every event carries a `protocol_index`, which is converted into a `SetId`; the
		// `PeerState` stored in `self.peers` under `(peer_id, set_id)` is then advanced.
		//
		// Arms that need ownership of the state swap `PeerState::Poisoned` into the map with
		// `mem::replace` and are expected to write a real state back before leaving the arm.
		// The `CloseResult` and `OpenResultOk` arms mutate the state in place instead.
		//
		// NOTE(review): a few mismatch/catch-all branches below return or fall through without
		// writing a state back, which leaves the entry `Poisoned`; they are flagged inline.
1682		match event {
			// The remote asks to open a substream for this protocol on this connection.
1683			NotifsHandlerOut::OpenDesiredByRemote { protocol_index, handshake } => {
1684				let set_id = SetId::from(protocol_index);
1685
				// NOTE(review): the stray `]` in the format string below looks like a typo; it is
				// runtime log text and is therefore left untouched here.
1686				trace!(target: "sub-libp2p",
1687					"Handler({:?}, {:?}]) => OpenDesiredByRemote({:?})",
1688					peer_id, connection_id, set_id);
1689
				// A handler event for a peer/set without any tracked state is unexpected.
1690				let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
1691				{
1692					entry
1693				} else {
1694					error!(
1695						target: "sub-libp2p",
1696						"OpenDesiredByRemote: State mismatch in the custom protos handler"
1697					);
1698					debug_assert!(false);
1699					return
1700				};
1701
1702				match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1703					// Incoming => Incoming
1704					PeerState::Incoming {
1705						mut connections,
1706						backoff_until,
1707						incoming_index,
1708						peerset_rejected,
1709					} => {
1710						debug_assert!(connections
1711							.iter()
1712							.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1713						if let Some((_, connec_state)) =
1714							connections.iter_mut().find(|(c, _)| *c == connection_id)
1715						{
1716							if let ConnectionState::Closed = *connec_state {
1717								*connec_state = ConnectionState::OpenDesiredByRemote;
1718							} else {
1719								// Connections in `OpeningThenClosing` and `Closing` state can be
1720								// in a Closed phase, and as such can emit `OpenDesiredByRemote`
1721								// messages.
1722								// Since an `Open` and/or a `Close` message have already been sent,
1723								// there is nothing much that can be done about this anyway.
1724								debug_assert!(matches!(
1725									connec_state,
1726									ConnectionState::OpeningThenClosing | ConnectionState::Closing
1727								));
1728							}
1729						} else {
1730							error!(
1731								target: "sub-libp2p",
1732								"OpenDesiredByRemote: State mismatch in the custom protos handler"
1733							);
1734							debug_assert!(false);
1735						}
1736
						// Write the state back; at most one connection state has changed.
1737						*entry.into_mut() = PeerState::Incoming {
1738							connections,
1739							backoff_until,
1740							incoming_index,
1741							peerset_rejected,
1742						};
1743					},
1744
					// Enabled => Enabled
1745					PeerState::Enabled { mut connections } => {
1746						debug_assert!(connections.iter().any(|(_, s)| matches!(
1747							s,
1748							ConnectionState::Opening | ConnectionState::Open(_)
1749						)));
1750
1751						if let Some((_, connec_state)) =
1752							connections.iter_mut().find(|(c, _)| *c == connection_id)
1753						{
1754							if let ConnectionState::Closed = *connec_state {
								// The peer is enabled: ask the handler to open the substream on
								// this connection and track it as `Opening`.
1755								trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
1756									peer_id, connection_id, set_id);
1757								self.events.push_back(ToSwarm::NotifyHandler {
1758									peer_id,
1759									handler: NotifyHandler::One(connection_id),
1760									event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
1761								});
1762								*connec_state = ConnectionState::Opening;
1763							} else {
1764								// Connections in `OpeningThenClosing`, `Opening`, and `Closing`
1765								// state can be in a Closed phase, and as such can emit
1766								// `OpenDesiredByRemote` messages.
1767								// Since an `Open` message has already been sent, there is nothing
1768								// more to do.
								// NOTE(review): the assertion below accepts `OpenDesiredByRemote`
								// rather than the `OpeningThenClosing` named above; confirm which
								// of the two is intended.
1769								debug_assert!(matches!(
1770									connec_state,
1771									ConnectionState::OpenDesiredByRemote |
1772										ConnectionState::Closing | ConnectionState::Opening
1773								));
1774							}
1775						} else {
1776							error!(
1777								target: "sub-libp2p",
1778								"OpenDesiredByRemote: State mismatch in the custom protos handler"
1779							);
1780							debug_assert!(false);
1781						}
1782
1783						*entry.into_mut() = PeerState::Enabled { connections };
1784					},
1785
1786					// Disabled => Disabled | Incoming
1787					PeerState::Disabled { mut connections, backoff_until } => {
1788						if let Some((_, connec_state)) =
1789							connections.iter_mut().find(|(c, _)| *c == connection_id)
1790						{
1791							if let ConnectionState::Closed = *connec_state {
1792								*connec_state = ConnectionState::OpenDesiredByRemote;
1793
								// Allocate a fresh incoming index for this request.
1794								let incoming_id = self.next_incoming_index;
1795								self.next_incoming_index.0 += 1;
1796
								// Report the request to the protocol controller of this set and
								// remember it (with the received handshake) in `self.incoming`.
1797								trace!(target: "sub-libp2p", "PSM <= Incoming({}, {:?}, {:?}).",
1798									peer_id, set_id, incoming_id);
1799								self.protocol_controller_handles[usize::from(set_id)]
1800									.incoming_connection(peer_id, incoming_id);
1801								self.incoming.push(IncomingPeer {
1802									peer_id,
1803									set_id,
1804									alive: true,
1805									incoming_id,
1806									handshake,
1807								});
1808
1809								*entry.into_mut() = PeerState::Incoming {
1810									connections,
1811									backoff_until,
1812									peerset_rejected: false,
1813									incoming_index: incoming_id,
1814								};
1815							} else {
1816								// Connections in `OpeningThenClosing` and `Closing` state can be
1817								// in a Closed phase, and as such can emit `OpenDesiredByRemote`
1818								// messages.
1819								// We ignore them.
1820								debug_assert!(matches!(
1821									connec_state,
1822									ConnectionState::OpeningThenClosing | ConnectionState::Closing
1823								));
1824								*entry.into_mut() =
1825									PeerState::Disabled { connections, backoff_until };
1826							}
1827						} else {
							// NOTE(review): nothing is written back on this path, so the entry
							// stays `Poisoned` when debug assertions are disabled.
1828							error!(
1829								target: "sub-libp2p",
1830								"OpenDesiredByRemote: State mismatch in the custom protos handler"
1831							);
1832							debug_assert!(false);
1833						}
1834					},
1835
1836					// DisabledPendingEnable => Enabled | DisabledPendingEnable
1837					PeerState::DisabledPendingEnable { mut connections, timer, timer_deadline } => {
1838						if let Some((_, connec_state)) =
1839							connections.iter_mut().find(|(c, _)| *c == connection_id)
1840						{
1841							if let ConnectionState::Closed = *connec_state {
								// Ask the handler to open the substream right away and move the
								// peer to `Enabled`; `timer` and `timer_deadline` are dropped.
1842								trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
1843									peer_id, connection_id, set_id);
1844								self.events.push_back(ToSwarm::NotifyHandler {
1845									peer_id,
1846									handler: NotifyHandler::One(connection_id),
1847									event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
1848								});
1849								*connec_state = ConnectionState::Opening;
1850
1851								*entry.into_mut() = PeerState::Enabled { connections };
1852							} else {
1853								// Connections in `OpeningThenClosing` and `Closing` state can be
1854								// in a Closed phase, and as such can emit `OpenDesiredByRemote`
1855								// messages.
1856								// We ignore them.
1857								debug_assert!(matches!(
1858									connec_state,
1859									ConnectionState::OpeningThenClosing | ConnectionState::Closing
1860								));
1861								*entry.into_mut() = PeerState::DisabledPendingEnable {
1862									connections,
1863									timer,
1864									timer_deadline,
1865								};
1866							}
1867						} else {
							// NOTE(review): nothing is written back on this path, so the entry
							// stays `Poisoned` when debug assertions are disabled.
1868							error!(
1869								target: "sub-libp2p",
1870								"OpenDesiredByRemote: State mismatch in the custom protos handler"
1871							);
1872							debug_assert!(false);
1873						}
1874					},
1875
					// Any other state is unexpected for this event.
					// NOTE(review): `state` is dropped here and the entry stays `Poisoned`.
1876					state => {
1877						error!(target: "sub-libp2p",
1878							   "OpenDesiredByRemote: Unexpected state in the custom protos handler: {:?}",
1879							   state);
1880						debug_assert!(false);
1881					},
1882				};
1883			},
1884
			// The handler wants the substream of this connection to be closed.
1885			NotifsHandlerOut::CloseDesired { protocol_index } => {
1886				let set_id = SetId::from(protocol_index);
1887
1888				trace!(target: "sub-libp2p",
1889					"Handler({}, {:?}) => CloseDesired({:?})",
1890					peer_id, connection_id, set_id);
1891
1892				let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
1893				{
1894					entry
1895				} else {
1896					error!(target: "sub-libp2p", "CloseDesired: State mismatch in the custom protos handler");
1897					debug_assert!(false);
1898					return
1899				};
1900
1901				match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1902					// Enabled => Enabled | Disabled
1903					PeerState::Enabled { mut connections } => {
1904						debug_assert!(connections.iter().any(|(_, s)| matches!(
1905							s,
1906							ConnectionState::Opening | ConnectionState::Open(_)
1907						)));
1908
						// Index of the connection that emitted the event.
						// NOTE(review): the `return` in the `else` branch leaves the entry
						// `Poisoned`.
1909						let pos = if let Some(pos) =
1910							connections.iter().position(|(c, _)| *c == connection_id)
1911						{
1912							pos
1913						} else {
1914							error!(target: "sub-libp2p",
1915								"CloseDesired: State mismatch in the custom protos handler");
1916							debug_assert!(false);
1917							return
1918						};
1919
						// Already `Closing`: restore the state unchanged and stop here.
1920						if matches!(connections[pos].1, ConnectionState::Closing) {
1921							*entry.into_mut() = PeerState::Enabled { connections };
1922							return
1923						}
1924
1925						debug_assert!(matches!(connections[pos].1, ConnectionState::Open(_)));
1926						connections[pos].1 = ConnectionState::Closing;
1927
1928						trace!(target: "sub-libp2p", "Handler({}, {:?}) <= Close({:?})", peer_id, connection_id, set_id);
1929						self.events.push_back(ToSwarm::NotifyHandler {
1930							peer_id,
1931							handler: NotifyHandler::One(connection_id),
1932							event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
1933						});
1934
						// Look for the first connection that is still `Open` and clone its sink.
1935						if let Some((replacement_pos, replacement_sink)) =
1936							connections.iter().enumerate().find_map(|(num, (_, s))| match s {
1937								ConnectionState::Open(s) => Some((num, s.clone())),
1938								_ => None,
1939							}) {
							// The replacement event is only emitted when the closing connection
							// sits before the first remaining open one in the list; presumably
							// its sink was the one exposed to the outside (not shown in this
							// function).
1940							if pos <= replacement_pos {
1941								trace!(target: "sub-libp2p", "External API <= Sink replaced({:?}, {:?})", peer_id, set_id);
1942								let event = NotificationsOut::CustomProtocolReplaced {
1943									peer_id,
1944									set_id,
1945									notifications_sink: replacement_sink.clone(),
1946								};
1947								self.events.push_back(ToSwarm::GenerateEvent(event));
1948							}
1949
1950							*entry.into_mut() = PeerState::Enabled { connections };
1951						} else {
1952							// List of open connections wasn't empty before but now it is.
							// Without any `Opening` connection either, the peer is reported as
							// dropped to the protocol controller and becomes `Disabled` with no
							// back-off; otherwise it stays `Enabled`.
1953							if !connections
1954								.iter()
1955								.any(|(_, s)| matches!(s, ConnectionState::Opening))
1956							{
1957								trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
1958								self.protocol_controller_handles[usize::from(set_id)]
1959									.dropped(peer_id);
1960								*entry.into_mut() =
1961									PeerState::Disabled { connections, backoff_until: None };
1962							} else {
1963								*entry.into_mut() = PeerState::Enabled { connections };
1964							}
1965
							// In both cases a `CustomProtocolClosed` event is generated.
1966							trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id);
1967							let event = NotificationsOut::CustomProtocolClosed { peer_id, set_id };
1968							self.events.push_back(ToSwarm::GenerateEvent(event));
1969						}
1970					},
1971
1972					// All connections in `Disabled` and `DisabledPendingEnable` have been sent a
1973					// `Close` message already, and as such ignore any `CloseDesired` message.
1974					state @ PeerState::Disabled { .. } |
1975					state @ PeerState::DisabledPendingEnable { .. } => {
1976						*entry.into_mut() = state;
1977					},
					// NOTE(review): unlike the other catch-all arms of this function, this one has
					// no `debug_assert!(false)`; the entry stays `Poisoned` here as well.
1978					state => {
1979						error!(target: "sub-libp2p",
1980							"Unexpected state in the custom protos handler: {:?}",
1981							state);
1982					},
1983				}
1984			},
1985
			// The handler reports the outcome of a close; the state is updated in place.
1986			NotifsHandlerOut::CloseResult { protocol_index } => {
1987				let set_id = SetId::from(protocol_index);
1988
1989				trace!(target: "sub-libp2p",
1990					"Handler({}, {:?}) => CloseResult({:?})",
1991					peer_id, connection_id, set_id);
1992
1993				match self.peers.get_mut(&(peer_id, set_id)) {
1994					// Move the connection from `Closing` to `Closed`.
1995					Some(PeerState::Incoming { connections, .. }) |
1996					Some(PeerState::DisabledPendingEnable { connections, .. }) |
1997					Some(PeerState::Disabled { connections, .. }) |
1998					Some(PeerState::Enabled { connections, .. }) => {
1999						if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2000							*c == connection_id && matches!(s, ConnectionState::Closing)
2001						}) {
2002							*connec_state = ConnectionState::Closed;
2003						} else {
2004							error!(target: "sub-libp2p",
2005								"CloseResult: State mismatch in the custom protos handler");
2006							debug_assert!(false);
2007						}
2008					},
2009
					// No entry at all, or a state without a connection list.
2010					state => {
2011						error!(target: "sub-libp2p",
2012							   "CloseResult: Unexpected state in the custom protos handler: {:?}",
2013							   state);
2014						debug_assert!(false);
2015					},
2016				}
2017			},
2018
			// The handler reports that opening succeeded; the state is updated in place.
2019			NotifsHandlerOut::OpenResultOk {
2020				protocol_index,
2021				negotiated_fallback,
2022				received_handshake,
2023				notifications_sink,
2024				inbound,
2025				..
2026			} => {
2027				let set_id = SetId::from(protocol_index);
2028				trace!(target: "sub-libp2p",
2029					"Handler({}, {:?}) => OpenResultOk({:?})",
2030					peer_id, connection_id, set_id);
2031
2032				match self.peers.get_mut(&(peer_id, set_id)) {
2033					Some(PeerState::Enabled { connections, .. }) => {
2034						debug_assert!(connections.iter().any(|(_, s)| matches!(
2035							s,
2036							ConnectionState::Opening | ConnectionState::Open(_)
2037						)));
						// Computed before this connection is marked `Open`, so it tells whether
						// another connection was already open.
2038						let any_open =
2039							connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_)));
2040
2041						if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2042							*c == connection_id && matches!(s, ConnectionState::Opening)
2043						}) {
							// `CustomProtocolOpen` is only generated for the first connection
							// that becomes open.
2044							if !any_open {
2045								trace!(target: "sub-libp2p", "External API <= Open({}, {:?})", peer_id, set_id);
2046								let event = NotificationsOut::CustomProtocolOpen {
2047									peer_id,
2048									set_id,
2049									direction: if inbound {
2050										Direction::Inbound
2051									} else {
2052										Direction::Outbound
2053									},
2054									received_handshake: received_handshake.clone(),
2055									negotiated_fallback: negotiated_fallback.clone(),
2056									notifications_sink: notifications_sink.clone(),
2057								};
2058								self.events.push_back(ToSwarm::GenerateEvent(event));
2059							}
2060							*connec_state = ConnectionState::Open(notifications_sink);
2061						} else if let Some((_, connec_state)) =
2062							connections.iter_mut().find(|(c, s)| {
2063								*c == connection_id &&
2064									matches!(s, ConnectionState::OpeningThenClosing)
2065							}) {
							// The open result has arrived; only the closing part remains.
2066							*connec_state = ConnectionState::Closing;
2067						} else {
2068							error!(target: "sub-libp2p",
2069								"OpenResultOk State mismatch in the custom protos handler");
2070							debug_assert!(false);
2071						}
2072					},
2073
					// In these states only an `OpeningThenClosing` connection is accepted as the
					// source of an open result; it becomes `Closing`.
2074					Some(PeerState::Incoming { connections, .. }) |
2075					Some(PeerState::DisabledPendingEnable { connections, .. }) |
2076					Some(PeerState::Disabled { connections, .. }) => {
2077						if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2078							*c == connection_id && matches!(s, ConnectionState::OpeningThenClosing)
2079						}) {
2080							*connec_state = ConnectionState::Closing;
2081						} else {
2082							error!(target: "sub-libp2p",
2083								"OpenResultOk State mismatch in the custom protos handler");
2084							debug_assert!(false);
2085						}
2086					},
2087
2088					state => {
2089						error!(target: "sub-libp2p",
2090							   "OpenResultOk: Unexpected state in the custom protos handler: {:?}",
2091							   state);
2092						debug_assert!(false);
2093					},
2094				}
2095			},
2096
			// The handler reports that opening failed.
2097			NotifsHandlerOut::OpenResultErr { protocol_index } => {
2098				let set_id = SetId::from(protocol_index);
2099				trace!(target: "sub-libp2p",
2100					"Handler({:?}, {:?}) => OpenResultErr({:?})",
2101					peer_id, connection_id, set_id);
2102
2103				let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
2104				{
2105					entry
2106				} else {
2107					error!(target: "sub-libp2p", "OpenResultErr: State mismatch in the custom protos handler");
2108					debug_assert!(false);
2109					return
2110				};
2111
2112				match mem::replace(entry.get_mut(), PeerState::Poisoned) {
					// Enabled => Enabled | Disabled
2113					PeerState::Enabled { mut connections } => {
2114						debug_assert!(connections.iter().any(|(_, s)| matches!(
2115							s,
2116							ConnectionState::Opening | ConnectionState::Open(_)
2117						)));
2118
						// `Opening` falls back to `Closed`; `OpeningThenClosing` moves on to
						// `Closing`.
2119						if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2120							*c == connection_id && matches!(s, ConnectionState::Opening)
2121						}) {
2122							*connec_state = ConnectionState::Closed;
2123						} else if let Some((_, connec_state)) =
2124							connections.iter_mut().find(|(c, s)| {
2125								*c == connection_id &&
2126									matches!(s, ConnectionState::OpeningThenClosing)
2127							}) {
2128							*connec_state = ConnectionState::Closing;
2129						} else {
2130							error!(target: "sub-libp2p",
2131								"OpenResultErr: State mismatch in the custom protos handler");
2132							debug_assert!(false);
2133						}
2134
						// With no `Opening` or `Open` connection left, report the peer as dropped
						// and disable it with a back-off whose length in seconds is sampled from
						// `Uniform::new(5, 10)`.
2135						if !connections.iter().any(|(_, s)| {
2136							matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
2137						}) {
2138							trace!(target: "sub-libp2p", "PSM <= Dropped({:?}, {:?})", peer_id, set_id);
2139							self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id);
2140
2141							let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
2142							*entry.into_mut() = PeerState::Disabled {
2143								connections,
2144								backoff_until: Some(Instant::now() + Duration::from_secs(ban_dur)),
2145							};
2146						} else {
2147							*entry.into_mut() = PeerState::Enabled { connections };
2148						}
2149					},
					// These states keep their variant; only an `OpeningThenClosing` connection is
					// accepted here and it becomes `Closing`.
2150					mut state @ PeerState::Incoming { .. } |
2151					mut state @ PeerState::DisabledPendingEnable { .. } |
2152					mut state @ PeerState::Disabled { .. } => {
2153						match &mut state {
2154							PeerState::Incoming { connections, .. } |
2155							PeerState::Disabled { connections, .. } |
2156							PeerState::DisabledPendingEnable { connections, .. } => {
2157								if let Some((_, connec_state)) =
2158									connections.iter_mut().find(|(c, s)| {
2159										*c == connection_id &&
2160											matches!(s, ConnectionState::OpeningThenClosing)
2161									}) {
2162									*connec_state = ConnectionState::Closing;
2163								} else {
2164									error!(target: "sub-libp2p",
2165										"OpenResultErr: State mismatch in the custom protos handler");
2166									debug_assert!(false);
2167								}
2168							},
2169							_ => unreachable!(
2170								"Match branches are the same as the one on which we
2171							enter this block; qed"
2172							),
2173						};
2174
2175						*entry.into_mut() = state;
2176					},
					// NOTE(review): `state` is dropped here and the entry stays `Poisoned`.
2177					state => {
2178						error!(target: "sub-libp2p",
2179							"Unexpected state in the custom protos handler: {:?}",
2180							state);
2181						debug_assert!(false);
2182					},
2183				};
2184			},
2185
			// A notification was received. It is forwarded as an event only while
			// `self.is_open` holds for this peer and set; otherwise it is just traced.
2186			NotifsHandlerOut::Notification { protocol_index, message } => {
2187				let set_id = SetId::from(protocol_index);
2188				if self.is_open(&peer_id, set_id) {
2189					trace!(
2190						target: "sub-libp2p",
2191						"Handler({:?}) => Notification({}, {:?}, {} bytes)",
2192						connection_id,
2193						peer_id,
2194						set_id,
2195						message.len()
2196					);
2197					trace!(
2198						target: "sub-libp2p",
2199						"External API <= Message({}, {:?})",
2200						peer_id,
2201						set_id,
2202					);
					// NOTE(review): `message` is owned and not used after this point, so the
					// `clone()` below looks avoidable — confirm before changing.
2203					let event = NotificationsOut::Notification {
2204						peer_id,
2205						set_id,
2206						message: message.clone(),
2207					};
2208					self.events.push_back(ToSwarm::GenerateEvent(event));
2209				} else {
2210					trace!(
2211						target: "sub-libp2p",
2212						"Handler({:?}) => Post-close notification({}, {:?}, {} bytes)",
2213						connection_id,
2214						peer_id,
2215						set_id,
2216						message.len()
2217					);
2218				}
2219			},
2220		}
2221	}
2222
2223	fn poll(
2224		&mut self,
2225		cx: &mut Context,
2226		_params: &mut impl PollParameters,
2227	) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
		// Drives the behaviour: returns one queued event if there is any, otherwise drains, in
		// order, the protocol-controller messages, the protocol command streams, the pending
		// inbound validations and the expired delays. Work done in those steps may queue new
		// events, so the queue is checked again at the end before returning `Poll::Pending`.
2228		if let Some(event) = self.events.pop_front() {
2229			return Poll::Ready(event)
2230		}
2231
2232		// Poll for instructions from the protocol controllers.
		// The loop ends when the stream is pending or has terminated.
2233		loop {
2234			match futures::Stream::poll_next(Pin::new(&mut self.from_protocol_controllers), cx) {
2235				Poll::Ready(Some(Message::Accept(index))) => {
2236					self.peerset_report_preaccept(index);
2237				},
2238				Poll::Ready(Some(Message::Reject(index))) => {
					// The return value is deliberately ignored.
2239					let _ = self.peerset_report_reject(index);
2240				},
2241				Poll::Ready(Some(Message::Connect { peer_id, set_id, .. })) => {
2242					self.peerset_report_connect(peer_id, set_id);
2243				},
2244				Poll::Ready(Some(Message::Drop { peer_id, set_id, .. })) => {
2245					self.peerset_report_disconnect(peer_id, set_id);
2246				},
2247				Poll::Ready(None) => {
2248					error!(
2249						target: "sub-libp2p",
2250						"Protocol controllers receiver stream has returned `None`. Ignore this error if the node is shutting down.",
2251					);
2252					break
2253				},
2254				Poll::Pending => break,
2255			}
2256		}
2257
2258		// poll commands from protocols
2259		loop {
2260			match futures::Stream::poll_next(Pin::new(&mut self.command_streams), cx) {
2261				Poll::Ready(Some((set_id, command))) => match command {
2262					NotificationCommand::SetHandshake(handshake) => {
2263						self.set_notif_protocol_handshake(set_id.into(), handshake);
2264					},
					// NOTE(review): `todo!()` panics, so receiving either of these commands
					// aborts the poll with a panic.
2265					NotificationCommand::OpenSubstream(_peer) |
2266					NotificationCommand::CloseSubstream(_peer) => {
2267						todo!("substream control not implemented");
2268					},
2269				},
2270				Poll::Ready(None) => {
2271					error!(target: LOG_TARGET, "Protocol command streams have been shut down");
2272					break
2273				},
2274				Poll::Pending => break,
2275			}
2276		}
2277
		// Process finished inbound validations; each item pairs a result with an index that is
		// passed on to the accept/reject handlers. An `Err` result stops this loop for the
		// current call.
2278		while let Poll::Ready(Some((result, index))) =
2279			self.pending_inbound_validations.poll_next_unpin(cx)
2280		{
2281			match result {
2282				Ok(ValidationResult::Accept) => {
2283					self.protocol_report_accept(index);
2284				},
2285				Ok(ValidationResult::Reject) => {
2286					self.protocol_report_reject(index);
2287				},
2288				Err(_) => {
2289					error!(target: LOG_TARGET, "Protocol has shut down");
2290					break
2291				},
2292			}
2293		}
2294
		// Process expired delays. A delay only has an effect when its id still matches the
		// `timer` stored in the peer's current state.
2295		while let Poll::Ready(Some((delay_id, peer_id, set_id))) =
2296			Pin::new(&mut self.delays).poll_next(cx)
2297		{
2298			let peer_state = match self.peers.get_mut(&(peer_id, set_id)) {
2299				Some(s) => s,
2300				// We intentionally never remove elements from `delays`, and it may
2301				// thus contain peers which are now gone. This is a normal situation.
2302				None => continue,
2303			};
2304
2305			match peer_state {
				// Back-off is over: forget the peer.
2306				PeerState::Backoff { timer, .. } if *timer == delay_id => {
2307					trace!(target: "sub-libp2p", "Libp2p <= Clean up ban of {:?} from the state ({:?})", peer_id, set_id);
2308					self.peers.remove(&(peer_id, set_id));
2309				},
2310
				// A connection request was waiting for the back-off to end: dial now.
2311				PeerState::PendingRequest { timer, .. } if *timer == delay_id => {
2312					trace!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired ({:?})", peer_id, set_id);
2313					self.events.push_back(ToSwarm::Dial { opts: peer_id.into() });
2314					*peer_state = PeerState::Requested;
2315				},
2316
2317				PeerState::DisabledPendingEnable { connections, timer, timer_deadline }
2318					if *timer == delay_id =>
2319				{
2320					// The first connection in `Closed` state is chosen to open the notifications substream.
2321					if let Some((connec_id, connec_state)) =
2322						connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
2323					{
2324						trace!(target: "sub-libp2p", "Handler({}, {:?}) <= Open({:?}) (ban expired)",
2325							peer_id, *connec_id, set_id);
2326						self.events.push_back(ToSwarm::NotifyHandler {
2327							peer_id,
2328							handler: NotifyHandler::One(*connec_id),
2329							event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
2330						});
2331						*connec_state = ConnectionState::Opening;
						// `mem::take` moves the connection list into the new `Enabled` state.
2332						*peer_state = PeerState::Enabled { connections: mem::take(connections) };
2333					} else {
						// No connection is `Closed` yet: push the deadline back by five seconds
						// and re-arm a delay that reuses the same timer id.
2334						*timer_deadline = Instant::now() + Duration::from_secs(5);
2335						let delay = futures_timer::Delay::new(Duration::from_secs(5));
2336						let timer = *timer;
2337						self.delays.push(
2338							async move {
2339								delay.await;
2340								(timer, peer_id, set_id)
2341							}
2342							.boxed(),
2343						);
2344					}
2345				},
2346
2347				// We intentionally never remove elements from `delays`, and it may
2348				// thus contain obsolete entries. This is a normal situation.
2349				_ => {},
2350			}
2351		}
2352
		// The steps above may have queued new events.
2353		if let Some(event) = self.events.pop_front() {
2354			return Poll::Ready(event)
2355		}
2356
2357		Poll::Pending
2358	}
2359}
2360
2361#[cfg(test)]
2362#[allow(deprecated)]
2363mod tests {
2364	use super::*;
2365	use crate::{
2366		mock::MockPeerStore,
2367		protocol::notifications::handler::tests::*,
2368		protocol_controller::{IncomingIndex, ProtoSetConfig, ProtocolController},
2369	};
2370	use libp2p::core::ConnectedPoint;
2371	use sc_utils::mpsc::tracing_unbounded;
2372	use std::{collections::HashSet, iter};
2373
2374	impl PartialEq for ConnectionState {
		// Test-only equality: two states are equal when they are the same variant. The value
		// carried by `Open` is ignored, so any two `Open` states compare equal.
2375		fn eq(&self, other: &ConnectionState) -> bool {
2376			match (self, other) {
2377				(ConnectionState::Closed, ConnectionState::Closed) => true,
2378				(ConnectionState::Closing, ConnectionState::Closing) => true,
2379				(ConnectionState::Opening, ConnectionState::Opening) => true,
2380				(ConnectionState::OpeningThenClosing, ConnectionState::OpeningThenClosing) => true,
2381				(ConnectionState::OpenDesiredByRemote, ConnectionState::OpenDesiredByRemote) =>
2382					true,
2383				(ConnectionState::Open(_), ConnectionState::Open(_)) => true,
				// Any pair of different variants.
2384				_ => false,
2385			}
2386		}
2387	}
2388
2389	#[derive(Clone)]
	// Stub `PollParameters` implementation for the tests; it carries no data.
2390	struct MockPollParams {}
2391
2392	impl PollParameters for MockPollParams {
2393		type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
2394
		// Always reports an empty list of supported protocols.
2395		fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
2396			vec![].into_iter()
2397		}
2398	}
2399
2400	fn development_notifs(
2401	) -> (Notifications, ProtocolController, Box<dyn crate::service::traits::NotificationService>)
2402	{
		// Test fixture: builds a `Notifications` behaviour with a single protocol, plus the
		// `ProtocolController` for set 0 and the notification service created alongside it.
		//
		// NOTE(review): the notification service is created for "/proto/1" while the
		// `ProtocolConfig` below is named "/foo" — confirm the mismatch is irrelevant here.
2403		let (protocol_handle_pair, notif_service) =
2404			crate::protocol::notifications::service::notification_service("/proto/1".into());
		// Channel over which the controller sends its messages to `Notifications`.
2405		let (to_notifications, from_controller) =
2406			tracing_unbounded("test_controller_to_notifications", 10_000);
2407
2408		let (handle, controller) = ProtocolController::new(
2409			SetId::from(0),
2410			ProtoSetConfig {
2411				in_peers: 25,
2412				out_peers: 25,
2413				reserved_nodes: HashSet::new(),
2414				reserved_only: false,
2415			},
2416			to_notifications,
2417			Arc::new(MockPeerStore {}),
2418		);
2419
2420		let (notif_handle, command_stream) = protocol_handle_pair.split();
2421		(
2422			Notifications::new(
2423				vec![handle],
2424				from_controller,
2425				NotificationMetrics::new(None),
				// One protocol with handshake `[1, 2, 3, 4]`, which `update_handshake` checks.
2426				iter::once((
2427					ProtocolConfig {
2428						name: "/foo".into(),
2429						fallback_names: Vec::new(),
2430						handshake: vec![1, 2, 3, 4],
2431						max_notification_size: u64::MAX,
2432					},
2433					notif_handle,
2434					command_stream,
2435				)),
2436			),
2437			controller,
2438			notif_service,
2439		)
2440	}
2441
2442	#[test]
	// `set_notif_protocol_handshake` replaces the handshake stored for protocol 0.
2443	fn update_handshake() {
2444		let (mut notif, _controller, _notif_service) = development_notifs();
2445
		// The fixture starts with handshake `[1, 2, 3, 4]`.
2446		let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
2447		assert_eq!(inner, vec![1, 2, 3, 4]);
2448
2449		notif.set_notif_protocol_handshake(0.into(), vec![5, 6, 7, 8]);
2450
		// Reading it back must yield the new value.
2451		let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
2452		assert_eq!(inner, vec![5, 6, 7, 8]);
2453	}
2454
2455	#[test]
	// Setting the handshake of a set id that was never registered is expected to panic. The
	// test only exists in builds with debug assertions; presumably the panic comes from a
	// debug assertion inside `set_notif_protocol_handshake` (not visible here).
2456	#[should_panic]
2457	#[cfg(debug_assertions)]
2458	fn update_unknown_handshake() {
2459		let (mut notif, _controller, _notif_service) = development_notifs();
2460
		// The fixture registers a single protocol, so 1337 is an unknown index.
2461		notif.set_notif_protocol_handshake(1337.into(), vec![5, 6, 7, 8]);
2462	}
2463
2464	#[test]
	// `disconnect_peer` on a peer in `Backoff` leaves it in `Backoff` with the same timer.
2465	fn disconnect_backoff_peer() {
2466		let (mut notif, _controller, _notif_service) = development_notifs();
2467
		// Insert the state by hand instead of driving the state machine there.
2468		let peer = PeerId::random();
2469		notif.peers.insert(
2470			(peer, 0.into()),
2471			PeerState::Backoff { timer: DelayId(0), timer_deadline: Instant::now() },
2472		);
2473		notif.disconnect_peer(&peer, 0.into());
2474
2475		assert!(std::matches!(
2476			notif.peers.get(&(peer, 0.into())),
2477			Some(PeerState::Backoff { timer: DelayId(0), .. })
2478		));
2479	}
2480
2481	#[test]
	// `disconnect_peer` on a peer in `PendingRequest` leaves it in `PendingRequest` with the
	// same timer.
2482	fn disconnect_pending_request() {
2483		let (mut notif, _controller, _notif_service) = development_notifs();
2484		let peer = PeerId::random();
2485
		// Insert the state by hand instead of driving the state machine there.
2486		notif.peers.insert(
2487			(peer, 0.into()),
2488			PeerState::PendingRequest { timer: DelayId(0), timer_deadline: Instant::now() },
2489		);
2490		notif.disconnect_peer(&peer, 0.into());
2491
2492		assert!(std::matches!(
2493			notif.peers.get(&(peer, 0.into())),
2494			Some(PeerState::PendingRequest { timer: DelayId(0), .. })
2495		));
2496	}
2497
2498	#[test]
	// `disconnect_peer` on a peer in `Requested` leaves it in `Requested`.
2499	fn disconnect_requested_peer() {
2500		let (mut notif, _controller, _notif_service) = development_notifs();
2501
		// Insert the state by hand instead of driving the state machine there.
2502		let peer = PeerId::random();
2503		notif.peers.insert((peer, 0.into()), PeerState::Requested);
2504		notif.disconnect_peer(&peer, 0.into());
2505
2506		assert!(std::matches!(notif.peers.get(&(peer, 0.into())), Some(PeerState::Requested)));
2507	}
2508
2509	#[test]
	// `disconnect_peer` on a `Disabled` peer without connections leaves it `Disabled` and
	// still without a back-off.
2510	fn disconnect_disabled_peer() {
2511		let (mut notif, _controller, _notif_service) = development_notifs();
2512		let peer = PeerId::random();
		// Insert the state by hand: no connections and no back-off.
2513		notif.peers.insert(
2514			(peer, 0.into()),
2515			PeerState::Disabled { backoff_until: None, connections: SmallVec::new() },
2516		);
2517		notif.disconnect_peer(&peer, 0.into());
2518
2519		assert!(std::matches!(
2520			notif.peers.get(&(peer, 0.into())),
2521			Some(PeerState::Disabled { backoff_until: None, .. })
2522		));
2523	}
2524
2525	#[test]
	// A new inbound connection makes the peer `Disabled` with one `Closed` connection; an
	// `OpenDesiredByRemote` event on it then moves the peer to `Incoming` and records the
	// request in `notif.incoming`.
2526	fn remote_opens_connection_and_substream() {
2527		let (mut notif, _controller, _notif_service) = development_notifs();
2528		let peer = PeerId::random();
2529		let conn = ConnectionId::new_unchecked(0);
2530		let connected = ConnectedPoint::Listener {
2531			local_addr: Multiaddr::empty(),
2532			send_back_addr: Multiaddr::empty(),
2533		};
2534
		// report the new connection to the behaviour
2535		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2536			libp2p::swarm::behaviour::ConnectionEstablished {
2537				peer_id: peer,
2538				connection_id: conn,
2539				endpoint: &connected,
2540				failed_addresses: &[],
2541				other_established: 0usize,
2542			},
2543		));
2544
		// the peer must now be `Disabled`, without back-off, and its connection `Closed`
2545		if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
2546			notif.peers.get(&(peer, 0.into()))
2547		{
2548			assert_eq!(connections[0], (conn, ConnectionState::Closed));
2549		} else {
2550			panic!("invalid state");
2551		}
2552
2553		// remote opens a substream, verify that peer state is updated to `Incoming`
2554		notif.on_connection_handler_event(
2555			peer,
2556			conn,
2557			NotifsHandlerOut::OpenDesiredByRemote {
2558				protocol_index: 0,
2559				handshake: vec![1, 3, 3, 7],
2560			},
2561		);
2562
2563		if let Some(&PeerState::Incoming { ref connections, backoff_until: None, .. }) =
2564			notif.peers.get(&(peer, 0.into()))
2565		{
2566			assert_eq!(connections.len(), 1);
2567			assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2568		} else {
2569			panic!("invalid state");
2570		}
2571
		// the request was recorded as alive under the first incoming index
2572		assert!(std::matches!(
2573			notif.incoming.pop(),
2574			Some(IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. }),
2575		));
2576	}
2577
2578	#[tokio::test]
	// Disconnecting a peer right after the remote asked to open a substream on a fresh inbound
	// connection must leave the peer `Disabled`, without back-off, and with that connection
	// in `Closing`.
2579	async fn disconnect_remote_substream_before_handled_by_controller() {
2580		let (mut notif, _controller, _notif_service) = development_notifs();
2581		let peer = PeerId::random();
2582		let conn = ConnectionId::new_unchecked(0);
2583		let connected = ConnectedPoint::Listener {
2584			local_addr: Multiaddr::empty(),
2585			send_back_addr: Multiaddr::empty(),
2586		};
2587
		// report the new connection, then the remote's request to open a substream
2588		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2589			libp2p::swarm::behaviour::ConnectionEstablished {
2590				peer_id: peer,
2591				connection_id: conn,
2592				endpoint: &connected,
2593				failed_addresses: &[],
2594				other_established: 0usize,
2595			},
2596		));
2597		notif.on_connection_handler_event(
2598			peer,
2599			conn,
2600			NotifsHandlerOut::OpenDesiredByRemote {
2601				protocol_index: 0,
2602				handshake: vec![1, 3, 3, 7],
2603			},
2604		);
		// disconnect without polling the behaviour in between
2605		notif.disconnect_peer(&peer, 0.into());
2606
2607		if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
2608			notif.peers.get(&(peer, 0.into()))
2609		{
2610			assert_eq!(connections.len(), 1);
2611			assert_eq!(connections[0], (conn, ConnectionState::Closing));
2612		} else {
2613			panic!("invalid state");
2614		}
2615	}
2616
2617	#[test]
2618	fn peerset_report_connect_backoff() {
2619		let (mut notif, _controller, _notif_service) = development_notifs();
2620		let set_id = SetId::from(0);
2621		let peer = PeerId::random();
2622		let conn = ConnectionId::new_unchecked(0);
2623		let connected = ConnectedPoint::Listener {
2624			local_addr: Multiaddr::empty(),
2625			send_back_addr: Multiaddr::empty(),
2626		};
2627
2628		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2629			libp2p::swarm::behaviour::ConnectionEstablished {
2630				peer_id: peer,
2631				connection_id: conn,
2632				endpoint: &connected,
2633				failed_addresses: &[],
2634				other_established: 0usize,
2635			},
2636		));
2637		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2638
2639		// manually add backoff for the entry
2640		//
2641		// there is not straight-forward way of adding backoff to `PeerState::Disabled`
2642		// so manually adjust the value in order to progress on to the next stage.
2643		// This modification together with `ConnectionClosed` will convert the peer
2644		// state into `PeerState::Backoff`.
2645		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2646			notif.peers.get_mut(&(peer, set_id))
2647		{
2648			*backoff_until =
2649				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2650		}
2651
2652		notif.on_swarm_event(FromSwarm::ConnectionClosed(
2653			libp2p::swarm::behaviour::ConnectionClosed {
2654				peer_id: peer,
2655				connection_id: conn,
2656				endpoint: &connected.clone(),
2657				handler: NotifsHandler::new(peer, vec![], None),
2658				remaining_established: 0usize,
2659			},
2660		));
2661
2662		let timer = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
2663			notif.peers.get(&(peer, set_id))
2664		{
2665			timer_deadline
2666		} else {
2667			panic!("invalid state");
2668		};
2669
2670		// attempt to connect the backed-off peer and verify that the request is pending
2671		notif.peerset_report_connect(peer, set_id);
2672
2673		if let Some(&PeerState::PendingRequest { timer_deadline, .. }) =
2674			notif.peers.get(&(peer, set_id))
2675		{
2676			assert_eq!(timer, timer_deadline);
2677		} else {
2678			panic!("invalid state");
2679		}
2680	}
2681
2682	#[test]
2683	fn peerset_connect_incoming() {
2684		let (mut notif, _controller, _notif_service) = development_notifs();
2685		let peer = PeerId::random();
2686		let conn = ConnectionId::new_unchecked(0);
2687		let set_id = SetId::from(0);
2688		let connected = ConnectedPoint::Listener {
2689			local_addr: Multiaddr::empty(),
2690			send_back_addr: Multiaddr::empty(),
2691		};
2692
2693		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2694			libp2p::swarm::behaviour::ConnectionEstablished {
2695				peer_id: peer,
2696				connection_id: conn,
2697				endpoint: &connected,
2698				failed_addresses: &[],
2699				other_established: 0usize,
2700			},
2701		));
2702		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2703
2704		// remote opens a substream, verify that peer state is updated to `Incoming`
2705		notif.on_connection_handler_event(
2706			peer,
2707			conn,
2708			NotifsHandlerOut::OpenDesiredByRemote {
2709				protocol_index: 0,
2710				handshake: vec![1, 3, 3, 7],
2711			},
2712		);
2713
2714		// attempt to connect to the peer and verify that the peer state is `Enabled`;
2715		// we rely on implementation detail that incoming indices are counted from 0
2716		// to not mock the `Peerset`
2717		notif.protocol_report_accept(IncomingIndex(0));
2718		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
2719	}
2720
2721	#[test]
2722	fn peerset_disconnect_disable_pending_enable() {
2723		let (mut notif, _controller, _notif_service) = development_notifs();
2724		let set_id = SetId::from(0);
2725		let peer = PeerId::random();
2726		let conn = ConnectionId::new_unchecked(0);
2727		let connected = ConnectedPoint::Listener {
2728			local_addr: Multiaddr::empty(),
2729			send_back_addr: Multiaddr::empty(),
2730		};
2731
2732		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2733			libp2p::swarm::behaviour::ConnectionEstablished {
2734				peer_id: peer,
2735				connection_id: conn,
2736				endpoint: &connected,
2737				failed_addresses: &[],
2738				other_established: 0usize,
2739			},
2740		));
2741		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2742
2743		// manually add backoff for the entry
2744		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2745			notif.peers.get_mut(&(peer, set_id))
2746		{
2747			*backoff_until =
2748				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2749		}
2750
2751		// switch state to `DisabledPendingEnable`
2752		notif.peerset_report_connect(peer, set_id);
2753		assert!(std::matches!(
2754			notif.peers.get(&(peer, set_id)),
2755			Some(&PeerState::DisabledPendingEnable { .. })
2756		));
2757
2758		notif.peerset_report_disconnect(peer, set_id);
2759
2760		if let Some(PeerState::Disabled { backoff_until, .. }) = notif.peers.get(&(peer, set_id)) {
2761			assert!(backoff_until.is_some());
2762			assert!(backoff_until.unwrap() > Instant::now());
2763		} else {
2764			panic!("invalid state");
2765		}
2766	}
2767
2768	#[test]
2769	fn peerset_disconnect_enabled() {
2770		let (mut notif, _controller, _notif_service) = development_notifs();
2771		let peer = PeerId::random();
2772		let conn = ConnectionId::new_unchecked(0);
2773		let set_id = SetId::from(0);
2774		let connected = ConnectedPoint::Listener {
2775			local_addr: Multiaddr::empty(),
2776			send_back_addr: Multiaddr::empty(),
2777		};
2778
2779		// Set peer into `Enabled` state.
2780		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2781			libp2p::swarm::behaviour::ConnectionEstablished {
2782				peer_id: peer,
2783				connection_id: conn,
2784				endpoint: &connected,
2785				failed_addresses: &[],
2786				other_established: 0usize,
2787			},
2788		));
2789		notif.on_connection_handler_event(
2790			peer,
2791			conn,
2792			NotifsHandlerOut::OpenDesiredByRemote {
2793				protocol_index: 0,
2794				handshake: vec![1, 3, 3, 7],
2795			},
2796		);
2797		// we rely on the implementation detail that incoming indices are counted from 0
2798		// to not mock the `Peerset`
2799		notif.protocol_report_accept(IncomingIndex(0));
2800		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
2801
2802		// disconnect peer and verify that the state is `Disabled`
2803		notif.peerset_report_disconnect(peer, set_id);
2804		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2805	}
2806
2807	#[test]
2808	fn peerset_disconnect_requested() {
2809		let (mut notif, _controller, _notif_service) = development_notifs();
2810		let peer = PeerId::random();
2811		let set_id = SetId::from(0);
2812
2813		// Set peer into `Requested` state.
2814		notif.peerset_report_connect(peer, set_id);
2815		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
2816
2817		// disconnect peer and verify that the state is `Disabled`
2818		notif.peerset_report_disconnect(peer, set_id);
2819		assert!(notif.peers.get(&(peer, set_id)).is_none());
2820	}
2821
2822	#[test]
2823	fn peerset_disconnect_pending_request() {
2824		let (mut notif, _controller, _notif_service) = development_notifs();
2825		let set_id = SetId::from(0);
2826		let peer = PeerId::random();
2827		let conn = ConnectionId::new_unchecked(0);
2828		let connected = ConnectedPoint::Listener {
2829			local_addr: Multiaddr::empty(),
2830			send_back_addr: Multiaddr::empty(),
2831		};
2832
2833		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2834			libp2p::swarm::behaviour::ConnectionEstablished {
2835				peer_id: peer,
2836				connection_id: conn,
2837				endpoint: &connected,
2838				failed_addresses: &[],
2839				other_established: 0usize,
2840			},
2841		));
2842		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2843
2844		// manually add backoff for the entry
2845		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2846			notif.peers.get_mut(&(peer, set_id))
2847		{
2848			*backoff_until =
2849				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2850		}
2851
2852		notif.on_swarm_event(FromSwarm::ConnectionClosed(
2853			libp2p::swarm::behaviour::ConnectionClosed {
2854				peer_id: peer,
2855				connection_id: conn,
2856				endpoint: &connected.clone(),
2857				handler: NotifsHandler::new(peer, vec![], None),
2858				remaining_established: 0usize,
2859			},
2860		));
2861		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
2862
2863		// attempt to connect the backed-off peer and verify that the request is pending
2864		notif.peerset_report_connect(peer, set_id);
2865		assert!(std::matches!(
2866			notif.peers.get(&(peer, set_id)),
2867			Some(&PeerState::PendingRequest { .. })
2868		));
2869
2870		// attempt to disconnect the backed-off peer and verify that the request is pending
2871		notif.peerset_report_disconnect(peer, set_id);
2872		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
2873	}
2874
2875	#[test]
2876	fn peerset_accept_peer_not_alive() {
2877		let (mut notif, _controller, _notif_service) = development_notifs();
2878		let peer = PeerId::random();
2879		let conn = ConnectionId::new_unchecked(0);
2880		let set_id = SetId::from(0);
2881		let connected = ConnectedPoint::Listener {
2882			local_addr: Multiaddr::empty(),
2883			send_back_addr: Multiaddr::empty(),
2884		};
2885
2886		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2887			libp2p::swarm::behaviour::ConnectionEstablished {
2888				peer_id: peer,
2889				connection_id: conn,
2890				endpoint: &connected,
2891				failed_addresses: &[],
2892				other_established: 0usize,
2893			},
2894		));
2895		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2896
2897		// remote opens a substream, verify that peer state is updated to `Incoming`
2898		notif.on_connection_handler_event(
2899			peer,
2900			conn,
2901			NotifsHandlerOut::OpenDesiredByRemote {
2902				protocol_index: 0,
2903				handshake: vec![1, 3, 3, 7],
2904			},
2905		);
2906		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
2907
2908		assert!(std::matches!(
2909			notif.incoming[0],
2910			IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
2911		));
2912
2913		notif.disconnect_peer(&peer, set_id);
2914		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2915		assert!(std::matches!(
2916			notif.incoming[0],
2917			IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
2918		));
2919
2920		notif.protocol_report_accept(IncomingIndex(0));
2921		assert_eq!(notif.incoming.len(), 0);
2922		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Disabled { .. })));
2923	}
2924
2925	#[test]
2926	fn secondary_connection_peer_state_incoming() {
2927		let (mut notif, _controller, _notif_service) = development_notifs();
2928		let peer = PeerId::random();
2929		let conn = ConnectionId::new_unchecked(0);
2930		let conn2 = ConnectionId::new_unchecked(1);
2931		let set_id = SetId::from(0);
2932		let connected = ConnectedPoint::Listener {
2933			local_addr: Multiaddr::empty(),
2934			send_back_addr: Multiaddr::empty(),
2935		};
2936
2937		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2938			libp2p::swarm::behaviour::ConnectionEstablished {
2939				peer_id: peer,
2940				connection_id: conn,
2941				endpoint: &connected,
2942				failed_addresses: &[],
2943				other_established: 0usize,
2944			},
2945		));
2946		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2947
2948		notif.on_connection_handler_event(
2949			peer,
2950			conn,
2951			NotifsHandlerOut::OpenDesiredByRemote {
2952				protocol_index: 0,
2953				handshake: vec![1, 3, 3, 7],
2954			},
2955		);
2956		if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
2957			assert_eq!(connections.len(), 1);
2958			assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2959		} else {
2960			panic!("invalid state");
2961		}
2962
2963		// add another connection
2964		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2965			libp2p::swarm::behaviour::ConnectionEstablished {
2966				peer_id: peer,
2967				connection_id: conn2,
2968				endpoint: &connected,
2969				failed_addresses: &[],
2970				other_established: 0usize,
2971			},
2972		));
2973
2974		if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
2975			assert_eq!(connections.len(), 2);
2976			assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2977			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
2978		} else {
2979			panic!("invalid state");
2980		}
2981	}
2982
2983	#[test]
2984	fn close_connection_for_disabled_peer() {
2985		let (mut notif, _controller, _notif_service) = development_notifs();
2986		let peer = PeerId::random();
2987		let conn = ConnectionId::new_unchecked(0);
2988		let set_id = SetId::from(0);
2989		let connected = ConnectedPoint::Listener {
2990			local_addr: Multiaddr::empty(),
2991			send_back_addr: Multiaddr::empty(),
2992		};
2993
2994		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2995			libp2p::swarm::behaviour::ConnectionEstablished {
2996				peer_id: peer,
2997				connection_id: conn,
2998				endpoint: &connected,
2999				failed_addresses: &[],
3000				other_established: 0usize,
3001			},
3002		));
3003		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3004
3005		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3006			libp2p::swarm::behaviour::ConnectionClosed {
3007				peer_id: peer,
3008				connection_id: conn,
3009				endpoint: &connected.clone(),
3010				handler: NotifsHandler::new(peer, vec![], None),
3011				remaining_established: 0usize,
3012			},
3013		));
3014		assert!(notif.peers.get(&(peer, set_id)).is_none());
3015	}
3016
3017	#[test]
3018	fn close_connection_for_incoming_peer_one_connection() {
3019		let (mut notif, _controller, _notif_service) = development_notifs();
3020		let peer = PeerId::random();
3021		let conn = ConnectionId::new_unchecked(0);
3022		let set_id = SetId::from(0);
3023		let connected = ConnectedPoint::Listener {
3024			local_addr: Multiaddr::empty(),
3025			send_back_addr: Multiaddr::empty(),
3026		};
3027
3028		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3029			libp2p::swarm::behaviour::ConnectionEstablished {
3030				peer_id: peer,
3031				connection_id: conn,
3032				endpoint: &connected,
3033				failed_addresses: &[],
3034				other_established: 0usize,
3035			},
3036		));
3037		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3038
3039		notif.on_connection_handler_event(
3040			peer,
3041			conn,
3042			NotifsHandlerOut::OpenDesiredByRemote {
3043				protocol_index: 0,
3044				handshake: vec![1, 3, 3, 7],
3045			},
3046		);
3047		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3048
3049		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3050			libp2p::swarm::behaviour::ConnectionClosed {
3051				peer_id: peer,
3052				connection_id: conn,
3053				endpoint: &connected.clone(),
3054				handler: NotifsHandler::new(peer, vec![], None),
3055				remaining_established: 0usize,
3056			},
3057		));
3058		assert!(notif.peers.get(&(peer, set_id)).is_none());
3059		assert!(std::matches!(
3060			notif.incoming[0],
3061			IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
3062		));
3063	}
3064
3065	#[test]
3066	fn close_connection_for_incoming_peer_two_connections() {
3067		let (mut notif, _controller, _notif_service) = development_notifs();
3068		let peer = PeerId::random();
3069		let conn = ConnectionId::new_unchecked(0);
3070		let conn1 = ConnectionId::new_unchecked(1);
3071		let set_id = SetId::from(0);
3072		let connected = ConnectedPoint::Listener {
3073			local_addr: Multiaddr::empty(),
3074			send_back_addr: Multiaddr::empty(),
3075		};
3076		let mut conns = SmallVec::<
3077			[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER],
3078		>::from(vec![(conn, ConnectionState::Closed)]);
3079
3080		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3081			libp2p::swarm::behaviour::ConnectionEstablished {
3082				peer_id: peer,
3083				connection_id: conn,
3084				endpoint: &connected,
3085				failed_addresses: &[],
3086				other_established: 0usize,
3087			},
3088		));
3089		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3090
3091		notif.on_connection_handler_event(
3092			peer,
3093			conn,
3094			NotifsHandlerOut::OpenDesiredByRemote {
3095				protocol_index: 0,
3096				handshake: vec![1, 3, 3, 7],
3097			},
3098		);
3099		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3100
3101		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3102			libp2p::swarm::behaviour::ConnectionEstablished {
3103				peer_id: peer,
3104				connection_id: conn1,
3105				endpoint: &connected,
3106				failed_addresses: &[],
3107				other_established: 0usize,
3108			},
3109		));
3110		conns.push((conn1, ConnectionState::Closed));
3111
3112		if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id))
3113		{
3114			assert_eq!(connections.len(), 2);
3115			assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
3116			assert_eq!(connections[1], (conn1, ConnectionState::Closed));
3117		}
3118
3119		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3120			libp2p::swarm::behaviour::ConnectionClosed {
3121				peer_id: peer,
3122				connection_id: conn,
3123				endpoint: &connected.clone(),
3124				handler: NotifsHandler::new(peer, vec![], None),
3125				remaining_established: 0usize,
3126			},
3127		));
3128
3129		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3130			assert_eq!(connections.len(), 1);
3131			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3132		} else {
3133			panic!("invalid state");
3134		}
3135	}
3136
3137	#[test]
3138	fn connection_and_substream_open() {
3139		let (mut notif, _controller, _notif_service) = development_notifs();
3140		let peer = PeerId::random();
3141		let conn = ConnectionId::new_unchecked(0);
3142		let set_id = SetId::from(0);
3143		let connected = ConnectedPoint::Listener {
3144			local_addr: Multiaddr::empty(),
3145			send_back_addr: Multiaddr::empty(),
3146		};
3147		let mut conn_yielder = ConnectionYielder::new();
3148
3149		// move the peer to `Enabled` state
3150		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3151			libp2p::swarm::behaviour::ConnectionEstablished {
3152				peer_id: peer,
3153				connection_id: conn,
3154				endpoint: &connected,
3155				failed_addresses: &[],
3156				other_established: 0usize,
3157			},
3158		));
3159		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3160
3161		notif.on_connection_handler_event(
3162			peer,
3163			conn,
3164			NotifsHandlerOut::OpenDesiredByRemote {
3165				protocol_index: 0,
3166				handshake: vec![1, 3, 3, 7],
3167			},
3168		);
3169		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3170
3171		// We rely on the implementation detail that incoming indices are counted
3172		// from 0 to not mock the `Peerset`.
3173		notif.protocol_report_accept(IncomingIndex(0));
3174		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3175
3176		// open new substream
3177		let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
3178
3179		notif.on_connection_handler_event(peer, conn, event);
3180		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3181
3182		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3183			assert_eq!(connections.len(), 1);
3184			assert_eq!(connections[0].0, conn);
3185			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3186		}
3187
3188		assert!(std::matches!(
3189			notif.events[notif.events.len() - 1],
3190			ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolOpen { .. })
3191		));
3192	}
3193
3194	#[test]
3195	fn connection_closed_sink_replaced() {
3196		let (mut notif, _controller, _notif_service) = development_notifs();
3197		let peer = PeerId::random();
3198		let conn1 = ConnectionId::new_unchecked(0);
3199		let conn2 = ConnectionId::new_unchecked(1);
3200		let set_id = SetId::from(0);
3201		let connected = ConnectedPoint::Listener {
3202			local_addr: Multiaddr::empty(),
3203			send_back_addr: Multiaddr::empty(),
3204		};
3205		let mut conn_yielder = ConnectionYielder::new();
3206
3207		// open two connections
3208		for conn_id in vec![conn1, conn2] {
3209			notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3210				libp2p::swarm::behaviour::ConnectionEstablished {
3211					peer_id: peer,
3212					connection_id: conn_id,
3213					endpoint: &connected,
3214					failed_addresses: &[],
3215					other_established: 0usize,
3216				},
3217			));
3218		}
3219
3220		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3221			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3222			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3223		} else {
3224			panic!("invalid state");
3225		}
3226
3227		// open substreams on both active connections
3228		notif.peerset_report_connect(peer, set_id);
3229		notif.on_connection_handler_event(
3230			peer,
3231			conn2,
3232			NotifsHandlerOut::OpenDesiredByRemote {
3233				protocol_index: 0,
3234				handshake: vec![1, 3, 3, 7],
3235			},
3236		);
3237
3238		if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3239			assert_eq!(connections[0], (conn1, ConnectionState::Opening));
3240			assert_eq!(connections[1], (conn2, ConnectionState::Opening));
3241		} else {
3242			panic!("invalid state");
3243		}
3244
3245		// add two new substreams, one for each connection and verify that both are in open state
3246		for conn in vec![conn1, conn2].iter() {
3247			notif.on_connection_handler_event(
3248				peer,
3249				*conn,
3250				conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3251			);
3252		}
3253
3254		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3255			assert_eq!(connections[0].0, conn1);
3256			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3257			assert_eq!(connections[1].0, conn2);
3258			assert!(std::matches!(connections[1].1, ConnectionState::Open(_)));
3259		} else {
3260			panic!("invalid state");
3261		}
3262
3263		// check peer information
3264		assert_eq!(notif.open_peers().collect::<Vec<_>>(), vec![&peer],);
3265
3266		// close the other connection and verify that notification replacement event is emitted
3267		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3268			libp2p::swarm::behaviour::ConnectionClosed {
3269				peer_id: peer,
3270				connection_id: conn1,
3271				endpoint: &connected.clone(),
3272				handler: NotifsHandler::new(peer, vec![], None),
3273				remaining_established: 0usize,
3274			},
3275		));
3276
3277		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3278			assert_eq!(connections.len(), 1);
3279			assert_eq!(connections[0].0, conn2);
3280			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3281		} else {
3282			panic!("invalid state");
3283		}
3284
3285		assert!(std::matches!(
3286			notif.events[notif.events.len() - 1],
3287			ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolReplaced { .. })
3288		));
3289	}
3290
3291	#[test]
3292	fn dial_failure_for_requested_peer() {
3293		let (mut notif, _controller, _notif_service) = development_notifs();
3294		let peer = PeerId::random();
3295		let set_id = SetId::from(0);
3296
3297		// Set peer into `Requested` state.
3298		notif.peerset_report_connect(peer, set_id);
3299		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
3300
3301		notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
3302			peer_id: Some(peer),
3303			error: &libp2p::swarm::DialError::Aborted,
3304			connection_id: ConnectionId::new_unchecked(1337),
3305		}));
3306
3307		if let Some(PeerState::Backoff { timer_deadline, .. }) = notif.peers.get(&(peer, set_id)) {
3308			assert!(timer_deadline > &Instant::now());
3309		} else {
3310			panic!("invalid state");
3311		}
3312	}
3313
3314	#[tokio::test]
3315	async fn write_notification() {
3316		let (mut notif, _controller, _notif_service) = development_notifs();
3317		let peer = PeerId::random();
3318		let conn = ConnectionId::new_unchecked(0);
3319		let set_id = SetId::from(0);
3320		let connected = ConnectedPoint::Listener {
3321			local_addr: Multiaddr::empty(),
3322			send_back_addr: Multiaddr::empty(),
3323		};
3324		let mut conn_yielder = ConnectionYielder::new();
3325
3326		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3327			libp2p::swarm::behaviour::ConnectionEstablished {
3328				peer_id: peer,
3329				connection_id: conn,
3330				endpoint: &connected,
3331				failed_addresses: &[],
3332				other_established: 0usize,
3333			},
3334		));
3335		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3336
3337		notif.peerset_report_connect(peer, set_id);
3338		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3339
3340		notif.on_connection_handler_event(
3341			peer,
3342			conn,
3343			conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3344		);
3345
3346		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3347			assert_eq!(connections[0].0, conn);
3348			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3349		} else {
3350			panic!("invalid state");
3351		}
3352
3353		notif
3354			.peers
3355			.get(&(peer, set_id))
3356			.unwrap()
3357			.get_open()
3358			.unwrap()
3359			.send_sync_notification(vec![1, 3, 3, 7]);
3360		assert_eq!(conn_yielder.get_next_event(peer, set_id.into()).await, Some(vec![1, 3, 3, 7]));
3361	}
3362
3363	#[test]
3364	fn peerset_report_connect_backoff_expired() {
3365		let (mut notif, _controller, _notif_service) = development_notifs();
3366		let set_id = SetId::from(0);
3367		let peer = PeerId::random();
3368		let conn = ConnectionId::new_unchecked(0);
3369		let connected = ConnectedPoint::Listener {
3370			local_addr: Multiaddr::empty(),
3371			send_back_addr: Multiaddr::empty(),
3372		};
3373		let backoff_duration = Duration::from_millis(100);
3374
3375		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3376			libp2p::swarm::behaviour::ConnectionEstablished {
3377				peer_id: peer,
3378				connection_id: conn,
3379				endpoint: &connected,
3380				failed_addresses: &[],
3381				other_established: 0usize,
3382			},
3383		));
3384		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3385
3386		// manually add backoff for the entry
3387		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3388			notif.peers.get_mut(&(peer, set_id))
3389		{
3390			*backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap());
3391		}
3392
3393		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3394			libp2p::swarm::behaviour::ConnectionClosed {
3395				peer_id: peer,
3396				connection_id: conn,
3397				endpoint: &connected.clone(),
3398				handler: NotifsHandler::new(peer, vec![], None),
3399				remaining_established: 0usize,
3400			},
3401		));
3402
3403		// wait until the backoff time has passed
3404		std::thread::sleep(backoff_duration * 2);
3405
3406		// attempt to connect the backed-off peer and verify that the request is pending
3407		notif.peerset_report_connect(peer, set_id);
3408		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested { .. })))
3409	}
3410
3411	#[test]
3412	fn peerset_report_disconnect_disabled() {
3413		let (mut notif, _controller, _notif_service) = development_notifs();
3414		let peer = PeerId::random();
3415		let set_id = SetId::from(0);
3416		let conn = ConnectionId::new_unchecked(0);
3417		let connected = ConnectedPoint::Listener {
3418			local_addr: Multiaddr::empty(),
3419			send_back_addr: Multiaddr::empty(),
3420		};
3421
3422		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3423			libp2p::swarm::behaviour::ConnectionEstablished {
3424				peer_id: peer,
3425				connection_id: conn,
3426				endpoint: &connected,
3427				failed_addresses: &[],
3428				other_established: 0usize,
3429			},
3430		));
3431		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3432
3433		notif.peerset_report_disconnect(peer, set_id);
3434		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3435	}
3436
3437	#[test]
3438	fn peerset_report_disconnect_backoff() {
3439		let (mut notif, _controller, _notif_service) = development_notifs();
3440		let set_id = SetId::from(0);
3441		let peer = PeerId::random();
3442		let conn = ConnectionId::new_unchecked(0);
3443		let connected = ConnectedPoint::Listener {
3444			local_addr: Multiaddr::empty(),
3445			send_back_addr: Multiaddr::empty(),
3446		};
3447		let backoff_duration = Duration::from_secs(2);
3448
3449		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3450			libp2p::swarm::behaviour::ConnectionEstablished {
3451				peer_id: peer,
3452				connection_id: conn,
3453				endpoint: &connected,
3454				failed_addresses: &[],
3455				other_established: 0usize,
3456			},
3457		));
3458		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3459
3460		// manually add backoff for the entry
3461		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3462			notif.peers.get_mut(&(peer, set_id))
3463		{
3464			*backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap());
3465		}
3466
3467		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3468			libp2p::swarm::behaviour::ConnectionClosed {
3469				peer_id: peer,
3470				connection_id: conn,
3471				endpoint: &connected.clone(),
3472				handler: NotifsHandler::new(peer, vec![], None),
3473				remaining_established: 0usize,
3474			},
3475		));
3476
3477		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3478
3479		notif.peerset_report_disconnect(peer, set_id);
3480		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3481	}
3482
3483	#[test]
3484	fn peer_is_backed_off_if_both_connections_get_closed_while_peer_is_disabled_with_back_off() {
3485		let (mut notif, _controller, _notif_service) = development_notifs();
3486		let set_id = SetId::from(0);
3487		let peer = PeerId::random();
3488		let conn1 = ConnectionId::new_unchecked(0);
3489		let conn2 = ConnectionId::new_unchecked(1);
3490		let connected = ConnectedPoint::Listener {
3491			local_addr: Multiaddr::empty(),
3492			send_back_addr: Multiaddr::empty(),
3493		};
3494
3495		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3496			libp2p::swarm::behaviour::ConnectionEstablished {
3497				peer_id: peer,
3498				connection_id: conn1,
3499				endpoint: &connected,
3500				failed_addresses: &[],
3501				other_established: 0usize,
3502			},
3503		));
3504		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3505			libp2p::swarm::behaviour::ConnectionEstablished {
3506				peer_id: peer,
3507				connection_id: conn2,
3508				endpoint: &connected,
3509				failed_addresses: &[],
3510				other_established: 0usize,
3511			},
3512		));
3513		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3514
3515		// manually add backoff for the entry
3516		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3517			notif.peers.get_mut(&(peer, set_id))
3518		{
3519			*backoff_until =
3520				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
3521		}
3522
3523		// switch state to `DisabledPendingEnable`
3524		notif.peerset_report_connect(peer, set_id);
3525		assert!(std::matches!(
3526			notif.peers.get(&(peer, set_id)),
3527			Some(&PeerState::DisabledPendingEnable { .. })
3528		));
3529
3530		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3531			libp2p::swarm::behaviour::ConnectionClosed {
3532				peer_id: peer,
3533				connection_id: conn1,
3534				endpoint: &connected.clone(),
3535				handler: NotifsHandler::new(peer, vec![], None),
3536				remaining_established: 0usize,
3537			},
3538		));
3539		assert!(std::matches!(
3540			notif.peers.get(&(peer, set_id)),
3541			Some(&PeerState::DisabledPendingEnable { .. })
3542		));
3543
3544		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3545			libp2p::swarm::behaviour::ConnectionClosed {
3546				peer_id: peer,
3547				connection_id: conn2,
3548				endpoint: &connected.clone(),
3549				handler: NotifsHandler::new(peer, vec![], None),
3550				remaining_established: 0usize,
3551			},
3552		));
3553		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3554	}
3555
3556	#[test]
3557	fn inject_connection_closed_incoming_with_backoff() {
3558		let (mut notif, _controller, _notif_service) = development_notifs();
3559		let peer = PeerId::random();
3560		let set_id = SetId::from(0);
3561		let conn = ConnectionId::new_unchecked(0);
3562		let connected = ConnectedPoint::Listener {
3563			local_addr: Multiaddr::empty(),
3564			send_back_addr: Multiaddr::empty(),
3565		};
3566
3567		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3568			libp2p::swarm::behaviour::ConnectionEstablished {
3569				peer_id: peer,
3570				connection_id: conn,
3571				endpoint: &connected,
3572				failed_addresses: &[],
3573				other_established: 0usize,
3574			},
3575		));
3576		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3577
3578		// remote opens a substream, verify that peer state is updated to `Incoming`
3579		notif.on_connection_handler_event(
3580			peer,
3581			conn,
3582			NotifsHandlerOut::OpenDesiredByRemote {
3583				protocol_index: 0,
3584				handshake: vec![1, 3, 3, 7],
3585			},
3586		);
3587
3588		// manually add backoff for the entry
3589		if let Some(&mut PeerState::Incoming { ref mut backoff_until, .. }) =
3590			notif.peers.get_mut(&(peer, 0.into()))
3591		{
3592			*backoff_until =
3593				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
3594		} else {
3595			panic!("invalid state");
3596		}
3597
3598		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3599			libp2p::swarm::behaviour::ConnectionClosed {
3600				peer_id: peer,
3601				connection_id: conn,
3602				endpoint: &connected.clone(),
3603				handler: NotifsHandler::new(peer, vec![], None),
3604				remaining_established: 0usize,
3605			},
3606		));
3607		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3608	}
3609
3610	#[test]
3611	fn two_connections_inactive_connection_gets_closed_peer_state_is_still_incoming() {
3612		let (mut notif, _controller, _notif_service) = development_notifs();
3613		let peer = PeerId::random();
3614		let conn1 = ConnectionId::new_unchecked(0);
3615		let conn2 = ConnectionId::new_unchecked(1);
3616		let set_id = SetId::from(0);
3617		let connected = ConnectedPoint::Listener {
3618			local_addr: Multiaddr::empty(),
3619			send_back_addr: Multiaddr::empty(),
3620		};
3621
3622		// open two connections
3623		for conn_id in vec![conn1, conn2] {
3624			notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3625				libp2p::swarm::behaviour::ConnectionEstablished {
3626					peer_id: peer,
3627					connection_id: conn_id,
3628					endpoint: &connected,
3629					failed_addresses: &[],
3630					other_established: 0usize,
3631				},
3632			));
3633		}
3634
3635		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3636			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3637			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3638		} else {
3639			panic!("invalid state");
3640		}
3641
3642		// remote opens a substream, verify that peer state is updated to `Incoming`
3643		notif.on_connection_handler_event(
3644			peer,
3645			conn1,
3646			NotifsHandlerOut::OpenDesiredByRemote {
3647				protocol_index: 0,
3648				handshake: vec![1, 3, 3, 7],
3649			},
3650		);
3651		assert!(std::matches!(
3652			notif.peers.get_mut(&(peer, 0.into())),
3653			Some(&mut PeerState::Incoming { .. })
3654		));
3655
3656		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3657			libp2p::swarm::behaviour::ConnectionClosed {
3658				peer_id: peer,
3659				connection_id: conn2,
3660				endpoint: &connected.clone(),
3661				handler: NotifsHandler::new(peer, vec![], None),
3662				remaining_established: 0usize,
3663			},
3664		));
3665		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3666	}
3667
3668	#[test]
3669	fn two_connections_active_connection_gets_closed_peer_state_is_disabled() {
3670		let (mut notif, _controller, _notif_service) = development_notifs();
3671		let peer = PeerId::random();
3672		let conn1 = ConnectionId::new_unchecked(0);
3673		let conn2 = ConnectionId::new_unchecked(1);
3674		let set_id = SetId::from(0);
3675		let connected = ConnectedPoint::Listener {
3676			local_addr: Multiaddr::empty(),
3677			send_back_addr: Multiaddr::empty(),
3678		};
3679
3680		// open two connections
3681		for conn_id in vec![conn1, conn2] {
3682			notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3683				libp2p::swarm::behaviour::ConnectionEstablished {
3684					peer_id: peer,
3685					connection_id: conn_id,
3686					endpoint: &ConnectedPoint::Listener {
3687						local_addr: Multiaddr::empty(),
3688						send_back_addr: Multiaddr::empty(),
3689					},
3690					failed_addresses: &[],
3691					other_established: 0usize,
3692				},
3693			));
3694		}
3695
3696		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3697			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3698			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3699		} else {
3700			panic!("invalid state");
3701		}
3702
3703		// remote opens a substream, verify that peer state is updated to `Incoming`
3704		notif.on_connection_handler_event(
3705			peer,
3706			conn1,
3707			NotifsHandlerOut::OpenDesiredByRemote {
3708				protocol_index: 0,
3709				handshake: vec![1, 3, 3, 7],
3710			},
3711		);
3712		assert!(std::matches!(
3713			notif.peers.get_mut(&(peer, 0.into())),
3714			Some(PeerState::Incoming { .. })
3715		));
3716
3717		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3718			libp2p::swarm::behaviour::ConnectionClosed {
3719				peer_id: peer,
3720				connection_id: conn1,
3721				endpoint: &connected.clone(),
3722				handler: NotifsHandler::new(peer, vec![], None),
3723				remaining_established: 0usize,
3724			},
3725		));
3726		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3727	}
3728
3729	#[test]
3730	fn inject_connection_closed_for_active_connection() {
3731		let (mut notif, _controller, _notif_service) = development_notifs();
3732		let peer = PeerId::random();
3733		let conn1 = ConnectionId::new_unchecked(0);
3734		let conn2 = ConnectionId::new_unchecked(1);
3735		let set_id = SetId::from(0);
3736		let connected = ConnectedPoint::Listener {
3737			local_addr: Multiaddr::empty(),
3738			send_back_addr: Multiaddr::empty(),
3739		};
3740		let mut conn_yielder = ConnectionYielder::new();
3741
3742		// open two connections
3743		for conn_id in vec![conn1, conn2] {
3744			notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3745				libp2p::swarm::behaviour::ConnectionEstablished {
3746					peer_id: peer,
3747					connection_id: conn_id,
3748					endpoint: &connected,
3749					failed_addresses: &[],
3750					other_established: 0usize,
3751				},
3752			));
3753		}
3754
3755		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3756			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3757			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3758		} else {
3759			panic!("invalid state");
3760		}
3761
3762		// open substreams on both active connections
3763		notif.peerset_report_connect(peer, set_id);
3764
3765		if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3766			assert_eq!(connections[0], (conn1, ConnectionState::Opening));
3767			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3768		} else {
3769			panic!("invalid state");
3770		}
3771
3772		notif.on_connection_handler_event(
3773			peer,
3774			conn1,
3775			conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3776		);
3777
3778		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3779			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3780			assert_eq!(connections[0].0, conn1);
3781			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3782		} else {
3783			panic!("invalid state");
3784		}
3785
3786		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3787			libp2p::swarm::behaviour::ConnectionClosed {
3788				peer_id: peer,
3789				connection_id: conn1,
3790				endpoint: &connected.clone(),
3791				handler: NotifsHandler::new(peer, vec![], None),
3792				remaining_established: 0usize,
3793			},
3794		));
3795	}
3796
3797	#[test]
3798	fn inject_dial_failure_for_pending_request() {
3799		let (mut notif, _controller, _notif_service) = development_notifs();
3800		let set_id = SetId::from(0);
3801		let peer = PeerId::random();
3802		let conn = ConnectionId::new_unchecked(0);
3803		let connected = ConnectedPoint::Listener {
3804			local_addr: Multiaddr::empty(),
3805			send_back_addr: Multiaddr::empty(),
3806		};
3807
3808		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3809			libp2p::swarm::behaviour::ConnectionEstablished {
3810				peer_id: peer,
3811				connection_id: conn,
3812				endpoint: &connected,
3813				failed_addresses: &[],
3814				other_established: 0usize,
3815			},
3816		));
3817		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3818
3819		// manually add backoff for the entry
3820		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3821			notif.peers.get_mut(&(peer, set_id))
3822		{
3823			*backoff_until =
3824				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
3825		}
3826
3827		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3828			libp2p::swarm::behaviour::ConnectionClosed {
3829				peer_id: peer,
3830				connection_id: conn,
3831				endpoint: &connected.clone(),
3832				handler: NotifsHandler::new(peer, vec![], None),
3833				remaining_established: 0usize,
3834			},
3835		));
3836
3837		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3838
3839		// attempt to connect the backed-off peer and verify that the request is pending
3840		notif.peerset_report_connect(peer, set_id);
3841		assert!(std::matches!(
3842			notif.peers.get(&(peer, set_id)),
3843			Some(&PeerState::PendingRequest { .. })
3844		));
3845
3846		let now = Instant::now();
3847		notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
3848			peer_id: Some(peer),
3849			error: &libp2p::swarm::DialError::Aborted,
3850			connection_id: ConnectionId::new_unchecked(0),
3851		}));
3852
3853		if let Some(PeerState::PendingRequest { ref timer_deadline, .. }) =
3854			notif.peers.get(&(peer, set_id))
3855		{
3856			assert!(timer_deadline > &(now + std::time::Duration::from_secs(5)));
3857		}
3858	}
3859
3860	#[test]
3861	fn peerstate_incoming_open_desired_by_remote() {
3862		let (mut notif, _controller, _notif_service) = development_notifs();
3863		let peer = PeerId::random();
3864		let set_id = SetId::from(0);
3865		let conn1 = ConnectionId::new_unchecked(0);
3866		let conn2 = ConnectionId::new_unchecked(1);
3867		let connected = ConnectedPoint::Listener {
3868			local_addr: Multiaddr::empty(),
3869			send_back_addr: Multiaddr::empty(),
3870		};
3871
3872		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3873			libp2p::swarm::behaviour::ConnectionEstablished {
3874				peer_id: peer,
3875				connection_id: conn1,
3876				endpoint: &connected,
3877				failed_addresses: &[],
3878				other_established: 0usize,
3879			},
3880		));
3881		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3882			libp2p::swarm::behaviour::ConnectionEstablished {
3883				peer_id: peer,
3884				connection_id: conn2,
3885				endpoint: &connected,
3886				failed_addresses: &[],
3887				other_established: 0usize,
3888			},
3889		));
3890		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3891
3892		// remote opens a substream, verify that peer state is updated to `Incoming`
3893		notif.on_connection_handler_event(
3894			peer,
3895			conn1,
3896			NotifsHandlerOut::OpenDesiredByRemote {
3897				protocol_index: 0,
3898				handshake: vec![1, 3, 3, 7],
3899			},
3900		);
3901		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3902
3903		// add another open event from remote
3904		notif.on_connection_handler_event(
3905			peer,
3906			conn2,
3907			NotifsHandlerOut::OpenDesiredByRemote {
3908				protocol_index: 0,
3909				handshake: vec![1, 3, 3, 7],
3910			},
3911		);
3912
3913		if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id))
3914		{
3915			assert_eq!(connections[0], (conn1, ConnectionState::OpenDesiredByRemote));
3916			assert_eq!(connections[1], (conn2, ConnectionState::OpenDesiredByRemote));
3917		}
3918	}
3919
3920	#[tokio::test]
3921	async fn remove_backoff_peer_after_timeout() {
3922		let (mut notif, _controller, _notif_service) = development_notifs();
3923		let peer = PeerId::random();
3924		let set_id = SetId::from(0);
3925		let conn = ConnectionId::new_unchecked(0);
3926		let connected = ConnectedPoint::Listener {
3927			local_addr: Multiaddr::empty(),
3928			send_back_addr: Multiaddr::empty(),
3929		};
3930
3931		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3932			libp2p::swarm::behaviour::ConnectionEstablished {
3933				peer_id: peer,
3934				connection_id: conn,
3935				endpoint: &connected,
3936				failed_addresses: &[],
3937				other_established: 0usize,
3938			},
3939		));
3940
3941		if let Some(&mut PeerState::Disabled { ref mut backoff_until, .. }) =
3942			notif.peers.get_mut(&(peer, 0.into()))
3943		{
3944			*backoff_until =
3945				Some(Instant::now().checked_add(std::time::Duration::from_millis(100)).unwrap());
3946		} else {
3947			panic!("invalid state");
3948		}
3949
3950		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3951			libp2p::swarm::behaviour::ConnectionClosed {
3952				peer_id: peer,
3953				connection_id: conn,
3954				endpoint: &connected.clone(),
3955				handler: NotifsHandler::new(peer, vec![], None),
3956				remaining_established: 0usize,
3957			},
3958		));
3959
3960		let until = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
3961			notif.peers.get(&(peer, set_id))
3962		{
3963			timer_deadline
3964		} else {
3965			panic!("invalid state");
3966		};
3967
3968		if until > Instant::now() {
3969			std::thread::sleep(until - Instant::now());
3970		}
3971
3972		assert!(notif.peers.get(&(peer, set_id)).is_some());
3973
3974		if tokio::time::timeout(Duration::from_secs(5), async {
3975			let mut params = MockPollParams {};
3976
3977			loop {
3978				futures::future::poll_fn(|cx| {
3979					let _ = notif.poll(cx, &mut params);
3980					Poll::Ready(())
3981				})
3982				.await;
3983
3984				if notif.peers.get(&(peer, set_id)).is_none() {
3985					break
3986				}
3987			}
3988		})
3989		.await
3990		.is_err()
3991		{
3992			panic!("backoff peer was not removed in time");
3993		}
3994
3995		assert!(notif.peers.get(&(peer, set_id)).is_none());
3996	}
3997
3998	#[tokio::test]
3999	async fn reschedule_disabled_pending_enable_when_connection_not_closed() {
4000		let (mut notif, _controller, _notif_service) = development_notifs();
4001		let peer = PeerId::random();
4002		let conn = ConnectionId::new_unchecked(0);
4003		let set_id = SetId::from(0);
4004		let mut conn_yielder = ConnectionYielder::new();
4005
4006		// move the peer to `Enabled` state
4007		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4008			libp2p::swarm::behaviour::ConnectionEstablished {
4009				peer_id: peer,
4010				connection_id: conn,
4011				endpoint: &ConnectedPoint::Listener {
4012					local_addr: Multiaddr::empty(),
4013					send_back_addr: Multiaddr::empty(),
4014				},
4015				failed_addresses: &[],
4016				other_established: 0usize,
4017			},
4018		));
4019		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4020
4021		// open substream
4022		notif.on_connection_handler_event(
4023			peer,
4024			conn,
4025			NotifsHandlerOut::OpenDesiredByRemote {
4026				protocol_index: 0,
4027				handshake: vec![1, 3, 3, 7],
4028			},
4029		);
4030		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4031
4032		// we rely on the implementation detail that incoming indices are counted from 0
4033		// to not mock the `Peerset`
4034		notif.protocol_report_accept(IncomingIndex(0));
4035		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4036
4037		let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4038
4039		notif.on_connection_handler_event(peer, conn, event);
4040		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4041
4042		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
4043			assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
4044			assert_eq!(connections[0].0, conn);
4045		} else {
4046			panic!("invalid state");
4047		}
4048
4049		notif.peerset_report_disconnect(peer, set_id);
4050
4051		if let Some(PeerState::Disabled { ref connections, ref mut backoff_until }) =
4052			notif.peers.get_mut(&(peer, set_id))
4053		{
4054			assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4055			assert_eq!(connections[0].0, conn);
4056
4057			*backoff_until =
4058				Some(Instant::now().checked_add(std::time::Duration::from_secs(2)).unwrap());
4059		} else {
4060			panic!("invalid state");
4061		}
4062
4063		notif.peerset_report_connect(peer, set_id);
4064
4065		let prev_instant =
4066			if let Some(PeerState::DisabledPendingEnable {
4067				ref connections, timer_deadline, ..
4068			}) = notif.peers.get(&(peer, set_id))
4069			{
4070				assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4071				assert_eq!(connections[0].0, conn);
4072
4073				*timer_deadline
4074			} else {
4075				panic!("invalid state");
4076			};
4077
4078		// one of the peers has an active backoff timer so poll the notifications code until
4079		// the timer has expired. Because the connection is still in the state of `Closing`,
4080		// verify that the code continues to keep the peer disabled by resetting the timer
4081		// after the first one expired.
4082		if tokio::time::timeout(Duration::from_secs(5), async {
4083			let mut params = MockPollParams {};
4084
4085			loop {
4086				futures::future::poll_fn(|cx| {
4087					let _ = notif.poll(cx, &mut params);
4088					Poll::Ready(())
4089				})
4090				.await;
4091
4092				if let Some(PeerState::DisabledPendingEnable {
4093					timer_deadline, connections, ..
4094				}) = notif.peers.get(&(peer, set_id))
4095				{
4096					assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4097
4098					if timer_deadline != &prev_instant {
4099						break
4100					}
4101				} else {
4102					panic!("invalid state");
4103				}
4104			}
4105		})
4106		.await
4107		.is_err()
4108		{
4109			panic!("backoff peer was not removed in time");
4110		}
4111	}
4112
4113	#[test]
4114	#[should_panic]
4115	#[cfg(debug_assertions)]
4116	fn peerset_report_connect_with_enabled_peer() {
4117		let (mut notif, _controller, _notif_service) = development_notifs();
4118		let peer = PeerId::random();
4119		let conn = ConnectionId::new_unchecked(0);
4120		let set_id = SetId::from(0);
4121		let connected = ConnectedPoint::Listener {
4122			local_addr: Multiaddr::empty(),
4123			send_back_addr: Multiaddr::empty(),
4124		};
4125		let mut conn_yielder = ConnectionYielder::new();
4126
4127		// move the peer to `Enabled` state
4128		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4129			libp2p::swarm::behaviour::ConnectionEstablished {
4130				peer_id: peer,
4131				connection_id: conn,
4132				endpoint: &connected,
4133				failed_addresses: &[],
4134				other_established: 0usize,
4135			},
4136		));
4137		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4138
4139		notif.on_connection_handler_event(
4140			peer,
4141			conn,
4142			NotifsHandlerOut::OpenDesiredByRemote {
4143				protocol_index: 0,
4144				handshake: vec![1, 3, 3, 7],
4145			},
4146		);
4147		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4148
4149		notif.peerset_report_connect(peer, set_id);
4150		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4151
4152		let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4153
4154		notif.on_connection_handler_event(peer, conn, event);
4155		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4156
4157		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
4158			assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
4159			assert_eq!(connections[0].0, conn);
4160		} else {
4161			panic!("invalid state");
4162		}
4163
4164		notif.peerset_report_connect(peer, set_id);
4165	}
4166
4167	#[test]
4168	#[cfg(debug_assertions)]
4169	fn peerset_report_connect_with_disabled_pending_enable_peer() {
4170		let (mut notif, _controller, _notif_service) = development_notifs();
4171		let set_id = SetId::from(0);
4172		let peer = PeerId::random();
4173		let conn = ConnectionId::new_unchecked(0);
4174		let connected = ConnectedPoint::Listener {
4175			local_addr: Multiaddr::empty(),
4176			send_back_addr: Multiaddr::empty(),
4177		};
4178
4179		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4180			libp2p::swarm::behaviour::ConnectionEstablished {
4181				peer_id: peer,
4182				connection_id: conn,
4183				endpoint: &connected,
4184				failed_addresses: &[],
4185				other_established: 0usize,
4186			},
4187		));
4188		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4189
4190		// manually add backoff for the entry
4191		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
4192			notif.peers.get_mut(&(peer, set_id))
4193		{
4194			*backoff_until =
4195				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
4196		}
4197
4198		// switch state to `DisabledPendingEnable`
4199		notif.peerset_report_connect(peer, set_id);
4200		assert!(std::matches!(
4201			notif.peers.get(&(peer, set_id)),
4202			Some(&PeerState::DisabledPendingEnable { .. })
4203		));
4204
4205		// duplicate "connect" must not change the state
4206		notif.peerset_report_connect(peer, set_id);
4207		assert!(std::matches!(
4208			notif.peers.get(&(peer, set_id)),
4209			Some(&PeerState::DisabledPendingEnable { .. })
4210		));
4211	}
4212
4213	#[test]
4214	#[cfg(debug_assertions)]
4215	fn peerset_report_connect_with_requested_peer() {
4216		let (mut notif, _controller, _notif_service) = development_notifs();
4217		let peer = PeerId::random();
4218		let set_id = SetId::from(0);
4219
4220		// Set peer into `Requested` state.
4221		notif.peerset_report_connect(peer, set_id);
4222		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
4223
4224		// Duplicate "connect" must not change the state.
4225		notif.peerset_report_connect(peer, set_id);
4226		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
4227	}
4228
4229	#[test]
4230	#[cfg(debug_assertions)]
4231	fn peerset_report_connect_with_pending_requested() {
4232		let (mut notif, _controller, _notif_service) = development_notifs();
4233		let set_id = SetId::from(0);
4234		let peer = PeerId::random();
4235		let conn = ConnectionId::new_unchecked(0);
4236		let connected = ConnectedPoint::Listener {
4237			local_addr: Multiaddr::empty(),
4238			send_back_addr: Multiaddr::empty(),
4239		};
4240
4241		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4242			libp2p::swarm::behaviour::ConnectionEstablished {
4243				peer_id: peer,
4244				connection_id: conn,
4245				endpoint: &connected,
4246				failed_addresses: &[],
4247				other_established: 0usize,
4248			},
4249		));
4250		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4251
4252		// manually add backoff for the entry
4253		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
4254			notif.peers.get_mut(&(peer, set_id))
4255		{
4256			*backoff_until =
4257				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
4258		}
4259
4260		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4261			libp2p::swarm::behaviour::ConnectionClosed {
4262				peer_id: peer,
4263				connection_id: conn,
4264				endpoint: &connected.clone(),
4265				handler: NotifsHandler::new(peer, vec![], None),
4266				remaining_established: 0usize,
4267			},
4268		));
4269		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
4270
4271		// attempt to connect the backed-off peer and verify that the request is pending
4272		notif.peerset_report_connect(peer, set_id);
4273		assert!(std::matches!(
4274			notif.peers.get(&(peer, set_id)),
4275			Some(&PeerState::PendingRequest { .. })
4276		));
4277
4278		// duplicate "connect" must not change the state
4279		notif.peerset_report_connect(peer, set_id);
4280		assert!(std::matches!(
4281			notif.peers.get(&(peer, set_id)),
4282			Some(&PeerState::PendingRequest { .. })
4283		));
4284	}
4285
4286	#[test]
4287	#[cfg(debug_assertions)]
4288	fn peerset_report_connect_with_incoming_peer() {
4289		let (mut notif, _controller, _notif_service) = development_notifs();
4290		let peer = PeerId::random();
4291		let set_id = SetId::from(0);
4292		let conn = ConnectionId::new_unchecked(0);
4293		let connected = ConnectedPoint::Listener {
4294			local_addr: Multiaddr::empty(),
4295			send_back_addr: Multiaddr::empty(),
4296		};
4297
4298		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4299			libp2p::swarm::behaviour::ConnectionEstablished {
4300				peer_id: peer,
4301				connection_id: conn,
4302				endpoint: &connected,
4303				failed_addresses: &[],
4304				other_established: 0usize,
4305			},
4306		));
4307		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4308
4309		// remote opens a substream, verify that peer state is updated to `Incoming`
4310		notif.on_connection_handler_event(
4311			peer,
4312			conn,
4313			NotifsHandlerOut::OpenDesiredByRemote {
4314				protocol_index: 0,
4315				handshake: vec![1, 3, 3, 7],
4316			},
4317		);
4318		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4319
4320		notif.peerset_report_connect(peer, set_id);
4321		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4322	}
4323
4324	#[test]
4325	#[cfg(debug_assertions)]
4326	fn peerset_report_disconnect_with_incoming_peer() {
4327		let (mut notif, _controller, _notif_service) = development_notifs();
4328		let peer = PeerId::random();
4329		let set_id = SetId::from(0);
4330		let conn = ConnectionId::new_unchecked(0);
4331		let connected = ConnectedPoint::Listener {
4332			local_addr: Multiaddr::empty(),
4333			send_back_addr: Multiaddr::empty(),
4334		};
4335
4336		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4337			libp2p::swarm::behaviour::ConnectionEstablished {
4338				peer_id: peer,
4339				connection_id: conn,
4340				endpoint: &connected,
4341				failed_addresses: &[],
4342				other_established: 0usize,
4343			},
4344		));
4345		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4346
4347		// remote opens a substream, verify that peer state is updated to `Incoming`
4348		notif.on_connection_handler_event(
4349			peer,
4350			conn,
4351			NotifsHandlerOut::OpenDesiredByRemote {
4352				protocol_index: 0,
4353				handshake: vec![1, 3, 3, 7],
4354			},
4355		);
4356		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4357
4358		notif.peerset_report_disconnect(peer, set_id);
4359		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4360	}
4361
4362	#[test]
4363	#[cfg(debug_assertions)]
4364	fn peerset_report_disconnect_with_incoming_peer_protocol_accepts() {
4365		let (mut notif, _controller, _notif_service) = development_notifs();
4366		let peer = PeerId::random();
4367		let set_id = SetId::from(0);
4368		let conn = ConnectionId::new_unchecked(0);
4369		let connected = ConnectedPoint::Listener {
4370			local_addr: Multiaddr::empty(),
4371			send_back_addr: Multiaddr::empty(),
4372		};
4373
4374		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4375			libp2p::swarm::behaviour::ConnectionEstablished {
4376				peer_id: peer,
4377				connection_id: conn,
4378				endpoint: &connected,
4379				failed_addresses: &[],
4380				other_established: 0usize,
4381			},
4382		));
4383		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4384
4385		// remote opens a substream, verify that peer state is updated to `Incoming`
4386		notif.on_connection_handler_event(
4387			peer,
4388			conn,
4389			NotifsHandlerOut::OpenDesiredByRemote {
4390				protocol_index: 0,
4391				handshake: vec![1, 3, 3, 7],
4392			},
4393		);
4394		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4395
4396		// `Peerset` wants to disconnect the peer but since it's still under validation,
4397		// it won't be disabled automatically
4398		notif.peerset_report_disconnect(peer, set_id);
4399
4400		let incoming_index = match notif.peers.get(&(peer, set_id)) {
4401			Some(&PeerState::Incoming { peerset_rejected, incoming_index, .. }) => {
4402				assert!(peerset_rejected);
4403				incoming_index
4404			},
4405			state => panic!("invalid state: {state:?}"),
4406		};
4407
4408		// protocol accepted peer but since `Peerset` wanted to disconnect it, the peer will be
4409		// disabled
4410		notif.protocol_report_accept(incoming_index);
4411
4412		match notif.peers.get(&(peer, set_id)) {
4413			Some(&PeerState::Disabled { .. }) => {},
4414			state => panic!("invalid state: {state:?}"),
4415		};
4416	}
4417
4418	#[test]
4419	#[cfg(debug_assertions)]
4420	fn peer_disconnected_protocol_accepts() {
4421		let (mut notif, _controller, _notif_service) = development_notifs();
4422		let peer = PeerId::random();
4423		let set_id = SetId::from(0);
4424		let conn = ConnectionId::new_unchecked(0);
4425		let connected = ConnectedPoint::Listener {
4426			local_addr: Multiaddr::empty(),
4427			send_back_addr: Multiaddr::empty(),
4428		};
4429
4430		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4431			libp2p::swarm::behaviour::ConnectionEstablished {
4432				peer_id: peer,
4433				connection_id: conn,
4434				endpoint: &connected,
4435				failed_addresses: &[],
4436				other_established: 0usize,
4437			},
4438		));
4439		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4440
4441		// remote opens a substream, verify that peer state is updated to `Incoming`
4442		notif.on_connection_handler_event(
4443			peer,
4444			conn,
4445			NotifsHandlerOut::OpenDesiredByRemote {
4446				protocol_index: 0,
4447				handshake: vec![1, 3, 3, 7],
4448			},
4449		);
4450		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4451
4452		assert!(notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4453		notif.disconnect_peer(&peer, set_id);
4454
4455		// since the connection was closed, nothing happens for the peer state because
4456		// there is nothing actionable
4457		notif.protocol_report_accept(IncomingIndex(0));
4458
4459		match notif.peers.get(&(peer, set_id)) {
4460			Some(&PeerState::Disabled { .. }) => {},
4461			state => panic!("invalid state: {state:?}"),
4462		};
4463
4464		assert!(!notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4465	}
4466
4467	#[test]
4468	#[cfg(debug_assertions)]
4469	fn connection_closed_protocol_accepts() {
4470		let (mut notif, _controller, _notif_service) = development_notifs();
4471		let peer = PeerId::random();
4472		let set_id = SetId::from(0);
4473		let conn = ConnectionId::new_unchecked(0);
4474		let connected = ConnectedPoint::Listener {
4475			local_addr: Multiaddr::empty(),
4476			send_back_addr: Multiaddr::empty(),
4477		};
4478
4479		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4480			libp2p::swarm::behaviour::ConnectionEstablished {
4481				peer_id: peer,
4482				connection_id: conn,
4483				endpoint: &connected,
4484				failed_addresses: &[],
4485				other_established: 0usize,
4486			},
4487		));
4488		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4489
4490		// remote opens a substream, verify that peer state is updated to `Incoming`
4491		notif.on_connection_handler_event(
4492			peer,
4493			conn,
4494			NotifsHandlerOut::OpenDesiredByRemote {
4495				protocol_index: 0,
4496				handshake: vec![1, 3, 3, 7],
4497			},
4498		);
4499		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4500
4501		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4502			libp2p::swarm::behaviour::ConnectionClosed {
4503				peer_id: peer,
4504				connection_id: ConnectionId::new_unchecked(0),
4505				endpoint: &connected.clone(),
4506				handler: NotifsHandler::new(peer, vec![], None),
4507				remaining_established: 0usize,
4508			},
4509		));
4510
4511		// connection closed, nothing to do
4512		notif.protocol_report_accept(IncomingIndex(0));
4513
4514		match notif.peers.get(&(peer, set_id)) {
4515			None => {},
4516			state => panic!("invalid state: {state:?}"),
4517		};
4518	}
4519
4520	#[test]
4521	#[cfg(debug_assertions)]
4522	fn peer_disconnected_protocol_reject() {
4523		let (mut notif, _controller, _notif_service) = development_notifs();
4524		let peer = PeerId::random();
4525		let set_id = SetId::from(0);
4526		let conn = ConnectionId::new_unchecked(0);
4527		let connected = ConnectedPoint::Listener {
4528			local_addr: Multiaddr::empty(),
4529			send_back_addr: Multiaddr::empty(),
4530		};
4531
4532		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4533			libp2p::swarm::behaviour::ConnectionEstablished {
4534				peer_id: peer,
4535				connection_id: conn,
4536				endpoint: &connected,
4537				failed_addresses: &[],
4538				other_established: 0usize,
4539			},
4540		));
4541		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4542
4543		// remote opens a substream, verify that peer state is updated to `Incoming`
4544		notif.on_connection_handler_event(
4545			peer,
4546			conn,
4547			NotifsHandlerOut::OpenDesiredByRemote {
4548				protocol_index: 0,
4549				handshake: vec![1, 3, 3, 7],
4550			},
4551		);
4552		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4553
4554		assert!(notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4555		notif.disconnect_peer(&peer, set_id);
4556
4557		// since the connection was closed, nothing happens for the peer state because
4558		// there is nothing actionable
4559		notif.protocol_report_reject(IncomingIndex(0));
4560
4561		match notif.peers.get(&(peer, set_id)) {
4562			Some(&PeerState::Disabled { .. }) => {},
4563			state => panic!("invalid state: {state:?}"),
4564		};
4565
4566		assert!(!notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4567	}
4568
4569	#[test]
4570	#[cfg(debug_assertions)]
4571	fn connection_closed_protocol_rejects() {
4572		let (mut notif, _controller, _notif_service) = development_notifs();
4573		let peer = PeerId::random();
4574		let set_id = SetId::from(0);
4575		let conn = ConnectionId::new_unchecked(0);
4576		let connected = ConnectedPoint::Listener {
4577			local_addr: Multiaddr::empty(),
4578			send_back_addr: Multiaddr::empty(),
4579		};
4580
4581		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4582			libp2p::swarm::behaviour::ConnectionEstablished {
4583				peer_id: peer,
4584				connection_id: conn,
4585				endpoint: &connected,
4586				failed_addresses: &[],
4587				other_established: 0usize,
4588			},
4589		));
4590		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4591
4592		// remote opens a substream, verify that peer state is updated to `Incoming`
4593		notif.on_connection_handler_event(
4594			peer,
4595			conn,
4596			NotifsHandlerOut::OpenDesiredByRemote {
4597				protocol_index: 0,
4598				handshake: vec![1, 3, 3, 7],
4599			},
4600		);
4601		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4602
4603		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4604			libp2p::swarm::behaviour::ConnectionClosed {
4605				peer_id: peer,
4606				connection_id: ConnectionId::new_unchecked(0),
4607				endpoint: &connected.clone(),
4608				handler: NotifsHandler::new(peer, vec![], None),
4609				remaining_established: 0usize,
4610			},
4611		));
4612
4613		// connection closed, nothing to do
4614		notif.protocol_report_reject(IncomingIndex(0));
4615
4616		match notif.peers.get(&(peer, set_id)) {
4617			None => {},
4618			state => panic!("invalid state: {state:?}"),
4619		};
4620	}
4621
4622	#[test]
4623	#[should_panic]
4624	#[cfg(debug_assertions)]
4625	fn protocol_report_accept_not_incoming_peer() {
4626		let (mut notif, _controller, _notif_service) = development_notifs();
4627		let peer = PeerId::random();
4628		let conn = ConnectionId::new_unchecked(0);
4629		let set_id = SetId::from(0);
4630		let connected = ConnectedPoint::Listener {
4631			local_addr: Multiaddr::empty(),
4632			send_back_addr: Multiaddr::empty(),
4633		};
4634		let mut conn_yielder = ConnectionYielder::new();
4635
4636		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4637			libp2p::swarm::behaviour::ConnectionEstablished {
4638				peer_id: peer,
4639				connection_id: conn,
4640				endpoint: &connected,
4641				failed_addresses: &[],
4642				other_established: 0usize,
4643			},
4644		));
4645		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4646
4647		// remote opens a substream, verify that peer state is updated to `Incoming`
4648		notif.on_connection_handler_event(
4649			peer,
4650			conn,
4651			NotifsHandlerOut::OpenDesiredByRemote {
4652				protocol_index: 0,
4653				handshake: vec![1, 3, 3, 7],
4654			},
4655		);
4656		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4657
4658		assert!(std::matches!(
4659			notif.incoming[0],
4660			IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
4661		));
4662
4663		notif.peerset_report_connect(peer, set_id);
4664		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4665
4666		let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4667		notif.on_connection_handler_event(peer, conn, event);
4668
4669		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4670		notif.incoming[0].alive = true;
4671		notif.protocol_report_accept(IncomingIndex(0));
4672	}
4673
4674	#[test]
4675	#[should_panic]
4676	#[cfg(debug_assertions)]
4677	fn inject_connection_closed_non_existent_peer() {
4678		let (mut notif, _controller, _notif_service) = development_notifs();
4679		let peer = PeerId::random();
4680		let endpoint = ConnectedPoint::Listener {
4681			local_addr: Multiaddr::empty(),
4682			send_back_addr: Multiaddr::empty(),
4683		};
4684
4685		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4686			libp2p::swarm::behaviour::ConnectionClosed {
4687				peer_id: peer,
4688				connection_id: ConnectionId::new_unchecked(0),
4689				endpoint: &endpoint.clone(),
4690				handler: NotifsHandler::new(peer, vec![], None),
4691				remaining_established: 0usize,
4692			},
4693		));
4694	}
4695
4696	#[test]
4697	fn disconnect_non_existent_peer() {
4698		let (mut notif, _controller, _notif_service) = development_notifs();
4699		let peer = PeerId::random();
4700		let set_id = SetId::from(0);
4701
4702		notif.peerset_report_disconnect(peer, set_id);
4703
4704		assert!(notif.peers.is_empty());
4705		assert!(notif.incoming.is_empty());
4706	}
4707
4708	#[test]
4709	fn accept_non_existent_connection() {
4710		let (mut notif, _controller, _notif_service) = development_notifs();
4711
4712		notif.protocol_report_accept(0.into());
4713
4714		assert!(notif.peers.is_empty());
4715		assert!(notif.incoming.is_empty());
4716	}
4717
4718	#[test]
4719	fn reject_non_existent_connection() {
4720		let (mut notif, _controller, _notif_service) = development_notifs();
4721
4722		notif.protocol_report_reject(0.into());
4723
4724		assert!(notif.peers.is_empty());
4725		assert!(notif.incoming.is_empty());
4726	}
4727
4728	#[test]
4729	fn reject_non_active_connection() {
4730		let (mut notif, _controller, _notif_service) = development_notifs();
4731		let peer = PeerId::random();
4732		let conn = ConnectionId::new_unchecked(0);
4733		let set_id = SetId::from(0);
4734		let connected = ConnectedPoint::Listener {
4735			local_addr: Multiaddr::empty(),
4736			send_back_addr: Multiaddr::empty(),
4737		};
4738
4739		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4740			libp2p::swarm::behaviour::ConnectionEstablished {
4741				peer_id: peer,
4742				connection_id: conn,
4743				endpoint: &connected,
4744				failed_addresses: &[],
4745				other_established: 0usize,
4746			},
4747		));
4748		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4749
4750		// remote opens a substream, verify that peer state is updated to `Incoming`
4751		notif.on_connection_handler_event(
4752			peer,
4753			conn,
4754			NotifsHandlerOut::OpenDesiredByRemote {
4755				protocol_index: 0,
4756				handshake: vec![1, 3, 3, 7],
4757			},
4758		);
4759		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4760
4761		notif.incoming[0].alive = false;
4762		notif.protocol_report_reject(0.into());
4763
4764		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4765	}
4766
4767	#[test]
4768	#[should_panic]
4769	#[cfg(debug_assertions)]
4770	fn inject_non_existent_connection_closed_for_incoming_peer() {
4771		let (mut notif, _controller, _notif_service) = development_notifs();
4772		let peer = PeerId::random();
4773		let conn = ConnectionId::new_unchecked(0);
4774		let set_id = SetId::from(0);
4775		let connected = ConnectedPoint::Listener {
4776			local_addr: Multiaddr::empty(),
4777			send_back_addr: Multiaddr::empty(),
4778		};
4779
4780		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4781			libp2p::swarm::behaviour::ConnectionEstablished {
4782				peer_id: peer,
4783				connection_id: conn,
4784				endpoint: &connected,
4785				failed_addresses: &[],
4786				other_established: 0usize,
4787			},
4788		));
4789		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4790
4791		// remote opens a substream, verify that peer state is updated to `Incoming`
4792		notif.on_connection_handler_event(
4793			peer,
4794			conn,
4795			NotifsHandlerOut::OpenDesiredByRemote {
4796				protocol_index: 0,
4797				handshake: vec![1, 3, 3, 7],
4798			},
4799		);
4800		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4801
4802		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4803			libp2p::swarm::behaviour::ConnectionClosed {
4804				peer_id: peer,
4805				connection_id: ConnectionId::new_unchecked(1337),
4806				endpoint: &connected.clone(),
4807				handler: NotifsHandler::new(peer, vec![], None),
4808				remaining_established: 0usize,
4809			},
4810		));
4811	}
4812
4813	#[test]
4814	#[should_panic]
4815	#[cfg(debug_assertions)]
4816	fn inject_non_existent_connection_closed_for_disabled_peer() {
4817		let (mut notif, _controller, _notif_service) = development_notifs();
4818		let set_id = SetId::from(0);
4819		let peer = PeerId::random();
4820		let conn = ConnectionId::new_unchecked(0);
4821		let connected = ConnectedPoint::Listener {
4822			local_addr: Multiaddr::empty(),
4823			send_back_addr: Multiaddr::empty(),
4824		};
4825
4826		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4827			libp2p::swarm::behaviour::ConnectionEstablished {
4828				peer_id: peer,
4829				connection_id: conn,
4830				endpoint: &connected,
4831				failed_addresses: &[],
4832				other_established: 0usize,
4833			},
4834		));
4835		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4836
4837		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4838			libp2p::swarm::behaviour::ConnectionClosed {
4839				peer_id: peer,
4840				connection_id: ConnectionId::new_unchecked(1337),
4841				endpoint: &connected.clone(),
4842				handler: NotifsHandler::new(peer, vec![], None),
4843				remaining_established: 0usize,
4844			},
4845		));
4846	}
4847
4848	#[test]
4849	#[should_panic]
4850	#[cfg(debug_assertions)]
4851	fn inject_non_existent_connection_closed_for_disabled_pending_enable() {
4852		let (mut notif, _controller, _notif_service) = development_notifs();
4853		let set_id = SetId::from(0);
4854		let peer = PeerId::random();
4855		let conn = ConnectionId::new_unchecked(0);
4856		let connected = ConnectedPoint::Listener {
4857			local_addr: Multiaddr::empty(),
4858			send_back_addr: Multiaddr::empty(),
4859		};
4860
4861		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4862			libp2p::swarm::behaviour::ConnectionEstablished {
4863				peer_id: peer,
4864				connection_id: conn,
4865				endpoint: &connected,
4866				failed_addresses: &[],
4867				other_established: 0usize,
4868			},
4869		));
4870		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4871
4872		// manually add backoff for the entry
4873		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
4874			notif.peers.get_mut(&(peer, set_id))
4875		{
4876			*backoff_until =
4877				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
4878		}
4879
4880		// switch state to `DisabledPendingEnable`
4881		notif.peerset_report_connect(peer, set_id);
4882
4883		assert!(std::matches!(
4884			notif.peers.get(&(peer, set_id)),
4885			Some(&PeerState::DisabledPendingEnable { .. })
4886		));
4887
4888		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4889			libp2p::swarm::behaviour::ConnectionClosed {
4890				peer_id: peer,
4891				connection_id: ConnectionId::new_unchecked(1337),
4892				endpoint: &connected.clone(),
4893				handler: NotifsHandler::new(peer, vec![], None),
4894				remaining_established: 0usize,
4895			},
4896		));
4897	}
4898
4899	#[test]
4900	#[should_panic]
4901	#[cfg(debug_assertions)]
4902	fn inject_connection_closed_for_incoming_peer_state_mismatch() {
4903		let (mut notif, _controller, _notif_service) = development_notifs();
4904		let peer = PeerId::random();
4905		let conn = ConnectionId::new_unchecked(0);
4906		let set_id = SetId::from(0);
4907		let connected = ConnectedPoint::Listener {
4908			local_addr: Multiaddr::empty(),
4909			send_back_addr: Multiaddr::empty(),
4910		};
4911
4912		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4913			libp2p::swarm::behaviour::ConnectionEstablished {
4914				peer_id: peer,
4915				connection_id: conn,
4916				endpoint: &connected,
4917				failed_addresses: &[],
4918				other_established: 0usize,
4919			},
4920		));
4921		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4922
4923		// remote opens a substream, verify that peer state is updated to `Incoming`
4924		notif.on_connection_handler_event(
4925			peer,
4926			conn,
4927			NotifsHandlerOut::OpenDesiredByRemote {
4928				protocol_index: 0,
4929				handshake: vec![1, 3, 3, 7],
4930			},
4931		);
4932		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4933		notif.incoming[0].alive = false;
4934
4935		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4936			libp2p::swarm::behaviour::ConnectionClosed {
4937				peer_id: peer,
4938				connection_id: conn,
4939				endpoint: &connected.clone(),
4940				handler: NotifsHandler::new(peer, vec![], None),
4941				remaining_established: 0usize,
4942			},
4943		));
4944	}
4945
4946	#[test]
4947	#[should_panic]
4948	#[cfg(debug_assertions)]
4949	fn inject_connection_closed_for_enabled_state_mismatch() {
4950		let (mut notif, _controller, _notif_service) = development_notifs();
4951		let peer = PeerId::random();
4952		let conn = ConnectionId::new_unchecked(0);
4953		let set_id = SetId::from(0);
4954		let connected = ConnectedPoint::Listener {
4955			local_addr: Multiaddr::empty(),
4956			send_back_addr: Multiaddr::empty(),
4957		};
4958
4959		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4960			libp2p::swarm::behaviour::ConnectionEstablished {
4961				peer_id: peer,
4962				connection_id: conn,
4963				endpoint: &connected,
4964				failed_addresses: &[],
4965				other_established: 0usize,
4966			},
4967		));
4968		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4969
4970		// remote opens a substream, verify that peer state is updated to `Incoming`
4971		notif.on_connection_handler_event(
4972			peer,
4973			conn,
4974			NotifsHandlerOut::OpenDesiredByRemote {
4975				protocol_index: 0,
4976				handshake: vec![1, 3, 3, 7],
4977			},
4978		);
4979		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4980
4981		// attempt to connect to the peer and verify that the peer state is `Enabled`
4982		notif.peerset_report_connect(peer, set_id);
4983		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4984
4985		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4986			libp2p::swarm::behaviour::ConnectionClosed {
4987				peer_id: peer,
4988				connection_id: ConnectionId::new_unchecked(1337),
4989				endpoint: &connected.clone(),
4990				handler: NotifsHandler::new(peer, vec![], None),
4991				remaining_established: 0usize,
4992			},
4993		));
4994	}
4995
4996	#[test]
4997	#[should_panic]
4998	#[cfg(debug_assertions)]
4999	fn inject_connection_closed_for_backoff_peer() {
5000		let (mut notif, _controller, _notif_service) = development_notifs();
5001		let set_id = SetId::from(0);
5002		let peer = PeerId::random();
5003		let conn = ConnectionId::new_unchecked(0);
5004		let connected = ConnectedPoint::Listener {
5005			local_addr: Multiaddr::empty(),
5006			send_back_addr: Multiaddr::empty(),
5007		};
5008
5009		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
5010			libp2p::swarm::behaviour::ConnectionEstablished {
5011				peer_id: peer,
5012				connection_id: conn,
5013				endpoint: &connected,
5014				failed_addresses: &[],
5015				other_established: 0usize,
5016			},
5017		));
5018		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
5019
5020		// manually add backoff for the entry
5021		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
5022			notif.peers.get_mut(&(peer, set_id))
5023		{
5024			*backoff_until =
5025				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
5026		}
5027
5028		notif.on_swarm_event(FromSwarm::ConnectionClosed(
5029			libp2p::swarm::behaviour::ConnectionClosed {
5030				peer_id: peer,
5031				connection_id: conn,
5032				endpoint: &connected.clone(),
5033				handler: NotifsHandler::new(peer, vec![], None),
5034				remaining_established: 0usize,
5035			},
5036		));
5037		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
5038
5039		notif.on_swarm_event(FromSwarm::ConnectionClosed(
5040			libp2p::swarm::behaviour::ConnectionClosed {
5041				peer_id: peer,
5042				connection_id: conn,
5043				endpoint: &connected.clone(),
5044				handler: NotifsHandler::new(peer, vec![], None),
5045				remaining_established: 0usize,
5046			},
5047		));
5048	}
5049
5050	#[test]
5051	#[should_panic]
5052	#[cfg(debug_assertions)]
5053	fn open_result_ok_non_existent_peer() {
5054		let (mut notif, _controller, _notif_service) = development_notifs();
5055		let conn = ConnectionId::new_unchecked(0);
5056		let mut conn_yielder = ConnectionYielder::new();
5057
5058		notif.on_connection_handler_event(
5059			PeerId::random(),
5060			conn,
5061			conn_yielder.open_substream(PeerId::random(), 0, vec![1, 2, 3, 4]),
5062		);
5063	}
5064}