referrerpolicy=no-referrer-when-downgrade

sc_network/protocol/notifications/
behaviour.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	protocol::notifications::{
21		handler::{
22			self, CloseReason, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut,
23		},
24		service::{NotificationCommand, ProtocolHandle, ValidationCallResult},
25	},
26	protocol_controller::{self, IncomingIndex, Message, SetId},
27	service::{
28		metrics::NotificationMetrics,
29		traits::{Direction, ValidationResult},
30	},
31	types::ProtocolName,
32};
33
34use bytes::BytesMut;
35use fnv::FnvHashMap;
36use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
37use libp2p::{
38	core::{transport::PortUse, Endpoint, Multiaddr},
39	swarm::{
40		behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
41		ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, THandler,
42		THandlerInEvent, THandlerOutEvent, ToSwarm,
43	},
44	PeerId,
45};
46use log::{debug, error, trace, warn};
47use parking_lot::RwLock;
48use rand::distributions::{Distribution as _, Uniform};
49use sc_utils::mpsc::TracingUnboundedReceiver;
50use smallvec::SmallVec;
51use tokio::sync::oneshot::error::RecvError;
52use tokio_stream::StreamMap;
53
54use libp2p::swarm::CloseConnection;
55use std::{
56	cmp,
57	collections::{hash_map::Entry, VecDeque},
58	mem,
59	pin::Pin,
60	sync::Arc,
61	task::{Context, Poll},
62	time::{Duration, Instant},
63};
64
/// Logging target passed as `target:` to every log statement in this file.
const LOG_TARGET: &str = "sub-libp2p::notification::behaviour";
67
/// Type representing a pending substream validation.
///
/// The future resolves to the validation outcome (or a [`RecvError`] when no answer could be
/// received) paired with the [`IncomingIndex`] the validation was requested for.
type PendingInboundValidation =
	BoxFuture<'static, (Result<ValidationResult, RecvError>, IncomingIndex)>;
71
/// Network behaviour that handles opening substreams for custom protocols with other peers.
///
/// # How it works
///
/// The role of the `Notifications` is to synchronize the following components:
///
/// - The libp2p swarm that opens new connections and reports disconnects.
/// - The connection handler (see the `handler` module) that handles individual connections.
/// - The peerset manager (PSM) that requests links to peers to be established or broken.
/// - The external API, that requires knowledge of the links that have been established.
///
/// In the state machine below, each `PeerId` is attributed one of these states:
///
/// - [`PeerState::Requested`]: No open connection, but requested by the peerset. Currently dialing.
/// - [`PeerState::Disabled`]: Has open TCP connection(s) unbeknownst to the peerset. No substream
///   is open.
/// - [`PeerState::Enabled`]: Has open TCP connection(s), acknowledged by the peerset.
///   - Notifications substreams are open on at least one connection, and external API has been
///     notified.
///   - Notifications substreams aren't open.
/// - [`PeerState::Incoming`]: Has open TCP connection(s) and remote would like to open substreams.
///   Peerset has been asked to attribute an inbound slot.
///
/// In addition to these states, there also exists a "banning" system. If we fail to dial a peer,
/// we back-off for a few seconds. If the PSM requests connecting to a peer that is currently
/// backed-off, the next dialing attempt is delayed until after the ban expires. However, the PSM
/// will still consider the peer to be connected. This "ban" is thus not a ban in a strict sense:
/// if a backed-off peer tries to connect, the connection is accepted. A ban only delays dialing
/// attempts.
///
/// There may be multiple connections to a peer. The status of a peer on
/// the API of this behaviour and towards the peerset manager is aggregated in
/// the following way:
///
///   1. The enabled/disabled status is the same across all connections, as decided by the peerset
///      manager.
///   2. `send_packet` and `write_notification` always send all data over the same connection to
///      preserve the ordering provided by the transport, as long as that connection is open. If it
///      closes, a second open connection may take over, if one exists, but that case should be no
///      different than a single connection failing and being re-established in terms of potential
///      reordering and dropped messages. Messages can be received on any connection.
///   3. The behaviour reports `NotificationsOut::CustomProtocolOpen` when the first connection
///      reports `NotifsHandlerOut::OpenResultOk`.
///   4. The behaviour reports `NotificationsOut::CustomProtocolClosed` when the last connection
///      reports `NotifsHandlerOut::CloseResult`.
///
/// In this way, the number of actual established connections to the peer is
/// an implementation detail of this behaviour. Note that, in practice and at
/// the time of this writing, there may be at most two connections to a peer
/// and only as a result of simultaneous dialing. However, the implementation
/// accommodates for any number of connections.
pub struct Notifications {
	/// Notification protocols. Entries never change after initialization.
	notif_protocols: Vec<handler::ProtocolConfig>,

	/// Protocol handles, in the same order as `notif_protocols`.
	protocol_handles: Vec<ProtocolHandle>,

	/// Command streams, keyed by the position of their protocol in `notif_protocols`.
	command_streams: StreamMap<usize, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>>,

	/// Protocol controllers are responsible for peer connections management.
	protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,

	/// Receiver for instructions about who to connect to or disconnect from.
	from_protocol_controllers: TracingUnboundedReceiver<Message>,

	/// List of peers in our state, keyed by peer and by the set the state applies to.
	peers: FnvHashMap<(PeerId, SetId), PeerState>,

	/// The elements in `peers` occasionally contain `Delay` objects that we would normally have
	/// to be polled one by one. In order to avoid doing so, as an optimization, every `Delay` is
	/// instead put inside of `delays` and referenced by a [`DelayId`]. This stream
	/// yields the `(DelayId, PeerId, SetId)` of every delay that has fired.
	///
	/// By design, we never remove elements from this list. Elements are removed only when the
	/// `Delay` triggers. As such, this stream may produce obsolete elements.
	delays:
		stream::FuturesUnordered<Pin<Box<dyn Future<Output = (DelayId, PeerId, SetId)> + Send>>>,

	/// [`DelayId`] to assign to the next delay.
	next_delay_id: DelayId,

	/// List of incoming messages we have sent to the peer set manager and that are waiting for an
	/// answer.
	incoming: SmallVec<[IncomingPeer; 6]>,

	/// We generate indices to identify incoming connections. This is the next value for the index
	/// to use when a connection is incoming.
	next_incoming_index: IncomingIndex,

	/// Events to produce from `poll()`.
	events: VecDeque<ToSwarm<NotificationsOut, NotifsHandlerIn>>,

	/// Pending inbound substream validations.
	//
	// NOTE: it's possible to read a stale response from `pending_inbound_validations`
	// as the substream may get closed by the remote peer before the protocol has had
	// a chance to validate it. [`Notifications`] must compare the [`IncomingIndex`]
	// returned by the completed future against the [`IncomingIndex`] stored in
	// `PeerState::Incoming` to check whether the completed future is stale or not.
	pending_inbound_validations: FuturesUnordered<PendingInboundValidation>,

	/// Metrics for notifications.
	metrics: NotificationMetrics,
}
178
/// Configuration for a notifications protocol.
///
/// `Notifications::new` converts each value into a `handler::ProtocolConfig`, wrapping the
/// handshake so that it can be modified afterwards.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
	/// Name of the protocol.
	pub name: ProtocolName,
	/// Names of the protocol to use if the main one isn't available.
	pub fallback_names: Vec<ProtocolName>,
	/// Initial handshake of the protocol, as raw bytes.
	pub handshake: Vec<u8>,
	/// Maximum allowed size for a notification.
	pub max_notification_size: u64,
}
191
/// Identifier for a delay firing.
///
/// Identifiers are handed out sequentially from `Notifications::next_delay_id` and stored in the
/// peer states that wait on the corresponding delay.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DelayId(u64);
195
/// State of a peer we're connected to.
///
/// The variants correspond to the state of the peer w.r.t. the peerset.
#[derive(Debug)]
enum PeerState {
	/// State is poisoned. This is a temporary placeholder written while a state transition is
	/// being computed, and it should always be overwritten with a real state before the
	/// transition finishes. If it is found in the wild, that means there was either a panic or a
	/// bug in the state machine code.
	Poisoned,

	/// The peer misbehaved. If the PSM wants us to connect to this peer, we will add an artificial
	/// delay to the connection.
	Backoff {
		/// When the ban expires. For clean-up purposes. References an entry in `delays`.
		timer: DelayId,
		/// Until when the peer is backed-off.
		timer_deadline: Instant,
	},

	/// The peerset requested that we connect to this peer. We are currently not connected.
	PendingRequest {
		/// When to actually start dialing. References an entry in `delays`.
		timer: DelayId,
		/// When the `timer` will trigger.
		timer_deadline: Instant,
	},

	/// The peerset requested that we connect to this peer. We are currently dialing this peer.
	Requested,

	/// We are connected to this peer but the peerset hasn't requested it or has denied it.
	///
	/// The handler is either in the closed state, or a `Close` message has been sent to it and
	/// hasn't been answered yet.
	Disabled {
		/// If `Some`, any connection request from the peerset to this peer is delayed until the
		/// given `Instant`.
		backoff_until: Option<Instant>,

		/// List of connections with this peer, and their state.
		connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
	},

	/// We are connected to this peer. The peerset has requested a connection to this peer, but
	/// it is currently in a "backed-off" phase. The state will switch to `Enabled` once the timer
	/// expires.
	///
	/// The handler is either in the closed state, or a `Close` message has been sent to it and
	/// hasn't been answered yet.
	///
	/// The handler will be opened when `timer` fires.
	DisabledPendingEnable {
		/// When to enable this remote. References an entry in `delays`.
		timer: DelayId,
		/// When the `timer` will trigger.
		timer_deadline: Instant,

		/// List of connections with this peer, and their state.
		connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
	},

	/// We are connected to this peer and the peerset has accepted it.
	Enabled {
		/// List of connections with this peer, and their state.
		connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
	},

	/// We are connected to this peer. We have received an `OpenDesiredByRemote` from one of the
	/// handlers and forwarded that request to the peerset. The connection handlers are waiting for
	/// a response, i.e. to be opened or closed based on whether the peerset accepts or rejects
	/// the peer.
	Incoming {
		/// If `Some`, any dial attempts to this peer are delayed until the given `Instant`.
		backoff_until: Option<Instant>,

		/// Incoming index tracking this connection.
		incoming_index: IncomingIndex,

		/// Peerset has signaled it wants the substream closed.
		peerset_rejected: bool,

		/// List of connections with this peer, and their state.
		connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
	},
}
281
282impl PeerState {
283	/// True if there exists an established connection to the peer
284	/// that is open for custom protocol traffic.
285	fn is_open(&self) -> bool {
286		self.get_open().is_some()
287	}
288
289	/// Returns the [`NotificationsSink`] of the first established connection
290	/// that is open for custom protocol traffic.
291	fn get_open(&self) -> Option<&NotificationsSink> {
292		match self {
293			Self::Enabled { connections, .. } => connections.iter().find_map(|(_, s)| match s {
294				ConnectionState::Open(s) => Some(s),
295				_ => None,
296			}),
297			_ => None,
298		}
299	}
300}
301
/// State of the handler of a single connection visible from this state machine.
///
/// This is the behaviour's view of the handler, updated whenever a message is sent to or
/// received from it.
#[derive(Debug)]
enum ConnectionState {
	/// Connection is in the `Closed` state, meaning that the remote hasn't requested anything.
	Closed,

	/// Connection is either in the `Open` or the `Closed` state, but a
	/// [`NotifsHandlerIn::Close`] message has been sent. Waiting for this message to be
	/// acknowledged through a [`NotifsHandlerOut::CloseResult`].
	Closing,

	/// Connection is in the `Closed` state but a [`NotifsHandlerIn::Open`] message has been sent.
	/// An `OpenResultOk`/`OpenResultErr` message is expected.
	Opening,

	/// Connection is in the `Closed` state but a [`NotifsHandlerIn::Open`] message then a
	/// [`NotifsHandlerIn::Close`] message has been sent. An `OpenResultOk`/`OpenResultErr` message
	/// followed with a `CloseResult` message are expected.
	OpeningThenClosing,

	/// Connection is in the `Closed` state, but a [`NotifsHandlerOut::OpenDesiredByRemote`]
	/// message has been received, meaning that the remote wants to open a substream.
	OpenDesiredByRemote,

	/// Connection is in the `Open` state.
	///
	/// The external API is notified of a channel with this peer if any of its connections is in
	/// this state. The payload is the sink used to send notifications over this connection.
	Open(NotificationsSink),
}
332
/// State of an "incoming" message sent to the peer set manager.
#[derive(Debug)]
struct IncomingPeer {
	/// Id of the remote peer of the incoming substream.
	peer_id: PeerId,
	/// Id of the set the incoming substream would belong to.
	set_id: SetId,
	/// If true, this "incoming" still corresponds to an actual connection. If false, then the
	/// connection corresponding to it has been closed or replaced already.
	alive: bool,
	/// Id that we sent to the peerset.
	incoming_id: IncomingIndex,
	/// Received handshake.
	handshake: Vec<u8>,
}
348
/// Event that can be emitted by the `Notifications`.
#[derive(Debug)]
pub enum NotificationsOut {
	/// Opened a custom protocol with the remote.
	CustomProtocolOpen {
		/// Id of the peer we are connected to.
		peer_id: PeerId,
		/// Peerset set ID the substream is tied to.
		set_id: SetId,
		/// Direction of the stream.
		direction: Direction,
		/// If `Some`, a fallback protocol name has been used rather than the main protocol name.
		/// Always matches one of the fallback names passed at initialization.
		negotiated_fallback: Option<ProtocolName>,
		/// Handshake that was sent to us.
		/// This is normally a "Status" message, but this is out of the concern of this code.
		received_handshake: Vec<u8>,
		/// Object that permits sending notifications to the peer.
		notifications_sink: NotificationsSink,
	},

	/// The [`NotificationsSink`] object used to send notifications with the given peer must be
	/// replaced with a new one.
	///
	/// This event is typically emitted when a transport-level connection is closed and we fall
	/// back to a secondary connection.
	CustomProtocolReplaced {
		/// Id of the peer we are connected to.
		peer_id: PeerId,
		/// Peerset set ID the substream is tied to.
		set_id: SetId,
		/// Replacement for the previous [`NotificationsSink`].
		notifications_sink: NotificationsSink,
	},

	/// Closed a custom protocol with the remote. The existing [`NotificationsSink`] should
	/// be dropped.
	CustomProtocolClosed {
		/// Id of the peer we were connected to.
		peer_id: PeerId,
		/// Peerset set ID the substream was tied to.
		set_id: SetId,
	},

	/// Received a message on a custom protocol substream.
	///
	/// Also concerns received notifications for the notifications API.
	Notification {
		/// Id of the peer the message came from.
		peer_id: PeerId,
		/// Peerset set ID the substream is tied to.
		set_id: SetId,
		/// Message that has been received.
		message: BytesMut,
	},

	/// The remote peer misbehaved by sending a message on an outbound substream.
	ProtocolMisbehavior {
		/// Id of the peer the message came from.
		peer_id: PeerId,
		/// Peerset set ID the substream is tied to.
		set_id: SetId,
	},
}
413
414impl Notifications {
415	/// Creates a `CustomProtos`.
416	pub(crate) fn new(
417		protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
418		from_protocol_controllers: TracingUnboundedReceiver<Message>,
419		metrics: NotificationMetrics,
420		notif_protocols: impl Iterator<
421			Item = (
422				ProtocolConfig,
423				ProtocolHandle,
424				Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>,
425			),
426		>,
427	) -> Self {
428		let (notif_protocols, protocol_handles): (Vec<_>, Vec<_>) = notif_protocols
429			.map(|(cfg, protocol_handle, command_stream)| {
430				(
431					handler::ProtocolConfig {
432						name: cfg.name,
433						fallback_names: cfg.fallback_names,
434						handshake: Arc::new(RwLock::new(cfg.handshake)),
435						max_notification_size: cfg.max_notification_size,
436					},
437					(protocol_handle, command_stream),
438				)
439			})
440			.unzip();
441		assert!(!notif_protocols.is_empty());
442
443		let (mut protocol_handles, command_streams): (Vec<_>, Vec<_>) = protocol_handles
444			.into_iter()
445			.enumerate()
446			.map(|(set_id, (mut protocol_handle, command_stream))| {
447				protocol_handle.set_metrics(metrics.clone());
448
449				(protocol_handle, (set_id, command_stream))
450			})
451			.unzip();
452
453		protocol_handles.iter_mut().skip(1).for_each(|handle| {
454			handle.delegate_to_peerset(true);
455		});
456
457		Self {
458			notif_protocols,
459			protocol_handles,
460			command_streams: StreamMap::from_iter(command_streams.into_iter()),
461			protocol_controller_handles,
462			from_protocol_controllers,
463			peers: FnvHashMap::default(),
464			delays: Default::default(),
465			next_delay_id: DelayId(0),
466			incoming: SmallVec::new(),
467			next_incoming_index: IncomingIndex(0),
468			events: VecDeque::new(),
469			pending_inbound_validations: FuturesUnordered::new(),
470			metrics,
471		}
472	}
473
474	/// Modifies the handshake of the given notifications protocol.
475	pub fn set_notif_protocol_handshake(
476		&mut self,
477		set_id: SetId,
478		handshake_message: impl Into<Vec<u8>>,
479	) {
480		if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) {
481			*p.handshake.write() = handshake_message.into();
482		} else {
483			log::error!(target: LOG_TARGET, "Unknown handshake change set: {:?}", set_id);
484			debug_assert!(false);
485		}
486	}
487
488	/// Returns the list of all the peers we have an open channel to.
489	pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
490		self.peers.iter().filter(|(_, state)| state.is_open()).map(|((id, _), _)| id)
491	}
492
493	/// Returns true if we have an open substream to the given peer.
494	pub fn is_open(&self, peer_id: &PeerId, set_id: SetId) -> bool {
495		self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false)
496	}
497
	/// Disconnects the given peer if we are connected to it.
	///
	/// Logs the request as coming from the external API, then defers to
	/// `disconnect_peer_inner`, which performs the actual state transition.
	pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: SetId) {
		trace!(target: LOG_TARGET, "External API => Disconnect({}, {:?})", peer_id, set_id);
		self.disconnect_peer_inner(peer_id, set_id);
	}
503
	/// Inner implementation of `disconnect_peer`.
	///
	/// Moves the `(peer_id, set_id)` entry of `self.peers` to `PeerState::Disabled` when it is
	/// currently `DisabledPendingEnable`, `Enabled` or `Incoming`, queuing the necessary
	/// `Close` messages for the connection handlers. States in which no substream is open or
	/// being opened are left untouched, and an unknown peer is a no-op.
	fn disconnect_peer_inner(&mut self, peer_id: &PeerId, set_id: SetId) {
		// Nothing to do if we have no state for this peer on this set.
		let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
			entry
		} else {
			return
		};

		// Take ownership of the current state, leaving `Poisoned` as a placeholder. Each arm below
		// is responsible for writing the new state back into the entry.
		match mem::replace(entry.get_mut(), PeerState::Poisoned) {
			// We're not connected anyway.
			st @ PeerState::Disabled { .. } => *entry.into_mut() = st,
			st @ PeerState::Requested => *entry.into_mut() = st,
			st @ PeerState::PendingRequest { .. } => *entry.into_mut() = st,
			st @ PeerState::Backoff { .. } => *entry.into_mut() = st,

			// DisabledPendingEnable => Disabled.
			// The pending timer deadline is kept as the back-off of the `Disabled` state.
			PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
				trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
				self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);
				*entry.into_mut() =
					PeerState::Disabled { connections, backoff_until: Some(timer_deadline) }
			},

			// Enabled => Disabled.
			// All open or opening connections are sent a `Close` message.
			// If relevant, the external API is instantly notified.
			PeerState::Enabled { mut connections } => {
				trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
				self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);

				// The external API only knows about the peer if at least one connection is open.
				if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
					trace!(target: LOG_TARGET, "External API <= Closed({}, {:?})", peer_id, set_id);
					let event =
						NotificationsOut::CustomProtocolClosed { peer_id: *peer_id, set_id };
					self.events.push_back(ToSwarm::GenerateEvent(event));
				}

				// Open => Closing.
				for (connec_id, connec_state) in
					connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
				{
					trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
					self.events.push_back(ToSwarm::NotifyHandler {
						peer_id: *peer_id,
						handler: NotifyHandler::One(*connec_id),
						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
					});
					*connec_state = ConnectionState::Closing;
				}

				// Opening => OpeningThenClosing.
				for (connec_id, connec_state) in
					connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
				{
					trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
					self.events.push_back(ToSwarm::NotifyHandler {
						peer_id: *peer_id,
						handler: NotifyHandler::One(*connec_id),
						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
					});
					*connec_state = ConnectionState::OpeningThenClosing;
				}

				// Sanity check: the two loops above must have converted every open or opening
				// connection.
				debug_assert!(!connections
					.iter()
					.any(|(_, s)| matches!(s, ConnectionState::Open(_))));
				debug_assert!(!connections
					.iter()
					.any(|(_, s)| matches!(s, ConnectionState::Opening)));

				*entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
			},

			// Incoming => Disabled.
			// Ongoing opening requests from the remote are rejected.
			// Unlike the arms above, the protocol controller is not sent a `dropped` here.
			PeerState::Incoming { mut connections, backoff_until, .. } => {
				// Find the still-alive request that was forwarded to the peerset for this peer.
				let inc = if let Some(inc) = self
					.incoming
					.iter_mut()
					.find(|i| i.peer_id == entry.key().0 && i.set_id == set_id && i.alive)
				{
					inc
				} else {
					error!(
						target: LOG_TARGET,
						"State mismatch in libp2p: no entry in incoming for incoming peer"
					);
					// NOTE(review): the entry is left as `PeerState::Poisoned` on this path and
					// `connections` is dropped — confirm that this is intended.
					return
				};

				// Mark the request as obsolete so that a later answer to it can be recognised.
				inc.alive = false;

				// OpenDesiredByRemote => Closing.
				for (connec_id, connec_state) in connections
					.iter_mut()
					.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
				{
					trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
					self.events.push_back(ToSwarm::NotifyHandler {
						peer_id: *peer_id,
						handler: NotifyHandler::One(*connec_id),
						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
					});
					*connec_state = ConnectionState::Closing;
				}

				debug_assert!(!connections
					.iter()
					.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
				*entry.into_mut() = PeerState::Disabled { connections, backoff_until }
			},

			// The state was already poisoned before this call; it is only reported and stays
			// poisoned.
			PeerState::Poisoned => {
				error!(target: LOG_TARGET, "State of {:?} is poisoned", peer_id)
			},
		}
	}
618
	/// Function that is called when the peerset wants us to connect to a peer.
	///
	/// Depending on the current state of `(peer_id, set_id)`:
	///
	/// - unknown peer, or expired `Backoff`: a dial is queued and the state becomes `Requested`;
	/// - non-expired `Backoff`: the state becomes `PendingRequest`, reusing the back-off timer;
	/// - `Disabled` with a non-expired back-off: a delay is registered and the state becomes
	///   `DisabledPendingEnable`;
	/// - `Disabled` otherwise: the first `Closed` connection is asked to open a substream and the
	///   state becomes `Enabled`; if no connection is `Closed`, the attempt is delayed and the
	///   state becomes `DisabledPendingEnable`;
	/// - any other state is kept unchanged and the request is only logged.
	fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: SetId) {
		// If `PeerId` is unknown to us, insert an entry, start dialing, and return early.
		let mut occ_entry = match self.peers.entry((peer_id, set_id)) {
			Entry::Occupied(entry) => entry,
			Entry::Vacant(entry) => {
				// If there's no entry in `self.peers`, start dialing.
				trace!(
					target: LOG_TARGET,
					"PSM => Connect({}, {:?}): Starting to connect",
					entry.key().0,
					set_id,
				);
				trace!(target: LOG_TARGET, "Libp2p <= Dial {}", entry.key().0);
				self.events.push_back(ToSwarm::Dial { opts: entry.key().0.into() });
				entry.insert(PeerState::Requested);
				return
			},
		};

		// Reference point against which every back-off deadline below is compared.
		let now = Instant::now();

		// Take ownership of the current state, leaving `Poisoned` as a placeholder. Each arm below
		// is responsible for writing the new state back into the entry.
		match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
			// Backoff (not expired) => PendingRequest
			PeerState::Backoff { ref timer, ref timer_deadline } if *timer_deadline > now => {
				let peer_id = occ_entry.key().0;
				trace!(
					target: LOG_TARGET,
					"PSM => Connect({}, {:?}): Will start to connect at {:?}",
					peer_id,
					set_id,
					timer_deadline,
				);
				// The existing back-off timer is reused; no new delay is registered.
				*occ_entry.into_mut() =
					PeerState::PendingRequest { timer: *timer, timer_deadline: *timer_deadline };
			},

			// Backoff (expired) => Requested
			PeerState::Backoff { .. } => {
				trace!(
					target: LOG_TARGET,
					"PSM => Connect({}, {:?}): Starting to connect",
					occ_entry.key().0,
					set_id,
				);
				trace!(target: LOG_TARGET, "Libp2p <= Dial {:?}", occ_entry.key());
				self.events.push_back(ToSwarm::Dial { opts: occ_entry.key().0.into() });
				*occ_entry.into_mut() = PeerState::Requested;
			},

			// Disabled (with non-expired ban) => DisabledPendingEnable
			PeerState::Disabled { connections, backoff_until: Some(ref backoff) }
				if *backoff > now =>
			{
				let peer_id = occ_entry.key().0;
				trace!(
					target: LOG_TARGET,
					"PSM => Connect({}, {:?}): But peer is backed-off until {:?}",
					peer_id,
					set_id,
					backoff,
				);

				// Register a delay that fires when the back-off expires. The guard above
				// guarantees that `*backoff - now` is positive.
				let delay_id = self.next_delay_id;
				self.next_delay_id.0 += 1;
				let delay = futures_timer::Delay::new(*backoff - now);
				self.delays.push(
					async move {
						delay.await;
						(delay_id, peer_id, set_id)
					}
					.boxed(),
				);

				*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
					connections,
					timer: delay_id,
					timer_deadline: *backoff,
				};
			},

			// Disabled => Enabled
			// Reached when there is no back-off or when the back-off has already expired.
			PeerState::Disabled { mut connections, backoff_until } => {
				debug_assert!(!connections
					.iter()
					.any(|(_, s)| { matches!(s, ConnectionState::Open(_)) }));

				// The first connection in the `Closed` state is chosen to open the notifications
				// substream.
				if let Some((connec_id, connec_state)) =
					connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
				{
					trace!(target: LOG_TARGET, "PSM => Connect({}, {:?}): Enabling connections.",
						occ_entry.key().0, set_id);
					trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})", peer_id, *connec_id, set_id);
					self.events.push_back(ToSwarm::NotifyHandler {
						peer_id,
						handler: NotifyHandler::One(*connec_id),
						event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
					});
					*connec_state = ConnectionState::Opening;
					*occ_entry.into_mut() = PeerState::Enabled { connections };
				} else {
					// If no connection is available, switch to `DisabledPendingEnable` in order
					// to try again later.
					debug_assert!(connections.iter().any(|(_, s)| {
						matches!(s, ConnectionState::OpeningThenClosing | ConnectionState::Closing)
					}));
					trace!(
						target: LOG_TARGET,
						"PSM => Connect({}, {:?}): No connection in proper state. Delaying.",
						occ_entry.key().0, set_id
					);

					// Retry in 5 seconds, or at the end of the back-off if that is later.
					let timer_deadline = {
						let base = now + Duration::from_secs(5);
						if let Some(backoff_until) = backoff_until {
							cmp::max(base, backoff_until)
						} else {
							base
						}
					};

					let delay_id = self.next_delay_id;
					self.next_delay_id.0 += 1;
					debug_assert!(timer_deadline > now);
					let delay = futures_timer::Delay::new(timer_deadline - now);
					self.delays.push(
						async move {
							delay.await;
							(delay_id, peer_id, set_id)
						}
						.boxed(),
					);

					*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
						connections,
						timer: delay_id,
						timer_deadline,
					};
				}
			},
			// Incoming => Incoming
			st @ PeerState::Incoming { .. } => {
				debug!(
					target: LOG_TARGET,
					"PSM => Connect({}, {:?}): Ignoring obsolete connect, we are awaiting accept/reject.",
					occ_entry.key().0, set_id
				);
				*occ_entry.into_mut() = st;
			},

			// Other states are kept as-is.
			st @ PeerState::Enabled { .. } => {
				debug!(target: LOG_TARGET,
					"PSM => Connect({}, {:?}): Already connected.",
					occ_entry.key().0, set_id);
				*occ_entry.into_mut() = st;
			},
			st @ PeerState::DisabledPendingEnable { .. } => {
				debug!(target: LOG_TARGET,
					"PSM => Connect({}, {:?}): Already pending enabling.",
					occ_entry.key().0, set_id);
				*occ_entry.into_mut() = st;
			},
			st @ PeerState::Requested { .. } | st @ PeerState::PendingRequest { .. } => {
				debug!(target: LOG_TARGET,
					"PSM => Connect({}, {:?}): Duplicate request.",
					occ_entry.key().0, set_id);
				*occ_entry.into_mut() = st;
			},

			// The state was already poisoned before this call; it is reported and stays poisoned.
			PeerState::Poisoned => {
				error!(target: LOG_TARGET, "State of {:?} is poisoned", occ_entry.key());
				debug_assert!(false);
			},
		}
	}
796
797	/// Function that is called when the peerset wants us to disconnect from a peer.
798	fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: SetId) {
799		let mut entry = match self.peers.entry((peer_id, set_id)) {
800			Entry::Occupied(entry) => entry,
801			Entry::Vacant(entry) => {
802				trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Already disabled.",
803					entry.key().0, set_id);
804				return
805			},
806		};
807
808		match mem::replace(entry.get_mut(), PeerState::Poisoned) {
809			st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
810				trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Already disabled.",
811					entry.key().0, set_id);
812				*entry.into_mut() = st;
813			},
814
815			// DisabledPendingEnable => Disabled
816			PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
817				debug_assert!(!connections.is_empty());
818				trace!(target: LOG_TARGET,
819					"PSM => Drop({}, {:?}): Interrupting pending enabling.",
820					entry.key().0, set_id);
821				*entry.into_mut() =
822					PeerState::Disabled { connections, backoff_until: Some(timer_deadline) };
823			},
824
825			// Enabled => Disabled
826			PeerState::Enabled { mut connections } => {
827				trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Disabling connections.",
828					entry.key().0, set_id);
829
830				debug_assert!(connections.iter().any(|(_, s)| matches!(
831					s,
832					ConnectionState::Opening | ConnectionState::Open(_)
833				)));
834
835				if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
836					trace!(target: LOG_TARGET, "External API <= Closed({}, {:?})", entry.key().0, set_id);
837					let event =
838						NotificationsOut::CustomProtocolClosed { peer_id: entry.key().0, set_id };
839					self.events.push_back(ToSwarm::GenerateEvent(event));
840				}
841
842				for (connec_id, connec_state) in
843					connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
844				{
845					trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})",
846						entry.key(), *connec_id, set_id);
847					self.events.push_back(ToSwarm::NotifyHandler {
848						peer_id: entry.key().0,
849						handler: NotifyHandler::One(*connec_id),
850						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
851					});
852					*connec_state = ConnectionState::OpeningThenClosing;
853				}
854
855				for (connec_id, connec_state) in
856					connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
857				{
858					trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})",
859						entry.key(), *connec_id, set_id);
860					self.events.push_back(ToSwarm::NotifyHandler {
861						peer_id: entry.key().0,
862						handler: NotifyHandler::One(*connec_id),
863						event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
864					});
865					*connec_state = ConnectionState::Closing;
866				}
867
868				*entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
869			},
870
871			// Requested => ร˜
872			PeerState::Requested => {
873				// We don't cancel dialing. Libp2p doesn't expose that on purpose, as other
874				// sub-systems (such as the discovery mechanism) may require dialing this peer as
875				// well at the same time.
876				trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Not yet connected.",
877					entry.key().0, set_id);
878				entry.remove();
879			},
880
881			// PendingRequest => Backoff
882			PeerState::PendingRequest { timer, timer_deadline } => {
883				trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Not yet connected",
884					entry.key().0, set_id);
885				*entry.into_mut() = PeerState::Backoff { timer, timer_deadline }
886			},
887
888			// `ProtocolController` disconnected peer while it was still being validated by the
889			// protocol, mark the connection as rejected and once the validation is received from
890			// the protocol, reject the substream
891			PeerState::Incoming { backoff_until, connections, incoming_index, .. } => {
892				debug!(
893					target: LOG_TARGET,
894					"PSM => Drop({}, {:?}): Ignoring obsolete disconnect, we are awaiting accept/reject.",
895					entry.key().0, set_id,
896				);
897				*entry.into_mut() = PeerState::Incoming {
898					backoff_until,
899					connections,
900					incoming_index,
901					peerset_rejected: true,
902				};
903			},
904			PeerState::Poisoned => {
905				error!(target: LOG_TARGET, "State of {:?} is poisoned", entry.key());
906				debug_assert!(false);
907			},
908		}
909	}
910
911	/// Substream has been accepted by the `ProtocolController` and must now be sent
912	/// to the protocol for validation.
913	fn peerset_report_preaccept(&mut self, index: IncomingIndex) {
914		let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) else {
915			error!(target: LOG_TARGET, "PSM => Preaccept({:?}): Invalid index", index);
916			return
917		};
918
919		trace!(
920			target: LOG_TARGET,
921			"PSM => Preaccept({:?}): Sent to protocol for validation",
922			index
923		);
924		let incoming = &self.incoming[pos];
925
926		match self.protocol_handles[usize::from(incoming.set_id)]
927			.report_incoming_substream(incoming.peer_id, incoming.handshake.clone())
928		{
929			Ok(ValidationCallResult::Delegated) => {
930				self.protocol_report_accept(index);
931			},
932			Ok(ValidationCallResult::WaitForValidation(rx)) => {
933				self.pending_inbound_validations
934					.push(Box::pin(async move { (rx.await, index) }));
935			},
936			Err(err) => {
937				// parachain collators enable the syncing protocol but `NotificationService` for
938				// `SyncingEngine` is not created which causes `report_incoming_substream()` to
939				// fail. This is not a fatal error and should be ignored even though in typical
940				// cases the `NotificationService` not existing is a fatal error and indicates that
941				// the protocol has exited. Until the parachain collator issue is fixed, just report
942				// and error and reject the peer.
943				debug!(target: LOG_TARGET, "protocol has exited: {err:?} {:?}", incoming.set_id);
944
945				self.protocol_report_reject(index);
946			},
947		}
948	}
949
950	/// Function that is called when the peerset wants us to accept a connection
951	/// request from a peer.
952	fn protocol_report_accept(&mut self, index: IncomingIndex) {
953		let (pos, incoming) =
954			if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) {
955				(pos, self.incoming.get(pos))
956			} else {
957				error!(target: LOG_TARGET, "PSM => Accept({:?}): Invalid index", index);
958				return
959			};
960
961		let Some(incoming) = incoming else {
962			error!(target: LOG_TARGET, "Incoming connection ({:?}) doesn't exist", index);
963			debug_assert!(false);
964			return;
965		};
966
967		if !incoming.alive {
968			trace!(
969				target: LOG_TARGET,
970				"PSM => Accept({:?}, {}, {:?}): Obsolete incoming",
971				index,
972				incoming.peer_id,
973				incoming.set_id,
974			);
975
976			match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
977				Some(PeerState::DisabledPendingEnable { .. }) | Some(PeerState::Enabled { .. }) => {
978				},
979				_ => {
980					trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})",
981						incoming.peer_id, incoming.set_id);
982					self.protocol_controller_handles[usize::from(incoming.set_id)]
983						.dropped(incoming.peer_id);
984				},
985			}
986
987			self.incoming.remove(pos);
988			return
989		}
990
991		let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
992			Some(s) => s,
993			None => {
994				log::debug!(
995					target: LOG_TARGET,
996					"Connection to {:?} closed, ({:?} {:?}), ignoring accept",
997					incoming.peer_id,
998					incoming.set_id,
999					index,
1000				);
1001				self.incoming.remove(pos);
1002				return
1003			},
1004		};
1005
1006		match mem::replace(state, PeerState::Poisoned) {
1007			// Incoming => Enabled
1008			PeerState::Incoming {
1009				mut connections,
1010				incoming_index,
1011				peerset_rejected,
1012				backoff_until,
1013			} => {
1014				if index < incoming_index {
1015					warn!(
1016						target: LOG_TARGET,
1017						"PSM => Accept({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
1018						index, incoming.peer_id, incoming.set_id, incoming_index
1019					);
1020
1021					self.incoming.remove(pos);
1022					return
1023				} else if index > incoming_index {
1024					error!(
1025						target: LOG_TARGET,
1026						"PSM => Accept({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
1027						index, incoming.peer_id, incoming.set_id, incoming_index
1028					);
1029
1030					self.incoming.remove(pos);
1031					debug_assert!(false);
1032					return
1033				}
1034
1035				// while the substream was being validated by the protocol, `Peerset` had request
1036				// for the it to be closed so reject the substream now
1037				if peerset_rejected {
1038					trace!(
1039						target: LOG_TARGET,
1040						"Protocol accepted ({:?} {:?} {:?}) but Peerset had requested disconnection, rejecting",
1041						index,
1042						incoming.peer_id,
1043						incoming.set_id
1044					);
1045
1046					*state = PeerState::Incoming {
1047						connections,
1048						backoff_until,
1049						peerset_rejected,
1050						incoming_index,
1051					};
1052					return self.report_reject(index).map_or((), |_| ())
1053				}
1054
1055				trace!(
1056					target: LOG_TARGET,
1057					"PSM => Accept({:?}, {}, {:?}): Enabling connections.",
1058					index,
1059					incoming.peer_id,
1060					incoming.set_id
1061				);
1062
1063				debug_assert!(connections
1064					.iter()
1065					.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1066				for (connec_id, connec_state) in connections
1067					.iter_mut()
1068					.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
1069				{
1070					trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})",
1071						incoming.peer_id, *connec_id, incoming.set_id);
1072					self.events.push_back(ToSwarm::NotifyHandler {
1073						peer_id: incoming.peer_id,
1074						handler: NotifyHandler::One(*connec_id),
1075						event: NotifsHandlerIn::Open {
1076							protocol_index: incoming.set_id.into(),
1077							peer_id: incoming.peer_id,
1078						},
1079					});
1080					*connec_state = ConnectionState::Opening;
1081				}
1082
1083				self.incoming.remove(pos);
1084				*state = PeerState::Enabled { connections };
1085			},
1086			st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
1087				self.incoming.remove(pos);
1088				*state = st;
1089			},
1090			// Any state other than `Incoming` is invalid.
1091			peer => {
1092				error!(
1093					target: LOG_TARGET,
1094					"State mismatch in libp2p: Expected alive incoming. Got {:?}.",
1095					peer
1096				);
1097
1098				self.incoming.remove(pos);
1099				debug_assert!(false);
1100			},
1101		}
1102	}
1103
1104	/// Function that is called when `ProtocolController` wants us to reject an incoming peer.
1105	fn peerset_report_reject(&mut self, index: IncomingIndex) {
1106		let _ = self.report_reject(index);
1107	}
1108
1109	/// Function that is called when the protocol wants us to reject an incoming peer.
1110	fn protocol_report_reject(&mut self, index: IncomingIndex) {
1111		if let Some((set_id, peer_id)) = self.report_reject(index) {
1112			self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id)
1113		}
1114	}
1115
1116	/// Function that is called when the peerset wants us to reject an incoming peer.
1117	fn report_reject(&mut self, index: IncomingIndex) -> Option<(SetId, PeerId)> {
1118		let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index)
1119		{
1120			self.incoming.remove(pos)
1121		} else {
1122			error!(target: LOG_TARGET, "PSM => Reject({:?}): Invalid index", index);
1123			return None
1124		};
1125
1126		if !incoming.alive {
1127			trace!(
1128				target: LOG_TARGET,
1129				"PSM => Reject({:?}, {}, {:?}): Obsolete incoming, ignoring",
1130				index,
1131				incoming.peer_id,
1132				incoming.set_id,
1133			);
1134
1135			return None
1136		}
1137
1138		let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
1139			Some(s) => s,
1140			None => {
1141				log::debug!(
1142					target: LOG_TARGET,
1143					"Connection to {:?} closed, ({:?} {:?}), ignoring accept",
1144					incoming.peer_id,
1145					incoming.set_id,
1146					index,
1147				);
1148				return None
1149			},
1150		};
1151
1152		match mem::replace(state, PeerState::Poisoned) {
1153			// Incoming => Disabled
1154			PeerState::Incoming { mut connections, backoff_until, incoming_index, .. } => {
1155				if index < incoming_index {
1156					warn!(
1157						target: LOG_TARGET,
1158						"PSM => Reject({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
1159						index, incoming.peer_id, incoming.set_id, incoming_index
1160					);
1161					return None
1162				} else if index > incoming_index {
1163					error!(
1164						target: LOG_TARGET,
1165						"PSM => Reject({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
1166						index, incoming.peer_id, incoming.set_id, incoming_index
1167					);
1168					debug_assert!(false);
1169					return None
1170				}
1171
1172				trace!(target: LOG_TARGET, "PSM => Reject({:?}, {}, {:?}): Rejecting connections.",
1173					index, incoming.peer_id, incoming.set_id);
1174
1175				debug_assert!(connections
1176					.iter()
1177					.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1178				for (connec_id, connec_state) in connections
1179					.iter_mut()
1180					.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
1181				{
1182					trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})",
1183						incoming.peer_id, connec_id, incoming.set_id);
1184					self.events.push_back(ToSwarm::NotifyHandler {
1185						peer_id: incoming.peer_id,
1186						handler: NotifyHandler::One(*connec_id),
1187						event: NotifsHandlerIn::Close { protocol_index: incoming.set_id.into() },
1188					});
1189					*connec_state = ConnectionState::Closing;
1190				}
1191
1192				*state = PeerState::Disabled { connections, backoff_until };
1193				Some((incoming.set_id, incoming.peer_id))
1194			},
1195			// connection to peer may have been closed already
1196			st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
1197				*state = st;
1198				None
1199			},
1200			peer => {
1201				error!(
1202					target: LOG_TARGET,
1203					"State mismatch in libp2p: Expected alive incoming. Got {peer:?}.",
1204				);
1205				None
1206			},
1207		}
1208	}
1209}
1210
1211impl NetworkBehaviour for Notifications {
1212	type ConnectionHandler = NotifsHandler;
1213	type ToSwarm = NotificationsOut;
1214
1215	fn handle_pending_inbound_connection(
1216		&mut self,
1217		_connection_id: ConnectionId,
1218		_local_addr: &Multiaddr,
1219		_remote_addr: &Multiaddr,
1220	) -> Result<(), ConnectionDenied> {
1221		Ok(())
1222	}
1223
1224	fn handle_pending_outbound_connection(
1225		&mut self,
1226		_connection_id: ConnectionId,
1227		_maybe_peer: Option<PeerId>,
1228		_addresses: &[Multiaddr],
1229		_effective_role: Endpoint,
1230	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
1231		Ok(Vec::new())
1232	}
1233
1234	fn handle_established_inbound_connection(
1235		&mut self,
1236		_connection_id: ConnectionId,
1237		peer: PeerId,
1238		_local_addr: &Multiaddr,
1239		_remote_addr: &Multiaddr,
1240	) -> Result<THandler<Self>, ConnectionDenied> {
1241		Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
1242	}
1243
1244	fn handle_established_outbound_connection(
1245		&mut self,
1246		_connection_id: ConnectionId,
1247		peer: PeerId,
1248		_addr: &Multiaddr,
1249		_role_override: Endpoint,
1250		_port_use: PortUse,
1251	) -> Result<THandler<Self>, ConnectionDenied> {
1252		Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
1253	}
1254
1255	fn on_swarm_event(&mut self, event: FromSwarm) {
1256		match event {
1257			FromSwarm::ConnectionEstablished(ConnectionEstablished {
1258				peer_id,
1259				endpoint,
1260				connection_id,
1261				..
1262			}) => {
1263				for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
1264					match self.peers.entry((peer_id, set_id)).or_insert(PeerState::Poisoned) {
1265						// Requested | PendingRequest => Enabled
1266						st @ &mut PeerState::Requested |
1267						st @ &mut PeerState::PendingRequest { .. } => {
1268							trace!(target: LOG_TARGET,
1269								"Libp2p => Connected({}, {:?}, {:?}): Connection was requested by PSM.",
1270								peer_id, set_id, endpoint
1271							);
1272							trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})", peer_id, connection_id, set_id);
1273							self.events.push_back(ToSwarm::NotifyHandler {
1274								peer_id,
1275								handler: NotifyHandler::One(connection_id),
1276								event: NotifsHandlerIn::Open {
1277									protocol_index: set_id.into(),
1278									peer_id,
1279								},
1280							});
1281
1282							let mut connections = SmallVec::new();
1283							connections.push((connection_id, ConnectionState::Opening));
1284							*st = PeerState::Enabled { connections };
1285						},
1286
1287						// Poisoned gets inserted above if the entry was missing.
1288						// ร˜ | Backoff => Disabled
1289						st @ &mut PeerState::Poisoned | st @ &mut PeerState::Backoff { .. } => {
1290							let backoff_until =
1291								if let PeerState::Backoff { timer_deadline, .. } = st {
1292									Some(*timer_deadline)
1293								} else {
1294									None
1295								};
1296							trace!(target: LOG_TARGET,
1297								"Libp2p => Connected({}, {:?}, {:?}, {:?}): Not requested by PSM, disabling.",
1298								peer_id, set_id, endpoint, connection_id);
1299
1300							let mut connections = SmallVec::new();
1301							connections.push((connection_id, ConnectionState::Closed));
1302							*st = PeerState::Disabled { connections, backoff_until };
1303						},
1304
1305						// In all other states, add this new connection to the list of closed
1306						// inactive connections.
1307						PeerState::Incoming { connections, .. } |
1308						PeerState::Disabled { connections, .. } |
1309						PeerState::DisabledPendingEnable { connections, .. } |
1310						PeerState::Enabled { connections, .. } => {
1311							trace!(target: LOG_TARGET,
1312								"Libp2p => Connected({}, {:?}, {:?}, {:?}): Secondary connection. Leaving closed.",
1313								peer_id, set_id, endpoint, connection_id);
1314							connections.push((connection_id, ConnectionState::Closed));
1315						},
1316					}
1317				}
1318			},
1319			FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => {
1320				for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
1321					let mut entry = if let Entry::Occupied(entry) =
1322						self.peers.entry((peer_id, set_id))
1323					{
1324						entry
1325					} else {
1326						error!(target: LOG_TARGET, "inject_connection_closed: State mismatch in the custom protos handler");
1327						debug_assert!(false);
1328						return
1329					};
1330
1331					match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1332						// Disabled => Disabled | Backoff | ร˜
1333						PeerState::Disabled { mut connections, backoff_until } => {
1334							trace!(target: LOG_TARGET, "Libp2p => Disconnected({}, {:?}, {:?}): Disabled.",
1335								peer_id, set_id, connection_id);
1336
1337							if let Some(pos) =
1338								connections.iter().position(|(c, _)| *c == connection_id)
1339							{
1340								connections.remove(pos);
1341							} else {
1342								debug_assert!(false);
1343								error!(target: LOG_TARGET,
1344									"inject_connection_closed: State mismatch in the custom protos handler");
1345							}
1346
1347							if connections.is_empty() {
1348								if let Some(until) = backoff_until {
1349									let now = Instant::now();
1350									if until > now {
1351										let delay_id = self.next_delay_id;
1352										self.next_delay_id.0 += 1;
1353										let delay = futures_timer::Delay::new(until - now);
1354										self.delays.push(
1355											async move {
1356												delay.await;
1357												(delay_id, peer_id, set_id)
1358											}
1359											.boxed(),
1360										);
1361
1362										*entry.get_mut() = PeerState::Backoff {
1363											timer: delay_id,
1364											timer_deadline: until,
1365										};
1366									} else {
1367										entry.remove();
1368									}
1369								} else {
1370									entry.remove();
1371								}
1372							} else {
1373								*entry.get_mut() =
1374									PeerState::Disabled { connections, backoff_until };
1375							}
1376						},
1377
1378						// DisabledPendingEnable => DisabledPendingEnable | Backoff
1379						PeerState::DisabledPendingEnable {
1380							mut connections,
1381							timer_deadline,
1382							timer,
1383						} => {
1384							trace!(
1385								target: LOG_TARGET,
1386								"Libp2p => Disconnected({}, {:?}, {:?}): Disabled but pending enable.",
1387								peer_id, set_id, connection_id
1388							);
1389
1390							if let Some(pos) =
1391								connections.iter().position(|(c, _)| *c == connection_id)
1392							{
1393								connections.remove(pos);
1394							} else {
1395								error!(target: LOG_TARGET,
1396									"inject_connection_closed: State mismatch in the custom protos handler");
1397								debug_assert!(false);
1398							}
1399
1400							if connections.is_empty() {
1401								trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1402								self.protocol_controller_handles[usize::from(set_id)]
1403									.dropped(peer_id);
1404								*entry.get_mut() = PeerState::Backoff { timer, timer_deadline };
1405							} else {
1406								*entry.get_mut() = PeerState::DisabledPendingEnable {
1407									connections,
1408									timer_deadline,
1409									timer,
1410								};
1411							}
1412						},
1413
1414						// Incoming => Incoming | Disabled | Backoff | ร˜
1415						PeerState::Incoming {
1416							mut connections,
1417							backoff_until,
1418							incoming_index,
1419							peerset_rejected,
1420						} => {
1421							trace!(
1422								target: LOG_TARGET,
1423								"Libp2p => Disconnected({}, {:?}, {:?}): OpenDesiredByRemote.",
1424								peer_id, set_id, connection_id
1425							);
1426
1427							debug_assert!(connections
1428								.iter()
1429								.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1430
1431							if let Some(pos) =
1432								connections.iter().position(|(c, _)| *c == connection_id)
1433							{
1434								connections.remove(pos);
1435							} else {
1436								error!(target: LOG_TARGET,
1437									"inject_connection_closed: State mismatch in the custom protos handler");
1438								debug_assert!(false);
1439							}
1440
1441							let no_desired_left = !connections
1442								.iter()
1443								.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote));
1444
1445							// If no connection is `OpenDesiredByRemote` anymore, clean up the
1446							// peerset incoming request.
1447							if no_desired_left {
1448								// In the incoming state, we don't report "Dropped" straight away.
1449								// Instead we will report "Dropped" if receive the corresponding
1450								// "Accept".
1451								if let Some(state) = self
1452									.incoming
1453									.iter_mut()
1454									.find(|i| i.alive && i.set_id == set_id && i.peer_id == peer_id)
1455								{
1456									state.alive = false;
1457								} else {
1458									error!(target: LOG_TARGET, "State mismatch in libp2p: no entry in \
1459										incoming corresponding to an incoming state in peers");
1460									debug_assert!(false);
1461								}
1462							}
1463
1464							if connections.is_empty() {
1465								if let Some(until) = backoff_until {
1466									let now = Instant::now();
1467									if until > now {
1468										let delay_id = self.next_delay_id;
1469										self.next_delay_id.0 += 1;
1470										let delay = futures_timer::Delay::new(until - now);
1471										self.delays.push(
1472											async move {
1473												delay.await;
1474												(delay_id, peer_id, set_id)
1475											}
1476											.boxed(),
1477										);
1478
1479										*entry.get_mut() = PeerState::Backoff {
1480											timer: delay_id,
1481											timer_deadline: until,
1482										};
1483									} else {
1484										entry.remove();
1485									}
1486								} else {
1487									entry.remove();
1488								}
1489							} else if no_desired_left {
1490								// If no connection is `OpenDesiredByRemote` anymore, switch to
1491								// `Disabled`.
1492								*entry.get_mut() =
1493									PeerState::Disabled { connections, backoff_until };
1494							} else {
1495								*entry.get_mut() = PeerState::Incoming {
1496									connections,
1497									backoff_until,
1498									incoming_index,
1499									peerset_rejected,
1500								};
1501							}
1502						},
1503
1504						// Enabled => Enabled | Backoff
1505						// Peers are always backed-off when disconnecting while Enabled.
1506						PeerState::Enabled { mut connections } => {
1507							trace!(
1508								target: LOG_TARGET,
1509								"Libp2p => Disconnected({}, {:?}, {:?}): Enabled.",
1510								peer_id, set_id, connection_id
1511							);
1512
1513							debug_assert!(connections.iter().any(|(_, s)| matches!(
1514								s,
1515								ConnectionState::Opening | ConnectionState::Open(_)
1516							)));
1517
1518							if let Some(pos) =
1519								connections.iter().position(|(c, _)| *c == connection_id)
1520							{
1521								let (_, state) = connections.remove(pos);
1522								if let ConnectionState::Open(_) = state {
1523									if let Some((replacement_pos, replacement_sink)) = connections
1524										.iter()
1525										.enumerate()
1526										.find_map(|(num, (_, s))| match s {
1527											ConnectionState::Open(s) => Some((num, s.clone())),
1528											_ => None,
1529										}) {
1530										if pos <= replacement_pos {
1531											trace!(
1532												target: LOG_TARGET,
1533												"External API <= Sink replaced({}, {:?})",
1534												peer_id, set_id
1535											);
1536											let event = NotificationsOut::CustomProtocolReplaced {
1537												peer_id,
1538												set_id,
1539												notifications_sink: replacement_sink.clone(),
1540											};
1541											self.events.push_back(ToSwarm::GenerateEvent(event));
1542										}
1543									} else {
1544										trace!(
1545											target: LOG_TARGET, "External API <= Closed({}, {:?})",
1546											peer_id, set_id
1547										);
1548										let event = NotificationsOut::CustomProtocolClosed {
1549											peer_id,
1550											set_id,
1551										};
1552										self.events.push_back(ToSwarm::GenerateEvent(event));
1553									}
1554								}
1555							} else {
1556								error!(target: LOG_TARGET,
1557									"inject_connection_closed: State mismatch in the custom protos handler");
1558								debug_assert!(false);
1559							}
1560
1561							if connections.is_empty() {
1562								trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1563								self.protocol_controller_handles[usize::from(set_id)]
1564									.dropped(peer_id);
1565								let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
1566
1567								let delay_id = self.next_delay_id;
1568								self.next_delay_id.0 += 1;
1569								let delay = futures_timer::Delay::new(Duration::from_secs(ban_dur));
1570								self.delays.push(
1571									async move {
1572										delay.await;
1573										(delay_id, peer_id, set_id)
1574									}
1575									.boxed(),
1576								);
1577
1578								*entry.get_mut() = PeerState::Backoff {
1579									timer: delay_id,
1580									timer_deadline: Instant::now() + Duration::from_secs(ban_dur),
1581								};
1582							} else if !connections.iter().any(|(_, s)| {
1583								matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
1584							}) {
1585								trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1586								self.protocol_controller_handles[usize::from(set_id)]
1587									.dropped(peer_id);
1588
1589								*entry.get_mut() =
1590									PeerState::Disabled { connections, backoff_until: None };
1591							} else {
1592								*entry.get_mut() = PeerState::Enabled { connections };
1593							}
1594						},
1595
1596						PeerState::Requested |
1597						PeerState::PendingRequest { .. } |
1598						PeerState::Backoff { .. } => {
1599							// This is a serious bug either in this state machine or in libp2p.
1600							error!(target: LOG_TARGET,
1601								"`inject_connection_closed` called for unknown peer {}",
1602								peer_id);
1603							debug_assert!(false);
1604						},
1605						PeerState::Poisoned => {
1606							error!(target: LOG_TARGET, "State of peer {} is poisoned", peer_id);
1607							debug_assert!(false);
1608						},
1609					}
1610				}
1611			},
1612			FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
1613				if let DialError::Transport(errors) = error {
1614					for (addr, error) in errors.iter() {
1615						trace!(target: LOG_TARGET, "Libp2p => Reach failure for {:?} through {:?}: {:?}", peer_id, addr, error);
1616					}
1617				}
1618
1619				if let Some(peer_id) = peer_id {
1620					trace!(target: LOG_TARGET, "Libp2p => Dial failure for {:?}", peer_id);
1621
1622					for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
1623						if let Entry::Occupied(mut entry) = self.peers.entry((peer_id, set_id)) {
1624							match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1625								// The peer is not in our list.
1626								st @ PeerState::Backoff { .. } => {
1627									*entry.into_mut() = st;
1628								},
1629
1630								// "Basic" situation: we failed to reach a peer that the peerset
1631								// requested.
1632								st @ PeerState::Requested |
1633								st @ PeerState::PendingRequest { .. } => {
1634									trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1635									self.protocol_controller_handles[usize::from(set_id)]
1636										.dropped(peer_id);
1637
1638									let now = Instant::now();
1639									let ban_duration = match st {
1640										PeerState::PendingRequest { timer_deadline, .. }
1641											if timer_deadline > now =>
1642											cmp::max(timer_deadline - now, Duration::from_secs(5)),
1643										_ => Duration::from_secs(5),
1644									};
1645
1646									let delay_id = self.next_delay_id;
1647									self.next_delay_id.0 += 1;
1648									let delay = futures_timer::Delay::new(ban_duration);
1649									self.delays.push(
1650										async move {
1651											delay.await;
1652											(delay_id, peer_id, set_id)
1653										}
1654										.boxed(),
1655									);
1656
1657									*entry.into_mut() = PeerState::Backoff {
1658										timer: delay_id,
1659										timer_deadline: now + ban_duration,
1660									};
1661								},
1662
1663								// We can still get dial failures even if we are already connected
1664								// to the peer, as an extra diagnostic for an earlier attempt.
1665								st @ PeerState::Disabled { .. } |
1666								st @ PeerState::Enabled { .. } |
1667								st @ PeerState::DisabledPendingEnable { .. } |
1668								st @ PeerState::Incoming { .. } => {
1669									*entry.into_mut() = st;
1670								},
1671
1672								PeerState::Poisoned => {
1673									error!(target: LOG_TARGET, "State of {:?} is poisoned", peer_id);
1674									debug_assert!(false);
1675								},
1676							}
1677						}
1678					}
1679				}
1680			},
1681			FromSwarm::ListenerClosed(_) => {},
1682			FromSwarm::ListenFailure(_) => {},
1683			FromSwarm::ListenerError(_) => {},
1684			FromSwarm::ExternalAddrExpired(_) => {},
1685			FromSwarm::NewListener(_) => {},
1686			FromSwarm::ExpiredListenAddr(_) => {},
1687			FromSwarm::NewExternalAddrCandidate(_) => {},
1688			FromSwarm::ExternalAddrConfirmed(_) => {},
1689			FromSwarm::AddressChange(_) => {},
1690			FromSwarm::NewListenAddr(_) => {},
1691			FromSwarm::NewExternalAddrOfPeer(_) => {},
1692			event => {
1693				warn!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
1694			},
1695		}
1696	}
1697
1698	fn on_connection_handler_event(
1699		&mut self,
1700		peer_id: PeerId,
1701		connection_id: ConnectionId,
1702		event: THandlerOutEvent<Self>,
1703	) {
1704		match event {
1705			NotifsHandlerOut::OpenDesiredByRemote { protocol_index, handshake } => {
1706				let set_id = SetId::from(protocol_index);
1707
1708				trace!(target: LOG_TARGET,
1709					"Handler({:?}, {:?}]) => OpenDesiredByRemote({:?})",
1710					peer_id, connection_id, set_id);
1711
1712				let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
1713				{
1714					entry
1715				} else {
1716					error!(
1717						target: LOG_TARGET,
1718						"OpenDesiredByRemote: State mismatch in the custom protos handler"
1719					);
1720					debug_assert!(false);
1721					return
1722				};
1723
1724				match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1725					// Incoming => Incoming
1726					PeerState::Incoming {
1727						mut connections,
1728						backoff_until,
1729						incoming_index,
1730						peerset_rejected,
1731					} => {
1732						debug_assert!(connections
1733							.iter()
1734							.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1735						if let Some((_, connec_state)) =
1736							connections.iter_mut().find(|(c, _)| *c == connection_id)
1737						{
1738							if let ConnectionState::Closed = *connec_state {
1739								*connec_state = ConnectionState::OpenDesiredByRemote;
1740							} else {
1741								// Connections in `OpeningThenClosing` and `Closing` state can be
1742								// in a Closed phase, and as such can emit `OpenDesiredByRemote`
1743								// messages.
1744								// Since an `Open` and/or a `Close` message have already been sent,
1745								// there is nothing much that can be done about this anyway.
1746								debug_assert!(matches!(
1747									connec_state,
1748									ConnectionState::OpeningThenClosing | ConnectionState::Closing
1749								));
1750							}
1751						} else {
1752							error!(
1753								target: LOG_TARGET,
1754								"OpenDesiredByRemote: State mismatch in the custom protos handler"
1755							);
1756							debug_assert!(false);
1757						}
1758
1759						*entry.into_mut() = PeerState::Incoming {
1760							connections,
1761							backoff_until,
1762							incoming_index,
1763							peerset_rejected,
1764						};
1765					},
1766
1767					PeerState::Enabled { mut connections } => {
1768						debug_assert!(connections.iter().any(|(_, s)| matches!(
1769							s,
1770							ConnectionState::Opening | ConnectionState::Open(_)
1771						)));
1772
1773						if let Some((_, connec_state)) =
1774							connections.iter_mut().find(|(c, _)| *c == connection_id)
1775						{
1776							if let ConnectionState::Closed = *connec_state {
1777								trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})",
1778									peer_id, connection_id, set_id);
1779								self.events.push_back(ToSwarm::NotifyHandler {
1780									peer_id,
1781									handler: NotifyHandler::One(connection_id),
1782									event: NotifsHandlerIn::Open {
1783										protocol_index: set_id.into(),
1784										peer_id,
1785									},
1786								});
1787								*connec_state = ConnectionState::Opening;
1788							} else {
1789								// Connections in `OpeningThenClosing`, `Opening`, and `Closing`
1790								// state can be in a Closed phase, and as such can emit
1791								// `OpenDesiredByRemote` messages.
1792								// Since an `Open` message haS already been sent, there is nothing
1793								// more to do.
1794								debug_assert!(matches!(
1795									connec_state,
1796									ConnectionState::OpenDesiredByRemote |
1797										ConnectionState::Closing | ConnectionState::Opening
1798								));
1799							}
1800						} else {
1801							error!(
1802								target: LOG_TARGET,
1803								"OpenDesiredByRemote: State mismatch in the custom protos handler"
1804							);
1805							debug_assert!(false);
1806						}
1807
1808						*entry.into_mut() = PeerState::Enabled { connections };
1809					},
1810
1811					// Disabled => Disabled | Incoming
1812					PeerState::Disabled { mut connections, backoff_until } => {
1813						if let Some((_, connec_state)) =
1814							connections.iter_mut().find(|(c, _)| *c == connection_id)
1815						{
1816							if let ConnectionState::Closed = *connec_state {
1817								*connec_state = ConnectionState::OpenDesiredByRemote;
1818
1819								let incoming_id = self.next_incoming_index;
1820								self.next_incoming_index.0 += 1;
1821
1822								trace!(target: LOG_TARGET, "PSM <= Incoming({}, {:?}, {:?}).",
1823									peer_id, set_id, incoming_id);
1824								self.protocol_controller_handles[usize::from(set_id)]
1825									.incoming_connection(peer_id, incoming_id);
1826								self.incoming.push(IncomingPeer {
1827									peer_id,
1828									set_id,
1829									alive: true,
1830									incoming_id,
1831									handshake,
1832								});
1833
1834								*entry.into_mut() = PeerState::Incoming {
1835									connections,
1836									backoff_until,
1837									peerset_rejected: false,
1838									incoming_index: incoming_id,
1839								};
1840							} else {
1841								// Connections in `OpeningThenClosing` and `Closing` state can be
1842								// in a Closed phase, and as such can emit `OpenDesiredByRemote`
1843								// messages.
1844								// We ignore them.
1845								debug_assert!(matches!(
1846									connec_state,
1847									ConnectionState::OpeningThenClosing | ConnectionState::Closing
1848								));
1849								*entry.into_mut() =
1850									PeerState::Disabled { connections, backoff_until };
1851							}
1852						} else {
1853							error!(
1854								target: LOG_TARGET,
1855								"OpenDesiredByRemote: State mismatch in the custom protos handler"
1856							);
1857							debug_assert!(false);
1858						}
1859					},
1860
1861					// DisabledPendingEnable => Enabled | DisabledPendingEnable
1862					PeerState::DisabledPendingEnable { mut connections, timer, timer_deadline } => {
1863						if let Some((_, connec_state)) =
1864							connections.iter_mut().find(|(c, _)| *c == connection_id)
1865						{
1866							if let ConnectionState::Closed = *connec_state {
1867								trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})",
1868									peer_id, connection_id, set_id);
1869								self.events.push_back(ToSwarm::NotifyHandler {
1870									peer_id,
1871									handler: NotifyHandler::One(connection_id),
1872									event: NotifsHandlerIn::Open {
1873										protocol_index: set_id.into(),
1874										peer_id,
1875									},
1876								});
1877								*connec_state = ConnectionState::Opening;
1878
1879								*entry.into_mut() = PeerState::Enabled { connections };
1880							} else {
1881								// Connections in `OpeningThenClosing` and `Closing` state can be
1882								// in a Closed phase, and as such can emit `OpenDesiredByRemote`
1883								// messages.
1884								// We ignore them.
1885								debug_assert!(matches!(
1886									connec_state,
1887									ConnectionState::OpeningThenClosing | ConnectionState::Closing
1888								));
1889								*entry.into_mut() = PeerState::DisabledPendingEnable {
1890									connections,
1891									timer,
1892									timer_deadline,
1893								};
1894							}
1895						} else {
1896							error!(
1897								target: LOG_TARGET,
1898								"OpenDesiredByRemote: State mismatch in the custom protos handler"
1899							);
1900							debug_assert!(false);
1901						}
1902					},
1903
1904					state => {
1905						error!(target: LOG_TARGET,
1906							   "OpenDesiredByRemote: Unexpected state in the custom protos handler: {:?}",
1907							   state);
1908						debug_assert!(false);
1909					},
1910				};
1911			},
1912
1913			NotifsHandlerOut::CloseDesired { protocol_index, reason } => {
1914				let set_id = SetId::from(protocol_index);
1915
1916				trace!(target: LOG_TARGET,
1917					"Handler({}, {:?}) => CloseDesired({:?})",
1918					peer_id, connection_id, set_id);
1919
1920				let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
1921				{
1922					entry
1923				} else {
1924					error!(target: LOG_TARGET, "CloseDesired: State mismatch in the custom protos handler");
1925					debug_assert!(false);
1926					return
1927				};
1928
1929				if reason == CloseReason::ProtocolMisbehavior {
1930					self.events.push_back(ToSwarm::GenerateEvent(
1931						NotificationsOut::ProtocolMisbehavior { peer_id, set_id },
1932					));
1933				}
1934
1935				match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1936					// Enabled => Enabled | Disabled
1937					PeerState::Enabled { mut connections } => {
1938						debug_assert!(connections.iter().any(|(_, s)| matches!(
1939							s,
1940							ConnectionState::Opening | ConnectionState::Open(_)
1941						)));
1942
1943						let pos = if let Some(pos) =
1944							connections.iter().position(|(c, _)| *c == connection_id)
1945						{
1946							pos
1947						} else {
1948							error!(target: LOG_TARGET,
1949								"CloseDesired: State mismatch in the custom protos handler");
1950							debug_assert!(false);
1951							return
1952						};
1953
1954						if matches!(connections[pos].1, ConnectionState::Closing) {
1955							*entry.into_mut() = PeerState::Enabled { connections };
1956							return
1957						}
1958
1959						debug_assert!(matches!(connections[pos].1, ConnectionState::Open(_)));
1960						connections[pos].1 = ConnectionState::Closing;
1961
1962						trace!(target: LOG_TARGET, "Handler({}, {:?}) <= Close({:?})", peer_id, connection_id, set_id);
1963						self.events.push_back(ToSwarm::NotifyHandler {
1964							peer_id,
1965							handler: NotifyHandler::One(connection_id),
1966							event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
1967						});
1968
1969						if let Some((replacement_pos, replacement_sink)) =
1970							connections.iter().enumerate().find_map(|(num, (_, s))| match s {
1971								ConnectionState::Open(s) => Some((num, s.clone())),
1972								_ => None,
1973							}) {
1974							if pos <= replacement_pos {
1975								trace!(target: LOG_TARGET, "External API <= Sink replaced({:?}, {:?})", peer_id, set_id);
1976								let event = NotificationsOut::CustomProtocolReplaced {
1977									peer_id,
1978									set_id,
1979									notifications_sink: replacement_sink.clone(),
1980								};
1981								self.events.push_back(ToSwarm::GenerateEvent(event));
1982							}
1983
1984							*entry.into_mut() = PeerState::Enabled { connections };
1985						} else {
1986							// List of open connections wasn't empty before but now it is.
1987							if !connections
1988								.iter()
1989								.any(|(_, s)| matches!(s, ConnectionState::Opening))
1990							{
1991								trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1992								self.protocol_controller_handles[usize::from(set_id)]
1993									.dropped(peer_id);
1994								*entry.into_mut() =
1995									PeerState::Disabled { connections, backoff_until: None };
1996							} else {
1997								*entry.into_mut() = PeerState::Enabled { connections };
1998							}
1999
2000							trace!(target: LOG_TARGET, "External API <= Closed({}, {:?})", peer_id, set_id);
2001							let event = NotificationsOut::CustomProtocolClosed { peer_id, set_id };
2002							self.events.push_back(ToSwarm::GenerateEvent(event));
2003						}
2004					},
2005
2006					// All connections in `Disabled` and `DisabledPendingEnable` have been sent a
2007					// `Close` message already, and as such ignore any `CloseDesired` message.
2008					state @ PeerState::Disabled { .. } |
2009					state @ PeerState::DisabledPendingEnable { .. } => {
2010						*entry.into_mut() = state;
2011					},
2012					state => {
2013						error!(target: LOG_TARGET,
2014							"Unexpected state in the custom protos handler: {:?}",
2015							state);
2016					},
2017				}
2018			},
2019
2020			NotifsHandlerOut::CloseResult { protocol_index } => {
2021				let set_id = SetId::from(protocol_index);
2022
2023				trace!(target: LOG_TARGET,
2024					"Handler({}, {:?}) => CloseResult({:?})",
2025					peer_id, connection_id, set_id);
2026
2027				match self.peers.get_mut(&(peer_id, set_id)) {
2028					// Move the connection from `Closing` to `Closed`.
2029					Some(PeerState::Incoming { connections, .. }) |
2030					Some(PeerState::DisabledPendingEnable { connections, .. }) |
2031					Some(PeerState::Disabled { connections, .. }) |
2032					Some(PeerState::Enabled { connections, .. }) => {
2033						if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2034							*c == connection_id && matches!(s, ConnectionState::Closing)
2035						}) {
2036							*connec_state = ConnectionState::Closed;
2037						} else {
2038							error!(target: LOG_TARGET,
2039								"CloseResult: State mismatch in the custom protos handler");
2040							debug_assert!(false);
2041						}
2042					},
2043
2044					state => {
2045						error!(target: LOG_TARGET,
2046							   "CloseResult: Unexpected state in the custom protos handler: {:?}",
2047							   state);
2048						debug_assert!(false);
2049					},
2050				}
2051			},
2052
2053			NotifsHandlerOut::OpenResultOk {
2054				protocol_index,
2055				negotiated_fallback,
2056				received_handshake,
2057				notifications_sink,
2058				inbound,
2059				..
2060			} => {
2061				let set_id = SetId::from(protocol_index);
2062				trace!(target: LOG_TARGET,
2063					"Handler({}, {:?}) => OpenResultOk({:?})",
2064					peer_id, connection_id, set_id);
2065
2066				match self.peers.get_mut(&(peer_id, set_id)) {
2067					Some(PeerState::Enabled { connections, .. }) => {
2068						debug_assert!(connections.iter().any(|(_, s)| matches!(
2069							s,
2070							ConnectionState::Opening | ConnectionState::Open(_)
2071						)));
2072						let any_open =
2073							connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_)));
2074
2075						if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2076							*c == connection_id && matches!(s, ConnectionState::Opening)
2077						}) {
2078							if !any_open {
2079								trace!(target: LOG_TARGET, "External API <= Open({}, {:?})", peer_id, set_id);
2080								let event = NotificationsOut::CustomProtocolOpen {
2081									peer_id,
2082									set_id,
2083									direction: if inbound {
2084										Direction::Inbound
2085									} else {
2086										Direction::Outbound
2087									},
2088									received_handshake: received_handshake.clone(),
2089									negotiated_fallback: negotiated_fallback.clone(),
2090									notifications_sink: notifications_sink.clone(),
2091								};
2092								self.events.push_back(ToSwarm::GenerateEvent(event));
2093							}
2094							*connec_state = ConnectionState::Open(notifications_sink);
2095						} else if let Some((_, connec_state)) =
2096							connections.iter_mut().find(|(c, s)| {
2097								*c == connection_id &&
2098									matches!(s, ConnectionState::OpeningThenClosing)
2099							}) {
2100							*connec_state = ConnectionState::Closing;
2101						} else {
2102							error!(target: LOG_TARGET,
2103								"OpenResultOk State mismatch in the custom protos handler");
2104							debug_assert!(false);
2105						}
2106					},
2107
2108					Some(PeerState::Incoming { connections, .. }) |
2109					Some(PeerState::DisabledPendingEnable { connections, .. }) |
2110					Some(PeerState::Disabled { connections, .. }) => {
2111						if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2112							*c == connection_id && matches!(s, ConnectionState::OpeningThenClosing)
2113						}) {
2114							*connec_state = ConnectionState::Closing;
2115						} else {
2116							error!(target: LOG_TARGET,
2117								"OpenResultOk State mismatch in the custom protos handler");
2118							debug_assert!(false);
2119						}
2120					},
2121
2122					state => {
2123						error!(target: LOG_TARGET,
2124							   "OpenResultOk: Unexpected state in the custom protos handler: {:?}",
2125							   state);
2126						debug_assert!(false);
2127					},
2128				}
2129			},
2130
2131			NotifsHandlerOut::OpenResultErr { protocol_index } => {
2132				let set_id = SetId::from(protocol_index);
2133				trace!(target: LOG_TARGET,
2134					"Handler({:?}, {:?}) => OpenResultErr({:?})",
2135					peer_id, connection_id, set_id);
2136
2137				let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
2138				{
2139					entry
2140				} else {
2141					error!(target: LOG_TARGET, "OpenResultErr: State mismatch in the custom protos handler");
2142					debug_assert!(false);
2143					return
2144				};
2145
2146				match mem::replace(entry.get_mut(), PeerState::Poisoned) {
2147					PeerState::Enabled { mut connections } => {
2148						debug_assert!(connections.iter().any(|(_, s)| matches!(
2149							s,
2150							ConnectionState::Opening | ConnectionState::Open(_)
2151						)));
2152
2153						if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2154							*c == connection_id && matches!(s, ConnectionState::Opening)
2155						}) {
2156							*connec_state = ConnectionState::Closed;
2157						} else if let Some((_, connec_state)) =
2158							connections.iter_mut().find(|(c, s)| {
2159								*c == connection_id &&
2160									matches!(s, ConnectionState::OpeningThenClosing)
2161							}) {
2162							*connec_state = ConnectionState::Closing;
2163						} else {
2164							error!(target: LOG_TARGET,
2165								"OpenResultErr: State mismatch in the custom protos handler");
2166							debug_assert!(false);
2167						}
2168
2169						if !connections.iter().any(|(_, s)| {
2170							matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
2171						}) {
2172							trace!(target: LOG_TARGET, "PSM <= Dropped({:?}, {:?})", peer_id, set_id);
2173							self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id);
2174
2175							let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
2176							*entry.into_mut() = PeerState::Disabled {
2177								connections,
2178								backoff_until: Some(Instant::now() + Duration::from_secs(ban_dur)),
2179							};
2180						} else {
2181							*entry.into_mut() = PeerState::Enabled { connections };
2182						}
2183					},
2184					mut state @ PeerState::Incoming { .. } |
2185					mut state @ PeerState::DisabledPendingEnable { .. } |
2186					mut state @ PeerState::Disabled { .. } => {
2187						match &mut state {
2188							PeerState::Incoming { connections, .. } |
2189							PeerState::Disabled { connections, .. } |
2190							PeerState::DisabledPendingEnable { connections, .. } => {
2191								if let Some((_, connec_state)) =
2192									connections.iter_mut().find(|(c, s)| {
2193										*c == connection_id &&
2194											matches!(s, ConnectionState::OpeningThenClosing)
2195									}) {
2196									*connec_state = ConnectionState::Closing;
2197								} else {
2198									error!(target: LOG_TARGET,
2199										"OpenResultErr: State mismatch in the custom protos handler");
2200									debug_assert!(false);
2201								}
2202							},
2203							_ => unreachable!(
2204								"Match branches are the same as the one on which we
2205							enter this block; qed"
2206							),
2207						};
2208
2209						*entry.into_mut() = state;
2210					},
2211					state => {
2212						error!(target: LOG_TARGET,
2213							"Unexpected state in the custom protos handler: {:?}",
2214							state);
2215						debug_assert!(false);
2216					},
2217				};
2218			},
2219
2220			NotifsHandlerOut::Notification { protocol_index, message } => {
2221				let set_id = SetId::from(protocol_index);
2222				if self.is_open(&peer_id, set_id) {
2223					trace!(
2224						target: LOG_TARGET,
2225						"Handler({:?}) => Notification({}, {:?}, {} bytes)",
2226						connection_id,
2227						peer_id,
2228						set_id,
2229						message.len()
2230					);
2231					trace!(
2232						target: LOG_TARGET,
2233						"External API <= Message({}, {:?})",
2234						peer_id,
2235						set_id,
2236					);
2237					let event = NotificationsOut::Notification {
2238						peer_id,
2239						set_id,
2240						message: message.clone(),
2241					};
2242					self.events.push_back(ToSwarm::GenerateEvent(event));
2243				} else {
2244					trace!(
2245						target: LOG_TARGET,
2246						"Handler({:?}) => Post-close notification({}, {:?}, {} bytes)",
2247						connection_id,
2248						peer_id,
2249						set_id,
2250						message.len()
2251					);
2252				}
2253			},
2254			NotifsHandlerOut::Close { protocol_index } => {
2255				let set_id = SetId::from(protocol_index);
2256
2257				trace!(target: LOG_TARGET, "Handler({}, {:?}) => SyncNotificationsClogged({:?})", peer_id, connection_id, set_id);
2258				self.events.push_back(ToSwarm::CloseConnection {
2259					peer_id,
2260					connection: CloseConnection::One(connection_id),
2261				});
2262			},
2263		}
2264	}
2265
2266	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2267		if let Some(event) = self.events.pop_front() {
2268			return Poll::Ready(event)
2269		}
2270
2271		// Poll for instructions from the protocol controllers.
2272		loop {
2273			match futures::Stream::poll_next(Pin::new(&mut self.from_protocol_controllers), cx) {
2274				Poll::Ready(Some(Message::Accept(index))) => {
2275					self.peerset_report_preaccept(index);
2276				},
2277				Poll::Ready(Some(Message::Reject(index))) => {
2278					let _ = self.peerset_report_reject(index);
2279				},
2280				Poll::Ready(Some(Message::Connect { peer_id, set_id, .. })) => {
2281					self.peerset_report_connect(peer_id, set_id);
2282				},
2283				Poll::Ready(Some(Message::Drop { peer_id, set_id, .. })) => {
2284					self.peerset_report_disconnect(peer_id, set_id);
2285				},
2286				Poll::Ready(None) => {
2287					error!(
2288						target: LOG_TARGET,
2289						"Protocol controllers receiver stream has returned `None`. Ignore this error if the node is shutting down.",
2290					);
2291					break
2292				},
2293				Poll::Pending => break,
2294			}
2295		}
2296
2297		// poll commands from protocols
2298		loop {
2299			match futures::Stream::poll_next(Pin::new(&mut self.command_streams), cx) {
2300				Poll::Ready(Some((set_id, command))) => match command {
2301					NotificationCommand::SetHandshake(handshake) => {
2302						self.set_notif_protocol_handshake(set_id.into(), handshake);
2303					},
2304					NotificationCommand::OpenSubstream(_peer) |
2305					NotificationCommand::CloseSubstream(_peer) => {
2306						todo!("substream control not implemented");
2307					},
2308				},
2309				Poll::Ready(None) => {
2310					error!(target: LOG_TARGET, "Protocol command streams have been shut down");
2311					break
2312				},
2313				Poll::Pending => break,
2314			}
2315		}
2316
2317		while let Poll::Ready(Some((result, index))) =
2318			self.pending_inbound_validations.poll_next_unpin(cx)
2319		{
2320			match result {
2321				Ok(ValidationResult::Accept) => {
2322					self.protocol_report_accept(index);
2323				},
2324				Ok(ValidationResult::Reject) => {
2325					self.protocol_report_reject(index);
2326				},
2327				Err(_) => {
2328					error!(target: LOG_TARGET, "Protocol has shut down");
2329					break
2330				},
2331			}
2332		}
2333
2334		while let Poll::Ready(Some((delay_id, peer_id, set_id))) =
2335			Pin::new(&mut self.delays).poll_next(cx)
2336		{
2337			let peer_state = match self.peers.get_mut(&(peer_id, set_id)) {
2338				Some(s) => s,
2339				// We intentionally never remove elements from `delays`, and it may
2340				// thus contain peers which are now gone. This is a normal situation.
2341				None => continue,
2342			};
2343
2344			match peer_state {
2345				PeerState::Backoff { timer, .. } if *timer == delay_id => {
2346					trace!(target: LOG_TARGET, "Libp2p <= Clean up ban of {:?} from the state ({:?})", peer_id, set_id);
2347					self.peers.remove(&(peer_id, set_id));
2348				},
2349
2350				PeerState::PendingRequest { timer, .. } if *timer == delay_id => {
2351					trace!(target: LOG_TARGET, "Libp2p <= Dial {:?} now that ban has expired ({:?})", peer_id, set_id);
2352					self.events.push_back(ToSwarm::Dial { opts: peer_id.into() });
2353					*peer_state = PeerState::Requested;
2354				},
2355
2356				PeerState::DisabledPendingEnable { connections, timer, timer_deadline }
2357					if *timer == delay_id =>
2358				{
2359					// The first element of `closed` is chosen to open the notifications substream.
2360					if let Some((connec_id, connec_state)) =
2361						connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
2362					{
2363						trace!(target: LOG_TARGET, "Handler({}, {:?}) <= Open({:?}) (ban expired)",
2364							peer_id, *connec_id, set_id);
2365						self.events.push_back(ToSwarm::NotifyHandler {
2366							peer_id,
2367							handler: NotifyHandler::One(*connec_id),
2368							event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
2369						});
2370						*connec_state = ConnectionState::Opening;
2371						*peer_state = PeerState::Enabled { connections: mem::take(connections) };
2372					} else {
2373						*timer_deadline = Instant::now() + Duration::from_secs(5);
2374						let delay = futures_timer::Delay::new(Duration::from_secs(5));
2375						let timer = *timer;
2376						self.delays.push(
2377							async move {
2378								delay.await;
2379								(timer, peer_id, set_id)
2380							}
2381							.boxed(),
2382						);
2383					}
2384				},
2385
2386				// We intentionally never remove elements from `delays`, and it may
2387				// thus contain obsolete entries. This is a normal situation.
2388				_ => {},
2389			}
2390		}
2391
2392		if let Some(event) = self.events.pop_front() {
2393			return Poll::Ready(event)
2394		}
2395
2396		Poll::Pending
2397	}
2398}
2399
2400#[cfg(test)]
2401mod tests {
2402	use super::*;
2403	use crate::{
2404		mock::MockPeerStore,
2405		protocol::notifications::handler::tests::*,
2406		protocol_controller::{IncomingIndex, ProtoSetConfig, ProtocolController},
2407	};
2408	use libp2p::core::ConnectedPoint;
2409	use sc_utils::mpsc::tracing_unbounded;
2410	use std::{collections::HashSet, iter};
2411
2412	impl PartialEq for ConnectionState {
2413		fn eq(&self, other: &ConnectionState) -> bool {
2414			match (self, other) {
2415				(ConnectionState::Closed, ConnectionState::Closed) => true,
2416				(ConnectionState::Closing, ConnectionState::Closing) => true,
2417				(ConnectionState::Opening, ConnectionState::Opening) => true,
2418				(ConnectionState::OpeningThenClosing, ConnectionState::OpeningThenClosing) => true,
2419				(ConnectionState::OpenDesiredByRemote, ConnectionState::OpenDesiredByRemote) =>
2420					true,
2421				(ConnectionState::Open(_), ConnectionState::Open(_)) => true,
2422				_ => false,
2423			}
2424		}
2425	}
2426
2427	fn development_notifs(
2428	) -> (Notifications, ProtocolController, Box<dyn crate::service::traits::NotificationService>)
2429	{
2430		let (protocol_handle_pair, notif_service) =
2431			crate::protocol::notifications::service::notification_service("/proto/1".into());
2432		let (to_notifications, from_controller) =
2433			tracing_unbounded("test_controller_to_notifications", 10_000);
2434
2435		let (handle, controller) = ProtocolController::new(
2436			SetId::from(0),
2437			ProtoSetConfig {
2438				in_peers: 25,
2439				out_peers: 25,
2440				reserved_nodes: HashSet::new(),
2441				reserved_only: false,
2442			},
2443			to_notifications,
2444			Arc::new(MockPeerStore {}),
2445		);
2446
2447		let (notif_handle, command_stream) = protocol_handle_pair.split();
2448		(
2449			Notifications::new(
2450				vec![handle],
2451				from_controller,
2452				NotificationMetrics::new(None),
2453				iter::once((
2454					ProtocolConfig {
2455						name: "/foo".into(),
2456						fallback_names: Vec::new(),
2457						handshake: vec![1, 2, 3, 4],
2458						max_notification_size: u64::MAX,
2459					},
2460					notif_handle,
2461					command_stream,
2462				)),
2463			),
2464			controller,
2465			notif_service,
2466		)
2467	}
2468
2469	#[test]
2470	fn update_handshake() {
2471		let (mut notif, _controller, _notif_service) = development_notifs();
2472
2473		let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
2474		assert_eq!(inner, vec![1, 2, 3, 4]);
2475
2476		notif.set_notif_protocol_handshake(0.into(), vec![5, 6, 7, 8]);
2477
2478		let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
2479		assert_eq!(inner, vec![5, 6, 7, 8]);
2480	}
2481
2482	#[test]
2483	#[should_panic]
2484	#[cfg(debug_assertions)]
2485	fn update_unknown_handshake() {
2486		let (mut notif, _controller, _notif_service) = development_notifs();
2487
2488		notif.set_notif_protocol_handshake(1337.into(), vec![5, 6, 7, 8]);
2489	}
2490
2491	#[test]
2492	fn disconnect_backoff_peer() {
2493		let (mut notif, _controller, _notif_service) = development_notifs();
2494
2495		let peer = PeerId::random();
2496		notif.peers.insert(
2497			(peer, 0.into()),
2498			PeerState::Backoff { timer: DelayId(0), timer_deadline: Instant::now() },
2499		);
2500		notif.disconnect_peer(&peer, 0.into());
2501
2502		assert!(std::matches!(
2503			notif.peers.get(&(peer, 0.into())),
2504			Some(PeerState::Backoff { timer: DelayId(0), .. })
2505		));
2506	}
2507
2508	#[test]
2509	fn disconnect_pending_request() {
2510		let (mut notif, _controller, _notif_service) = development_notifs();
2511		let peer = PeerId::random();
2512
2513		notif.peers.insert(
2514			(peer, 0.into()),
2515			PeerState::PendingRequest { timer: DelayId(0), timer_deadline: Instant::now() },
2516		);
2517		notif.disconnect_peer(&peer, 0.into());
2518
2519		assert!(std::matches!(
2520			notif.peers.get(&(peer, 0.into())),
2521			Some(PeerState::PendingRequest { timer: DelayId(0), .. })
2522		));
2523	}
2524
2525	#[test]
2526	fn disconnect_requested_peer() {
2527		let (mut notif, _controller, _notif_service) = development_notifs();
2528
2529		let peer = PeerId::random();
2530		notif.peers.insert((peer, 0.into()), PeerState::Requested);
2531		notif.disconnect_peer(&peer, 0.into());
2532
2533		assert!(std::matches!(notif.peers.get(&(peer, 0.into())), Some(PeerState::Requested)));
2534	}
2535
2536	#[test]
2537	fn disconnect_disabled_peer() {
2538		let (mut notif, _controller, _notif_service) = development_notifs();
2539		let peer = PeerId::random();
2540		notif.peers.insert(
2541			(peer, 0.into()),
2542			PeerState::Disabled { backoff_until: None, connections: SmallVec::new() },
2543		);
2544		notif.disconnect_peer(&peer, 0.into());
2545
2546		assert!(std::matches!(
2547			notif.peers.get(&(peer, 0.into())),
2548			Some(PeerState::Disabled { backoff_until: None, .. })
2549		));
2550	}
2551
2552	#[test]
2553	fn remote_opens_connection_and_substream() {
2554		let (mut notif, _controller, _notif_service) = development_notifs();
2555		let peer = PeerId::random();
2556		let conn = ConnectionId::new_unchecked(0);
2557		let connected = ConnectedPoint::Listener {
2558			local_addr: Multiaddr::empty(),
2559			send_back_addr: Multiaddr::empty(),
2560		};
2561
2562		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2563			libp2p::swarm::behaviour::ConnectionEstablished {
2564				peer_id: peer,
2565				connection_id: conn,
2566				endpoint: &connected,
2567				failed_addresses: &[],
2568				other_established: 0usize,
2569			},
2570		));
2571
2572		if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
2573			notif.peers.get(&(peer, 0.into()))
2574		{
2575			assert_eq!(connections[0], (conn, ConnectionState::Closed));
2576		} else {
2577			panic!("invalid state");
2578		}
2579
2580		// remote opens a substream, verify that peer state is updated to `Incoming`
2581		notif.on_connection_handler_event(
2582			peer,
2583			conn,
2584			NotifsHandlerOut::OpenDesiredByRemote {
2585				protocol_index: 0,
2586				handshake: vec![1, 3, 3, 7],
2587			},
2588		);
2589
2590		if let Some(&PeerState::Incoming { ref connections, backoff_until: None, .. }) =
2591			notif.peers.get(&(peer, 0.into()))
2592		{
2593			assert_eq!(connections.len(), 1);
2594			assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2595		} else {
2596			panic!("invalid state");
2597		}
2598
2599		assert!(std::matches!(
2600			notif.incoming.pop(),
2601			Some(IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. }),
2602		));
2603	}
2604
2605	#[tokio::test]
2606	async fn disconnect_remote_substream_before_handled_by_controller() {
2607		let (mut notif, _controller, _notif_service) = development_notifs();
2608		let peer = PeerId::random();
2609		let conn = ConnectionId::new_unchecked(0);
2610		let connected = ConnectedPoint::Listener {
2611			local_addr: Multiaddr::empty(),
2612			send_back_addr: Multiaddr::empty(),
2613		};
2614
2615		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2616			libp2p::swarm::behaviour::ConnectionEstablished {
2617				peer_id: peer,
2618				connection_id: conn,
2619				endpoint: &connected,
2620				failed_addresses: &[],
2621				other_established: 0usize,
2622			},
2623		));
2624		notif.on_connection_handler_event(
2625			peer,
2626			conn,
2627			NotifsHandlerOut::OpenDesiredByRemote {
2628				protocol_index: 0,
2629				handshake: vec![1, 3, 3, 7],
2630			},
2631		);
2632		notif.disconnect_peer(&peer, 0.into());
2633
2634		if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
2635			notif.peers.get(&(peer, 0.into()))
2636		{
2637			assert_eq!(connections.len(), 1);
2638			assert_eq!(connections[0], (conn, ConnectionState::Closing));
2639		} else {
2640			panic!("invalid state");
2641		}
2642	}
2643
2644	#[test]
2645	fn peerset_report_connect_backoff() {
2646		let (mut notif, _controller, _notif_service) = development_notifs();
2647		let set_id = SetId::from(0);
2648		let peer = PeerId::random();
2649		let conn = ConnectionId::new_unchecked(0);
2650		let connected = ConnectedPoint::Listener {
2651			local_addr: Multiaddr::empty(),
2652			send_back_addr: Multiaddr::empty(),
2653		};
2654
2655		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2656			libp2p::swarm::behaviour::ConnectionEstablished {
2657				peer_id: peer,
2658				connection_id: conn,
2659				endpoint: &connected,
2660				failed_addresses: &[],
2661				other_established: 0usize,
2662			},
2663		));
2664		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2665
2666		// manually add backoff for the entry
2667		//
2668		// there is not straight-forward way of adding backoff to `PeerState::Disabled`
2669		// so manually adjust the value in order to progress on to the next stage.
2670		// This modification together with `ConnectionClosed` will convert the peer
2671		// state into `PeerState::Backoff`.
2672		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2673			notif.peers.get_mut(&(peer, set_id))
2674		{
2675			*backoff_until =
2676				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2677		}
2678
2679		notif.on_swarm_event(FromSwarm::ConnectionClosed(
2680			libp2p::swarm::behaviour::ConnectionClosed {
2681				peer_id: peer,
2682				connection_id: conn,
2683				endpoint: &connected.clone(),
2684				cause: None,
2685				remaining_established: 0usize,
2686			},
2687		));
2688
2689		let timer = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
2690			notif.peers.get(&(peer, set_id))
2691		{
2692			timer_deadline
2693		} else {
2694			panic!("invalid state");
2695		};
2696
2697		// attempt to connect the backed-off peer and verify that the request is pending
2698		notif.peerset_report_connect(peer, set_id);
2699
2700		if let Some(&PeerState::PendingRequest { timer_deadline, .. }) =
2701			notif.peers.get(&(peer, set_id))
2702		{
2703			assert_eq!(timer, timer_deadline);
2704		} else {
2705			panic!("invalid state");
2706		}
2707	}
2708
2709	#[test]
2710	fn peerset_connect_incoming() {
2711		let (mut notif, _controller, _notif_service) = development_notifs();
2712		let peer = PeerId::random();
2713		let conn = ConnectionId::new_unchecked(0);
2714		let set_id = SetId::from(0);
2715		let connected = ConnectedPoint::Listener {
2716			local_addr: Multiaddr::empty(),
2717			send_back_addr: Multiaddr::empty(),
2718		};
2719
2720		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2721			libp2p::swarm::behaviour::ConnectionEstablished {
2722				peer_id: peer,
2723				connection_id: conn,
2724				endpoint: &connected,
2725				failed_addresses: &[],
2726				other_established: 0usize,
2727			},
2728		));
2729		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2730
2731		// remote opens a substream, verify that peer state is updated to `Incoming`
2732		notif.on_connection_handler_event(
2733			peer,
2734			conn,
2735			NotifsHandlerOut::OpenDesiredByRemote {
2736				protocol_index: 0,
2737				handshake: vec![1, 3, 3, 7],
2738			},
2739		);
2740
2741		// attempt to connect to the peer and verify that the peer state is `Enabled`;
2742		// we rely on implementation detail that incoming indices are counted from 0
2743		// to not mock the `Peerset`
2744		notif.protocol_report_accept(IncomingIndex(0));
2745		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
2746	}
2747
2748	#[test]
2749	fn peerset_disconnect_disable_pending_enable() {
2750		let (mut notif, _controller, _notif_service) = development_notifs();
2751		let set_id = SetId::from(0);
2752		let peer = PeerId::random();
2753		let conn = ConnectionId::new_unchecked(0);
2754		let connected = ConnectedPoint::Listener {
2755			local_addr: Multiaddr::empty(),
2756			send_back_addr: Multiaddr::empty(),
2757		};
2758
2759		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2760			libp2p::swarm::behaviour::ConnectionEstablished {
2761				peer_id: peer,
2762				connection_id: conn,
2763				endpoint: &connected,
2764				failed_addresses: &[],
2765				other_established: 0usize,
2766			},
2767		));
2768		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2769
2770		// manually add backoff for the entry
2771		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2772			notif.peers.get_mut(&(peer, set_id))
2773		{
2774			*backoff_until =
2775				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2776		}
2777
2778		// switch state to `DisabledPendingEnable`
2779		notif.peerset_report_connect(peer, set_id);
2780		assert!(std::matches!(
2781			notif.peers.get(&(peer, set_id)),
2782			Some(&PeerState::DisabledPendingEnable { .. })
2783		));
2784
2785		notif.peerset_report_disconnect(peer, set_id);
2786
2787		if let Some(PeerState::Disabled { backoff_until, .. }) = notif.peers.get(&(peer, set_id)) {
2788			assert!(backoff_until.is_some());
2789			assert!(backoff_until.unwrap() > Instant::now());
2790		} else {
2791			panic!("invalid state");
2792		}
2793	}
2794
2795	#[test]
2796	fn peerset_disconnect_enabled() {
2797		let (mut notif, _controller, _notif_service) = development_notifs();
2798		let peer = PeerId::random();
2799		let conn = ConnectionId::new_unchecked(0);
2800		let set_id = SetId::from(0);
2801		let connected = ConnectedPoint::Listener {
2802			local_addr: Multiaddr::empty(),
2803			send_back_addr: Multiaddr::empty(),
2804		};
2805
2806		// Set peer into `Enabled` state.
2807		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2808			libp2p::swarm::behaviour::ConnectionEstablished {
2809				peer_id: peer,
2810				connection_id: conn,
2811				endpoint: &connected,
2812				failed_addresses: &[],
2813				other_established: 0usize,
2814			},
2815		));
2816		notif.on_connection_handler_event(
2817			peer,
2818			conn,
2819			NotifsHandlerOut::OpenDesiredByRemote {
2820				protocol_index: 0,
2821				handshake: vec![1, 3, 3, 7],
2822			},
2823		);
2824		// we rely on the implementation detail that incoming indices are counted from 0
2825		// to not mock the `Peerset`
2826		notif.protocol_report_accept(IncomingIndex(0));
2827		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
2828
2829		// disconnect peer and verify that the state is `Disabled`
2830		notif.peerset_report_disconnect(peer, set_id);
2831		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2832	}
2833
2834	#[test]
2835	fn peerset_disconnect_requested() {
2836		let (mut notif, _controller, _notif_service) = development_notifs();
2837		let peer = PeerId::random();
2838		let set_id = SetId::from(0);
2839
2840		// Set peer into `Requested` state.
2841		notif.peerset_report_connect(peer, set_id);
2842		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
2843
2844		// disconnect peer and verify that the state is `Disabled`
2845		notif.peerset_report_disconnect(peer, set_id);
2846		assert!(notif.peers.get(&(peer, set_id)).is_none());
2847	}
2848
2849	#[test]
2850	fn peerset_disconnect_pending_request() {
2851		let (mut notif, _controller, _notif_service) = development_notifs();
2852		let set_id = SetId::from(0);
2853		let peer = PeerId::random();
2854		let conn = ConnectionId::new_unchecked(0);
2855		let connected = ConnectedPoint::Listener {
2856			local_addr: Multiaddr::empty(),
2857			send_back_addr: Multiaddr::empty(),
2858		};
2859
2860		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2861			libp2p::swarm::behaviour::ConnectionEstablished {
2862				peer_id: peer,
2863				connection_id: conn,
2864				endpoint: &connected,
2865				failed_addresses: &[],
2866				other_established: 0usize,
2867			},
2868		));
2869		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2870
2871		// manually add backoff for the entry
2872		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2873			notif.peers.get_mut(&(peer, set_id))
2874		{
2875			*backoff_until =
2876				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2877		}
2878
2879		notif.on_swarm_event(FromSwarm::ConnectionClosed(
2880			libp2p::swarm::behaviour::ConnectionClosed {
2881				peer_id: peer,
2882				connection_id: conn,
2883				endpoint: &connected.clone(),
2884				cause: None,
2885				remaining_established: 0usize,
2886			},
2887		));
2888		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
2889
2890		// attempt to connect the backed-off peer and verify that the request is pending
2891		notif.peerset_report_connect(peer, set_id);
2892		assert!(std::matches!(
2893			notif.peers.get(&(peer, set_id)),
2894			Some(&PeerState::PendingRequest { .. })
2895		));
2896
2897		// attempt to disconnect the backed-off peer and verify that the request is pending
2898		notif.peerset_report_disconnect(peer, set_id);
2899		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
2900	}
2901
2902	#[test]
2903	fn peerset_accept_peer_not_alive() {
2904		let (mut notif, _controller, _notif_service) = development_notifs();
2905		let peer = PeerId::random();
2906		let conn = ConnectionId::new_unchecked(0);
2907		let set_id = SetId::from(0);
2908		let connected = ConnectedPoint::Listener {
2909			local_addr: Multiaddr::empty(),
2910			send_back_addr: Multiaddr::empty(),
2911		};
2912
2913		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2914			libp2p::swarm::behaviour::ConnectionEstablished {
2915				peer_id: peer,
2916				connection_id: conn,
2917				endpoint: &connected,
2918				failed_addresses: &[],
2919				other_established: 0usize,
2920			},
2921		));
2922		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2923
2924		// remote opens a substream, verify that peer state is updated to `Incoming`
2925		notif.on_connection_handler_event(
2926			peer,
2927			conn,
2928			NotifsHandlerOut::OpenDesiredByRemote {
2929				protocol_index: 0,
2930				handshake: vec![1, 3, 3, 7],
2931			},
2932		);
2933		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
2934
2935		assert!(std::matches!(
2936			notif.incoming[0],
2937			IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
2938		));
2939
2940		notif.disconnect_peer(&peer, set_id);
2941		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2942		assert!(std::matches!(
2943			notif.incoming[0],
2944			IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
2945		));
2946
2947		notif.protocol_report_accept(IncomingIndex(0));
2948		assert_eq!(notif.incoming.len(), 0);
2949		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Disabled { .. })));
2950	}
2951
2952	#[test]
2953	fn secondary_connection_peer_state_incoming() {
2954		let (mut notif, _controller, _notif_service) = development_notifs();
2955		let peer = PeerId::random();
2956		let conn = ConnectionId::new_unchecked(0);
2957		let conn2 = ConnectionId::new_unchecked(1);
2958		let set_id = SetId::from(0);
2959		let connected = ConnectedPoint::Listener {
2960			local_addr: Multiaddr::empty(),
2961			send_back_addr: Multiaddr::empty(),
2962		};
2963
2964		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2965			libp2p::swarm::behaviour::ConnectionEstablished {
2966				peer_id: peer,
2967				connection_id: conn,
2968				endpoint: &connected,
2969				failed_addresses: &[],
2970				other_established: 0usize,
2971			},
2972		));
2973		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2974
2975		notif.on_connection_handler_event(
2976			peer,
2977			conn,
2978			NotifsHandlerOut::OpenDesiredByRemote {
2979				protocol_index: 0,
2980				handshake: vec![1, 3, 3, 7],
2981			},
2982		);
2983		if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
2984			assert_eq!(connections.len(), 1);
2985			assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2986		} else {
2987			panic!("invalid state");
2988		}
2989
2990		// add another connection
2991		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2992			libp2p::swarm::behaviour::ConnectionEstablished {
2993				peer_id: peer,
2994				connection_id: conn2,
2995				endpoint: &connected,
2996				failed_addresses: &[],
2997				other_established: 0usize,
2998			},
2999		));
3000
3001		if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3002			assert_eq!(connections.len(), 2);
3003			assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
3004			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3005		} else {
3006			panic!("invalid state");
3007		}
3008	}
3009
3010	#[test]
3011	fn close_connection_for_disabled_peer() {
3012		let (mut notif, _controller, _notif_service) = development_notifs();
3013		let peer = PeerId::random();
3014		let conn = ConnectionId::new_unchecked(0);
3015		let set_id = SetId::from(0);
3016		let connected = ConnectedPoint::Listener {
3017			local_addr: Multiaddr::empty(),
3018			send_back_addr: Multiaddr::empty(),
3019		};
3020
3021		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3022			libp2p::swarm::behaviour::ConnectionEstablished {
3023				peer_id: peer,
3024				connection_id: conn,
3025				endpoint: &connected,
3026				failed_addresses: &[],
3027				other_established: 0usize,
3028			},
3029		));
3030		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3031
3032		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3033			libp2p::swarm::behaviour::ConnectionClosed {
3034				peer_id: peer,
3035				connection_id: conn,
3036				endpoint: &connected.clone(),
3037				cause: None,
3038				remaining_established: 0usize,
3039			},
3040		));
3041		assert!(notif.peers.get(&(peer, set_id)).is_none());
3042	}
3043
3044	#[test]
3045	fn close_connection_for_incoming_peer_one_connection() {
3046		let (mut notif, _controller, _notif_service) = development_notifs();
3047		let peer = PeerId::random();
3048		let conn = ConnectionId::new_unchecked(0);
3049		let set_id = SetId::from(0);
3050		let connected = ConnectedPoint::Listener {
3051			local_addr: Multiaddr::empty(),
3052			send_back_addr: Multiaddr::empty(),
3053		};
3054
3055		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3056			libp2p::swarm::behaviour::ConnectionEstablished {
3057				peer_id: peer,
3058				connection_id: conn,
3059				endpoint: &connected,
3060				failed_addresses: &[],
3061				other_established: 0usize,
3062			},
3063		));
3064		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3065
3066		notif.on_connection_handler_event(
3067			peer,
3068			conn,
3069			NotifsHandlerOut::OpenDesiredByRemote {
3070				protocol_index: 0,
3071				handshake: vec![1, 3, 3, 7],
3072			},
3073		);
3074		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3075
3076		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3077			libp2p::swarm::behaviour::ConnectionClosed {
3078				peer_id: peer,
3079				connection_id: conn,
3080				endpoint: &connected.clone(),
3081				cause: None,
3082				remaining_established: 0usize,
3083			},
3084		));
3085		assert!(notif.peers.get(&(peer, set_id)).is_none());
3086		assert!(std::matches!(
3087			notif.incoming[0],
3088			IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
3089		));
3090	}
3091
3092	#[test]
3093	fn close_connection_for_incoming_peer_two_connections() {
3094		let (mut notif, _controller, _notif_service) = development_notifs();
3095		let peer = PeerId::random();
3096		let conn = ConnectionId::new_unchecked(0);
3097		let conn1 = ConnectionId::new_unchecked(1);
3098		let set_id = SetId::from(0);
3099		let connected = ConnectedPoint::Listener {
3100			local_addr: Multiaddr::empty(),
3101			send_back_addr: Multiaddr::empty(),
3102		};
3103		let mut conns = SmallVec::<
3104			[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER],
3105		>::from(vec![(conn, ConnectionState::Closed)]);
3106
3107		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3108			libp2p::swarm::behaviour::ConnectionEstablished {
3109				peer_id: peer,
3110				connection_id: conn,
3111				endpoint: &connected,
3112				failed_addresses: &[],
3113				other_established: 0usize,
3114			},
3115		));
3116		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3117
3118		notif.on_connection_handler_event(
3119			peer,
3120			conn,
3121			NotifsHandlerOut::OpenDesiredByRemote {
3122				protocol_index: 0,
3123				handshake: vec![1, 3, 3, 7],
3124			},
3125		);
3126		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3127
3128		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3129			libp2p::swarm::behaviour::ConnectionEstablished {
3130				peer_id: peer,
3131				connection_id: conn1,
3132				endpoint: &connected,
3133				failed_addresses: &[],
3134				other_established: 0usize,
3135			},
3136		));
3137		conns.push((conn1, ConnectionState::Closed));
3138
3139		if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id))
3140		{
3141			assert_eq!(connections.len(), 2);
3142			assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
3143			assert_eq!(connections[1], (conn1, ConnectionState::Closed));
3144		}
3145
3146		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3147			libp2p::swarm::behaviour::ConnectionClosed {
3148				peer_id: peer,
3149				connection_id: conn,
3150				endpoint: &connected.clone(),
3151				cause: None,
3152				remaining_established: 0usize,
3153			},
3154		));
3155
3156		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3157			assert_eq!(connections.len(), 1);
3158			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3159		} else {
3160			panic!("invalid state");
3161		}
3162	}
3163
3164	#[test]
3165	fn connection_and_substream_open() {
3166		let (mut notif, _controller, _notif_service) = development_notifs();
3167		let peer = PeerId::random();
3168		let conn = ConnectionId::new_unchecked(0);
3169		let set_id = SetId::from(0);
3170		let connected = ConnectedPoint::Listener {
3171			local_addr: Multiaddr::empty(),
3172			send_back_addr: Multiaddr::empty(),
3173		};
3174		let mut conn_yielder = ConnectionYielder::new();
3175
3176		// move the peer to `Enabled` state
3177		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3178			libp2p::swarm::behaviour::ConnectionEstablished {
3179				peer_id: peer,
3180				connection_id: conn,
3181				endpoint: &connected,
3182				failed_addresses: &[],
3183				other_established: 0usize,
3184			},
3185		));
3186		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3187
3188		notif.on_connection_handler_event(
3189			peer,
3190			conn,
3191			NotifsHandlerOut::OpenDesiredByRemote {
3192				protocol_index: 0,
3193				handshake: vec![1, 3, 3, 7],
3194			},
3195		);
3196		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3197
3198		// We rely on the implementation detail that incoming indices are counted
3199		// from 0 to not mock the `Peerset`.
3200		notif.protocol_report_accept(IncomingIndex(0));
3201		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3202
3203		// open new substream
3204		let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
3205
3206		notif.on_connection_handler_event(peer, conn, event);
3207		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3208
3209		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3210			assert_eq!(connections.len(), 1);
3211			assert_eq!(connections[0].0, conn);
3212			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3213		}
3214
3215		assert!(std::matches!(
3216			notif.events[notif.events.len() - 1],
3217			ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolOpen { .. })
3218		));
3219	}
3220
3221	#[test]
3222	fn connection_closed_sink_replaced() {
3223		let (mut notif, _controller, _notif_service) = development_notifs();
3224		let peer = PeerId::random();
3225		let conn1 = ConnectionId::new_unchecked(0);
3226		let conn2 = ConnectionId::new_unchecked(1);
3227		let set_id = SetId::from(0);
3228		let connected = ConnectedPoint::Listener {
3229			local_addr: Multiaddr::empty(),
3230			send_back_addr: Multiaddr::empty(),
3231		};
3232		let mut conn_yielder = ConnectionYielder::new();
3233
3234		// open two connections
3235		for conn_id in vec![conn1, conn2] {
3236			notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3237				libp2p::swarm::behaviour::ConnectionEstablished {
3238					peer_id: peer,
3239					connection_id: conn_id,
3240					endpoint: &connected,
3241					failed_addresses: &[],
3242					other_established: 0usize,
3243				},
3244			));
3245		}
3246
3247		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3248			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3249			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3250		} else {
3251			panic!("invalid state");
3252		}
3253
3254		// open substreams on both active connections
3255		notif.peerset_report_connect(peer, set_id);
3256		notif.on_connection_handler_event(
3257			peer,
3258			conn2,
3259			NotifsHandlerOut::OpenDesiredByRemote {
3260				protocol_index: 0,
3261				handshake: vec![1, 3, 3, 7],
3262			},
3263		);
3264
3265		if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3266			assert_eq!(connections[0], (conn1, ConnectionState::Opening));
3267			assert_eq!(connections[1], (conn2, ConnectionState::Opening));
3268		} else {
3269			panic!("invalid state");
3270		}
3271
3272		// add two new substreams, one for each connection and verify that both are in open state
3273		for conn in vec![conn1, conn2].iter() {
3274			notif.on_connection_handler_event(
3275				peer,
3276				*conn,
3277				conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3278			);
3279		}
3280
3281		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3282			assert_eq!(connections[0].0, conn1);
3283			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3284			assert_eq!(connections[1].0, conn2);
3285			assert!(std::matches!(connections[1].1, ConnectionState::Open(_)));
3286		} else {
3287			panic!("invalid state");
3288		}
3289
3290		// check peer information
3291		assert_eq!(notif.open_peers().collect::<Vec<_>>(), vec![&peer],);
3292
3293		// close the other connection and verify that notification replacement event is emitted
3294		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3295			libp2p::swarm::behaviour::ConnectionClosed {
3296				peer_id: peer,
3297				connection_id: conn1,
3298				endpoint: &connected.clone(),
3299				cause: None,
3300				remaining_established: 0usize,
3301			},
3302		));
3303
3304		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3305			assert_eq!(connections.len(), 1);
3306			assert_eq!(connections[0].0, conn2);
3307			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3308		} else {
3309			panic!("invalid state");
3310		}
3311
3312		assert!(std::matches!(
3313			notif.events[notif.events.len() - 1],
3314			ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolReplaced { .. })
3315		));
3316	}
3317
3318	#[test]
3319	fn dial_failure_for_requested_peer() {
3320		let (mut notif, _controller, _notif_service) = development_notifs();
3321		let peer = PeerId::random();
3322		let set_id = SetId::from(0);
3323
3324		// Set peer into `Requested` state.
3325		notif.peerset_report_connect(peer, set_id);
3326		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
3327
3328		notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
3329			peer_id: Some(peer),
3330			error: &libp2p::swarm::DialError::Aborted,
3331			connection_id: ConnectionId::new_unchecked(1337),
3332		}));
3333
3334		if let Some(PeerState::Backoff { timer_deadline, .. }) = notif.peers.get(&(peer, set_id)) {
3335			assert!(timer_deadline > &Instant::now());
3336		} else {
3337			panic!("invalid state");
3338		}
3339	}
3340
3341	#[tokio::test]
3342	async fn write_notification() {
3343		let (mut notif, _controller, _notif_service) = development_notifs();
3344		let peer = PeerId::random();
3345		let conn = ConnectionId::new_unchecked(0);
3346		let set_id = SetId::from(0);
3347		let connected = ConnectedPoint::Listener {
3348			local_addr: Multiaddr::empty(),
3349			send_back_addr: Multiaddr::empty(),
3350		};
3351		let mut conn_yielder = ConnectionYielder::new();
3352
3353		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3354			libp2p::swarm::behaviour::ConnectionEstablished {
3355				peer_id: peer,
3356				connection_id: conn,
3357				endpoint: &connected,
3358				failed_addresses: &[],
3359				other_established: 0usize,
3360			},
3361		));
3362		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3363
3364		notif.peerset_report_connect(peer, set_id);
3365		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3366
3367		notif.on_connection_handler_event(
3368			peer,
3369			conn,
3370			conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3371		);
3372
3373		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3374			assert_eq!(connections[0].0, conn);
3375			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3376		} else {
3377			panic!("invalid state");
3378		}
3379
3380		notif
3381			.peers
3382			.get(&(peer, set_id))
3383			.unwrap()
3384			.get_open()
3385			.unwrap()
3386			.send_sync_notification(vec![1, 3, 3, 7]);
3387		assert_eq!(conn_yielder.get_next_event(peer, set_id.into()).await, Some(vec![1, 3, 3, 7]));
3388	}
3389
3390	#[test]
3391	fn peerset_report_connect_backoff_expired() {
3392		let (mut notif, _controller, _notif_service) = development_notifs();
3393		let set_id = SetId::from(0);
3394		let peer = PeerId::random();
3395		let conn = ConnectionId::new_unchecked(0);
3396		let connected = ConnectedPoint::Listener {
3397			local_addr: Multiaddr::empty(),
3398			send_back_addr: Multiaddr::empty(),
3399		};
3400		let backoff_duration = Duration::from_millis(100);
3401
3402		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3403			libp2p::swarm::behaviour::ConnectionEstablished {
3404				peer_id: peer,
3405				connection_id: conn,
3406				endpoint: &connected,
3407				failed_addresses: &[],
3408				other_established: 0usize,
3409			},
3410		));
3411		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3412
3413		// manually add backoff for the entry
3414		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3415			notif.peers.get_mut(&(peer, set_id))
3416		{
3417			*backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap());
3418		}
3419
3420		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3421			libp2p::swarm::behaviour::ConnectionClosed {
3422				peer_id: peer,
3423				connection_id: conn,
3424				endpoint: &connected.clone(),
3425				cause: None,
3426				remaining_established: 0usize,
3427			},
3428		));
3429
3430		// wait until the backoff time has passed
3431		std::thread::sleep(backoff_duration * 2);
3432
3433		// attempt to connect the backed-off peer and verify that the request is pending
3434		notif.peerset_report_connect(peer, set_id);
3435		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested { .. })))
3436	}
3437
3438	#[test]
3439	fn peerset_report_disconnect_disabled() {
3440		let (mut notif, _controller, _notif_service) = development_notifs();
3441		let peer = PeerId::random();
3442		let set_id = SetId::from(0);
3443		let conn = ConnectionId::new_unchecked(0);
3444		let connected = ConnectedPoint::Listener {
3445			local_addr: Multiaddr::empty(),
3446			send_back_addr: Multiaddr::empty(),
3447		};
3448
3449		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3450			libp2p::swarm::behaviour::ConnectionEstablished {
3451				peer_id: peer,
3452				connection_id: conn,
3453				endpoint: &connected,
3454				failed_addresses: &[],
3455				other_established: 0usize,
3456			},
3457		));
3458		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3459
3460		notif.peerset_report_disconnect(peer, set_id);
3461		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3462	}
3463
3464	#[test]
3465	fn peerset_report_disconnect_backoff() {
3466		let (mut notif, _controller, _notif_service) = development_notifs();
3467		let set_id = SetId::from(0);
3468		let peer = PeerId::random();
3469		let conn = ConnectionId::new_unchecked(0);
3470		let connected = ConnectedPoint::Listener {
3471			local_addr: Multiaddr::empty(),
3472			send_back_addr: Multiaddr::empty(),
3473		};
3474		let backoff_duration = Duration::from_secs(2);
3475
3476		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3477			libp2p::swarm::behaviour::ConnectionEstablished {
3478				peer_id: peer,
3479				connection_id: conn,
3480				endpoint: &connected,
3481				failed_addresses: &[],
3482				other_established: 0usize,
3483			},
3484		));
3485		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3486
3487		// manually add backoff for the entry
3488		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3489			notif.peers.get_mut(&(peer, set_id))
3490		{
3491			*backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap());
3492		}
3493
3494		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3495			libp2p::swarm::behaviour::ConnectionClosed {
3496				peer_id: peer,
3497				connection_id: conn,
3498				endpoint: &connected.clone(),
3499				cause: None,
3500				remaining_established: 0usize,
3501			},
3502		));
3503
3504		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3505
3506		notif.peerset_report_disconnect(peer, set_id);
3507		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3508	}
3509
3510	#[test]
3511	fn peer_is_backed_off_if_both_connections_get_closed_while_peer_is_disabled_with_back_off() {
3512		let (mut notif, _controller, _notif_service) = development_notifs();
3513		let set_id = SetId::from(0);
3514		let peer = PeerId::random();
3515		let conn1 = ConnectionId::new_unchecked(0);
3516		let conn2 = ConnectionId::new_unchecked(1);
3517		let connected = ConnectedPoint::Listener {
3518			local_addr: Multiaddr::empty(),
3519			send_back_addr: Multiaddr::empty(),
3520		};
3521
3522		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3523			libp2p::swarm::behaviour::ConnectionEstablished {
3524				peer_id: peer,
3525				connection_id: conn1,
3526				endpoint: &connected,
3527				failed_addresses: &[],
3528				other_established: 0usize,
3529			},
3530		));
3531		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3532			libp2p::swarm::behaviour::ConnectionEstablished {
3533				peer_id: peer,
3534				connection_id: conn2,
3535				endpoint: &connected,
3536				failed_addresses: &[],
3537				other_established: 0usize,
3538			},
3539		));
3540		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3541
3542		// manually add backoff for the entry
3543		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3544			notif.peers.get_mut(&(peer, set_id))
3545		{
3546			*backoff_until =
3547				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
3548		}
3549
3550		// switch state to `DisabledPendingEnable`
3551		notif.peerset_report_connect(peer, set_id);
3552		assert!(std::matches!(
3553			notif.peers.get(&(peer, set_id)),
3554			Some(&PeerState::DisabledPendingEnable { .. })
3555		));
3556
3557		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3558			libp2p::swarm::behaviour::ConnectionClosed {
3559				peer_id: peer,
3560				connection_id: conn1,
3561				endpoint: &connected.clone(),
3562				cause: None,
3563				remaining_established: 0usize,
3564			},
3565		));
3566		assert!(std::matches!(
3567			notif.peers.get(&(peer, set_id)),
3568			Some(&PeerState::DisabledPendingEnable { .. })
3569		));
3570
3571		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3572			libp2p::swarm::behaviour::ConnectionClosed {
3573				peer_id: peer,
3574				connection_id: conn2,
3575				endpoint: &connected.clone(),
3576				cause: None,
3577				remaining_established: 0usize,
3578			},
3579		));
3580		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3581	}
3582
3583	#[test]
3584	fn inject_connection_closed_incoming_with_backoff() {
3585		let (mut notif, _controller, _notif_service) = development_notifs();
3586		let peer = PeerId::random();
3587		let set_id = SetId::from(0);
3588		let conn = ConnectionId::new_unchecked(0);
3589		let connected = ConnectedPoint::Listener {
3590			local_addr: Multiaddr::empty(),
3591			send_back_addr: Multiaddr::empty(),
3592		};
3593
3594		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3595			libp2p::swarm::behaviour::ConnectionEstablished {
3596				peer_id: peer,
3597				connection_id: conn,
3598				endpoint: &connected,
3599				failed_addresses: &[],
3600				other_established: 0usize,
3601			},
3602		));
3603		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3604
3605		// remote opens a substream, verify that peer state is updated to `Incoming`
3606		notif.on_connection_handler_event(
3607			peer,
3608			conn,
3609			NotifsHandlerOut::OpenDesiredByRemote {
3610				protocol_index: 0,
3611				handshake: vec![1, 3, 3, 7],
3612			},
3613		);
3614
3615		// manually add backoff for the entry
3616		if let Some(&mut PeerState::Incoming { ref mut backoff_until, .. }) =
3617			notif.peers.get_mut(&(peer, 0.into()))
3618		{
3619			*backoff_until =
3620				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
3621		} else {
3622			panic!("invalid state");
3623		}
3624
3625		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3626			libp2p::swarm::behaviour::ConnectionClosed {
3627				peer_id: peer,
3628				connection_id: conn,
3629				endpoint: &connected.clone(),
3630				cause: None,
3631				remaining_established: 0usize,
3632			},
3633		));
3634		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3635	}
3636
3637	#[test]
3638	fn two_connections_inactive_connection_gets_closed_peer_state_is_still_incoming() {
3639		let (mut notif, _controller, _notif_service) = development_notifs();
3640		let peer = PeerId::random();
3641		let conn1 = ConnectionId::new_unchecked(0);
3642		let conn2 = ConnectionId::new_unchecked(1);
3643		let set_id = SetId::from(0);
3644		let connected = ConnectedPoint::Listener {
3645			local_addr: Multiaddr::empty(),
3646			send_back_addr: Multiaddr::empty(),
3647		};
3648
3649		// open two connections
3650		for conn_id in vec![conn1, conn2] {
3651			notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3652				libp2p::swarm::behaviour::ConnectionEstablished {
3653					peer_id: peer,
3654					connection_id: conn_id,
3655					endpoint: &connected,
3656					failed_addresses: &[],
3657					other_established: 0usize,
3658				},
3659			));
3660		}
3661
3662		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3663			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3664			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3665		} else {
3666			panic!("invalid state");
3667		}
3668
3669		// remote opens a substream, verify that peer state is updated to `Incoming`
3670		notif.on_connection_handler_event(
3671			peer,
3672			conn1,
3673			NotifsHandlerOut::OpenDesiredByRemote {
3674				protocol_index: 0,
3675				handshake: vec![1, 3, 3, 7],
3676			},
3677		);
3678		assert!(std::matches!(
3679			notif.peers.get_mut(&(peer, 0.into())),
3680			Some(&mut PeerState::Incoming { .. })
3681		));
3682
3683		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3684			libp2p::swarm::behaviour::ConnectionClosed {
3685				peer_id: peer,
3686				connection_id: conn2,
3687				endpoint: &connected.clone(),
3688				cause: None,
3689				remaining_established: 0usize,
3690			},
3691		));
3692		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3693	}
3694
3695	#[test]
3696	fn two_connections_active_connection_gets_closed_peer_state_is_disabled() {
3697		let (mut notif, _controller, _notif_service) = development_notifs();
3698		let peer = PeerId::random();
3699		let conn1 = ConnectionId::new_unchecked(0);
3700		let conn2 = ConnectionId::new_unchecked(1);
3701		let set_id = SetId::from(0);
3702		let connected = ConnectedPoint::Listener {
3703			local_addr: Multiaddr::empty(),
3704			send_back_addr: Multiaddr::empty(),
3705		};
3706
3707		// open two connections
3708		for conn_id in vec![conn1, conn2] {
3709			notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3710				libp2p::swarm::behaviour::ConnectionEstablished {
3711					peer_id: peer,
3712					connection_id: conn_id,
3713					endpoint: &ConnectedPoint::Listener {
3714						local_addr: Multiaddr::empty(),
3715						send_back_addr: Multiaddr::empty(),
3716					},
3717					failed_addresses: &[],
3718					other_established: 0usize,
3719				},
3720			));
3721		}
3722
3723		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3724			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3725			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3726		} else {
3727			panic!("invalid state");
3728		}
3729
3730		// remote opens a substream, verify that peer state is updated to `Incoming`
3731		notif.on_connection_handler_event(
3732			peer,
3733			conn1,
3734			NotifsHandlerOut::OpenDesiredByRemote {
3735				protocol_index: 0,
3736				handshake: vec![1, 3, 3, 7],
3737			},
3738		);
3739		assert!(std::matches!(
3740			notif.peers.get_mut(&(peer, 0.into())),
3741			Some(PeerState::Incoming { .. })
3742		));
3743
3744		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3745			libp2p::swarm::behaviour::ConnectionClosed {
3746				peer_id: peer,
3747				connection_id: conn1,
3748				endpoint: &connected.clone(),
3749				cause: None,
3750				remaining_established: 0usize,
3751			},
3752		));
3753		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3754	}
3755
3756	#[test]
3757	fn inject_connection_closed_for_active_connection() {
3758		let (mut notif, _controller, _notif_service) = development_notifs();
3759		let peer = PeerId::random();
3760		let conn1 = ConnectionId::new_unchecked(0);
3761		let conn2 = ConnectionId::new_unchecked(1);
3762		let set_id = SetId::from(0);
3763		let connected = ConnectedPoint::Listener {
3764			local_addr: Multiaddr::empty(),
3765			send_back_addr: Multiaddr::empty(),
3766		};
3767		let mut conn_yielder = ConnectionYielder::new();
3768
3769		// open two connections
3770		for conn_id in vec![conn1, conn2] {
3771			notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3772				libp2p::swarm::behaviour::ConnectionEstablished {
3773					peer_id: peer,
3774					connection_id: conn_id,
3775					endpoint: &connected,
3776					failed_addresses: &[],
3777					other_established: 0usize,
3778				},
3779			));
3780		}
3781
3782		if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3783			assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3784			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3785		} else {
3786			panic!("invalid state");
3787		}
3788
3789		// open substreams on both active connections
3790		notif.peerset_report_connect(peer, set_id);
3791
3792		if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3793			assert_eq!(connections[0], (conn1, ConnectionState::Opening));
3794			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3795		} else {
3796			panic!("invalid state");
3797		}
3798
3799		notif.on_connection_handler_event(
3800			peer,
3801			conn1,
3802			conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3803		);
3804
3805		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3806			assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3807			assert_eq!(connections[0].0, conn1);
3808			assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3809		} else {
3810			panic!("invalid state");
3811		}
3812
3813		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3814			libp2p::swarm::behaviour::ConnectionClosed {
3815				peer_id: peer,
3816				connection_id: conn1,
3817				endpoint: &connected.clone(),
3818				cause: None,
3819				remaining_established: 0usize,
3820			},
3821		));
3822	}
3823
3824	#[test]
3825	fn inject_dial_failure_for_pending_request() {
3826		let (mut notif, _controller, _notif_service) = development_notifs();
3827		let set_id = SetId::from(0);
3828		let peer = PeerId::random();
3829		let conn = ConnectionId::new_unchecked(0);
3830		let connected = ConnectedPoint::Listener {
3831			local_addr: Multiaddr::empty(),
3832			send_back_addr: Multiaddr::empty(),
3833		};
3834
3835		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3836			libp2p::swarm::behaviour::ConnectionEstablished {
3837				peer_id: peer,
3838				connection_id: conn,
3839				endpoint: &connected,
3840				failed_addresses: &[],
3841				other_established: 0usize,
3842			},
3843		));
3844		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3845
3846		// manually add backoff for the entry
3847		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3848			notif.peers.get_mut(&(peer, set_id))
3849		{
3850			*backoff_until =
3851				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
3852		}
3853
3854		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3855			libp2p::swarm::behaviour::ConnectionClosed {
3856				peer_id: peer,
3857				connection_id: conn,
3858				endpoint: &connected.clone(),
3859				cause: None,
3860				remaining_established: 0usize,
3861			},
3862		));
3863
3864		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3865
3866		// attempt to connect the backed-off peer and verify that the request is pending
3867		notif.peerset_report_connect(peer, set_id);
3868		assert!(std::matches!(
3869			notif.peers.get(&(peer, set_id)),
3870			Some(&PeerState::PendingRequest { .. })
3871		));
3872
3873		let now = Instant::now();
3874		notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
3875			peer_id: Some(peer),
3876			error: &libp2p::swarm::DialError::Aborted,
3877			connection_id: ConnectionId::new_unchecked(0),
3878		}));
3879
3880		if let Some(PeerState::PendingRequest { ref timer_deadline, .. }) =
3881			notif.peers.get(&(peer, set_id))
3882		{
3883			assert!(timer_deadline > &(now + std::time::Duration::from_secs(5)));
3884		}
3885	}
3886
3887	#[test]
3888	fn peerstate_incoming_open_desired_by_remote() {
3889		let (mut notif, _controller, _notif_service) = development_notifs();
3890		let peer = PeerId::random();
3891		let set_id = SetId::from(0);
3892		let conn1 = ConnectionId::new_unchecked(0);
3893		let conn2 = ConnectionId::new_unchecked(1);
3894		let connected = ConnectedPoint::Listener {
3895			local_addr: Multiaddr::empty(),
3896			send_back_addr: Multiaddr::empty(),
3897		};
3898
3899		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3900			libp2p::swarm::behaviour::ConnectionEstablished {
3901				peer_id: peer,
3902				connection_id: conn1,
3903				endpoint: &connected,
3904				failed_addresses: &[],
3905				other_established: 0usize,
3906			},
3907		));
3908		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3909			libp2p::swarm::behaviour::ConnectionEstablished {
3910				peer_id: peer,
3911				connection_id: conn2,
3912				endpoint: &connected,
3913				failed_addresses: &[],
3914				other_established: 0usize,
3915			},
3916		));
3917		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3918
3919		// remote opens a substream, verify that peer state is updated to `Incoming`
3920		notif.on_connection_handler_event(
3921			peer,
3922			conn1,
3923			NotifsHandlerOut::OpenDesiredByRemote {
3924				protocol_index: 0,
3925				handshake: vec![1, 3, 3, 7],
3926			},
3927		);
3928		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3929
3930		// add another open event from remote
3931		notif.on_connection_handler_event(
3932			peer,
3933			conn2,
3934			NotifsHandlerOut::OpenDesiredByRemote {
3935				protocol_index: 0,
3936				handshake: vec![1, 3, 3, 7],
3937			},
3938		);
3939
3940		if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id))
3941		{
3942			assert_eq!(connections[0], (conn1, ConnectionState::OpenDesiredByRemote));
3943			assert_eq!(connections[1], (conn2, ConnectionState::OpenDesiredByRemote));
3944		}
3945	}
3946
3947	#[tokio::test]
3948	async fn remove_backoff_peer_after_timeout() {
3949		let (mut notif, _controller, _notif_service) = development_notifs();
3950		let peer = PeerId::random();
3951		let set_id = SetId::from(0);
3952		let conn = ConnectionId::new_unchecked(0);
3953		let connected = ConnectedPoint::Listener {
3954			local_addr: Multiaddr::empty(),
3955			send_back_addr: Multiaddr::empty(),
3956		};
3957
3958		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3959			libp2p::swarm::behaviour::ConnectionEstablished {
3960				peer_id: peer,
3961				connection_id: conn,
3962				endpoint: &connected,
3963				failed_addresses: &[],
3964				other_established: 0usize,
3965			},
3966		));
3967
3968		if let Some(&mut PeerState::Disabled { ref mut backoff_until, .. }) =
3969			notif.peers.get_mut(&(peer, 0.into()))
3970		{
3971			*backoff_until =
3972				Some(Instant::now().checked_add(std::time::Duration::from_millis(100)).unwrap());
3973		} else {
3974			panic!("invalid state");
3975		}
3976
3977		notif.on_swarm_event(FromSwarm::ConnectionClosed(
3978			libp2p::swarm::behaviour::ConnectionClosed {
3979				peer_id: peer,
3980				connection_id: conn,
3981				endpoint: &connected.clone(),
3982				cause: None,
3983				remaining_established: 0usize,
3984			},
3985		));
3986
3987		let until = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
3988			notif.peers.get(&(peer, set_id))
3989		{
3990			timer_deadline
3991		} else {
3992			panic!("invalid state");
3993		};
3994
3995		if until > Instant::now() {
3996			std::thread::sleep(until - Instant::now());
3997		}
3998
3999		assert!(notif.peers.get(&(peer, set_id)).is_some());
4000
4001		if tokio::time::timeout(Duration::from_secs(5), async {
4002			loop {
4003				futures::future::poll_fn(|cx| {
4004					let _ = notif.poll(cx);
4005					Poll::Ready(())
4006				})
4007				.await;
4008
4009				if notif.peers.get(&(peer, set_id)).is_none() {
4010					break
4011				}
4012			}
4013		})
4014		.await
4015		.is_err()
4016		{
4017			panic!("backoff peer was not removed in time");
4018		}
4019
4020		assert!(notif.peers.get(&(peer, set_id)).is_none());
4021	}
4022
4023	#[tokio::test]
4024	async fn reschedule_disabled_pending_enable_when_connection_not_closed() {
4025		let (mut notif, _controller, _notif_service) = development_notifs();
4026		let peer = PeerId::random();
4027		let conn = ConnectionId::new_unchecked(0);
4028		let set_id = SetId::from(0);
4029		let mut conn_yielder = ConnectionYielder::new();
4030
4031		// move the peer to `Enabled` state
4032		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4033			libp2p::swarm::behaviour::ConnectionEstablished {
4034				peer_id: peer,
4035				connection_id: conn,
4036				endpoint: &ConnectedPoint::Listener {
4037					local_addr: Multiaddr::empty(),
4038					send_back_addr: Multiaddr::empty(),
4039				},
4040				failed_addresses: &[],
4041				other_established: 0usize,
4042			},
4043		));
4044		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4045
4046		// open substream
4047		notif.on_connection_handler_event(
4048			peer,
4049			conn,
4050			NotifsHandlerOut::OpenDesiredByRemote {
4051				protocol_index: 0,
4052				handshake: vec![1, 3, 3, 7],
4053			},
4054		);
4055		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4056
4057		// we rely on the implementation detail that incoming indices are counted from 0
4058		// to not mock the `Peerset`
4059		notif.protocol_report_accept(IncomingIndex(0));
4060		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4061
4062		let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4063
4064		notif.on_connection_handler_event(peer, conn, event);
4065		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4066
4067		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
4068			assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
4069			assert_eq!(connections[0].0, conn);
4070		} else {
4071			panic!("invalid state");
4072		}
4073
4074		notif.peerset_report_disconnect(peer, set_id);
4075
4076		if let Some(PeerState::Disabled { ref connections, ref mut backoff_until }) =
4077			notif.peers.get_mut(&(peer, set_id))
4078		{
4079			assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4080			assert_eq!(connections[0].0, conn);
4081
4082			*backoff_until =
4083				Some(Instant::now().checked_add(std::time::Duration::from_secs(2)).unwrap());
4084		} else {
4085			panic!("invalid state");
4086		}
4087
4088		notif.peerset_report_connect(peer, set_id);
4089
4090		let prev_instant =
4091			if let Some(PeerState::DisabledPendingEnable {
4092				ref connections, timer_deadline, ..
4093			}) = notif.peers.get(&(peer, set_id))
4094			{
4095				assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4096				assert_eq!(connections[0].0, conn);
4097
4098				*timer_deadline
4099			} else {
4100				panic!("invalid state");
4101			};
4102
4103		// one of the peers has an active backoff timer so poll the notifications code until
4104		// the timer has expired. Because the connection is still in the state of `Closing`,
4105		// verify that the code continues to keep the peer disabled by resetting the timer
4106		// after the first one expired.
4107		if tokio::time::timeout(Duration::from_secs(5), async {
4108			loop {
4109				futures::future::poll_fn(|cx| {
4110					let _ = notif.poll(cx);
4111					Poll::Ready(())
4112				})
4113				.await;
4114
4115				if let Some(PeerState::DisabledPendingEnable {
4116					timer_deadline, connections, ..
4117				}) = notif.peers.get(&(peer, set_id))
4118				{
4119					assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4120
4121					if timer_deadline != &prev_instant {
4122						break
4123					}
4124				} else {
4125					panic!("invalid state");
4126				}
4127			}
4128		})
4129		.await
4130		.is_err()
4131		{
4132			panic!("backoff peer was not removed in time");
4133		}
4134	}
4135
4136	#[test]
4137	#[should_panic]
4138	#[cfg(debug_assertions)]
4139	fn peerset_report_connect_with_enabled_peer() {
4140		let (mut notif, _controller, _notif_service) = development_notifs();
4141		let peer = PeerId::random();
4142		let conn = ConnectionId::new_unchecked(0);
4143		let set_id = SetId::from(0);
4144		let connected = ConnectedPoint::Listener {
4145			local_addr: Multiaddr::empty(),
4146			send_back_addr: Multiaddr::empty(),
4147		};
4148		let mut conn_yielder = ConnectionYielder::new();
4149
4150		// move the peer to `Enabled` state
4151		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4152			libp2p::swarm::behaviour::ConnectionEstablished {
4153				peer_id: peer,
4154				connection_id: conn,
4155				endpoint: &connected,
4156				failed_addresses: &[],
4157				other_established: 0usize,
4158			},
4159		));
4160		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4161
4162		notif.on_connection_handler_event(
4163			peer,
4164			conn,
4165			NotifsHandlerOut::OpenDesiredByRemote {
4166				protocol_index: 0,
4167				handshake: vec![1, 3, 3, 7],
4168			},
4169		);
4170		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4171
4172		notif.peerset_report_connect(peer, set_id);
4173		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4174
4175		let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4176
4177		notif.on_connection_handler_event(peer, conn, event);
4178		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4179
4180		if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
4181			assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
4182			assert_eq!(connections[0].0, conn);
4183		} else {
4184			panic!("invalid state");
4185		}
4186
4187		notif.peerset_report_connect(peer, set_id);
4188	}
4189
4190	#[test]
4191	#[cfg(debug_assertions)]
4192	fn peerset_report_connect_with_disabled_pending_enable_peer() {
4193		let (mut notif, _controller, _notif_service) = development_notifs();
4194		let set_id = SetId::from(0);
4195		let peer = PeerId::random();
4196		let conn = ConnectionId::new_unchecked(0);
4197		let connected = ConnectedPoint::Listener {
4198			local_addr: Multiaddr::empty(),
4199			send_back_addr: Multiaddr::empty(),
4200		};
4201
4202		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4203			libp2p::swarm::behaviour::ConnectionEstablished {
4204				peer_id: peer,
4205				connection_id: conn,
4206				endpoint: &connected,
4207				failed_addresses: &[],
4208				other_established: 0usize,
4209			},
4210		));
4211		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4212
4213		// manually add backoff for the entry
4214		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
4215			notif.peers.get_mut(&(peer, set_id))
4216		{
4217			*backoff_until =
4218				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
4219		}
4220
4221		// switch state to `DisabledPendingEnable`
4222		notif.peerset_report_connect(peer, set_id);
4223		assert!(std::matches!(
4224			notif.peers.get(&(peer, set_id)),
4225			Some(&PeerState::DisabledPendingEnable { .. })
4226		));
4227
4228		// duplicate "connect" must not change the state
4229		notif.peerset_report_connect(peer, set_id);
4230		assert!(std::matches!(
4231			notif.peers.get(&(peer, set_id)),
4232			Some(&PeerState::DisabledPendingEnable { .. })
4233		));
4234	}
4235
4236	#[test]
4237	#[cfg(debug_assertions)]
4238	fn peerset_report_connect_with_requested_peer() {
4239		let (mut notif, _controller, _notif_service) = development_notifs();
4240		let peer = PeerId::random();
4241		let set_id = SetId::from(0);
4242
4243		// Set peer into `Requested` state.
4244		notif.peerset_report_connect(peer, set_id);
4245		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
4246
4247		// Duplicate "connect" must not change the state.
4248		notif.peerset_report_connect(peer, set_id);
4249		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
4250	}
4251
4252	#[test]
4253	#[cfg(debug_assertions)]
4254	fn peerset_report_connect_with_pending_requested() {
4255		let (mut notif, _controller, _notif_service) = development_notifs();
4256		let set_id = SetId::from(0);
4257		let peer = PeerId::random();
4258		let conn = ConnectionId::new_unchecked(0);
4259		let connected = ConnectedPoint::Listener {
4260			local_addr: Multiaddr::empty(),
4261			send_back_addr: Multiaddr::empty(),
4262		};
4263
4264		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4265			libp2p::swarm::behaviour::ConnectionEstablished {
4266				peer_id: peer,
4267				connection_id: conn,
4268				endpoint: &connected,
4269				failed_addresses: &[],
4270				other_established: 0usize,
4271			},
4272		));
4273		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4274
4275		// manually add backoff for the entry
4276		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
4277			notif.peers.get_mut(&(peer, set_id))
4278		{
4279			*backoff_until =
4280				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
4281		}
4282
4283		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4284			libp2p::swarm::behaviour::ConnectionClosed {
4285				peer_id: peer,
4286				connection_id: conn,
4287				endpoint: &connected.clone(),
4288				cause: None,
4289				remaining_established: 0usize,
4290			},
4291		));
4292		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
4293
4294		// attempt to connect the backed-off peer and verify that the request is pending
4295		notif.peerset_report_connect(peer, set_id);
4296		assert!(std::matches!(
4297			notif.peers.get(&(peer, set_id)),
4298			Some(&PeerState::PendingRequest { .. })
4299		));
4300
4301		// duplicate "connect" must not change the state
4302		notif.peerset_report_connect(peer, set_id);
4303		assert!(std::matches!(
4304			notif.peers.get(&(peer, set_id)),
4305			Some(&PeerState::PendingRequest { .. })
4306		));
4307	}
4308
4309	#[test]
4310	#[cfg(debug_assertions)]
4311	fn peerset_report_connect_with_incoming_peer() {
4312		let (mut notif, _controller, _notif_service) = development_notifs();
4313		let peer = PeerId::random();
4314		let set_id = SetId::from(0);
4315		let conn = ConnectionId::new_unchecked(0);
4316		let connected = ConnectedPoint::Listener {
4317			local_addr: Multiaddr::empty(),
4318			send_back_addr: Multiaddr::empty(),
4319		};
4320
4321		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4322			libp2p::swarm::behaviour::ConnectionEstablished {
4323				peer_id: peer,
4324				connection_id: conn,
4325				endpoint: &connected,
4326				failed_addresses: &[],
4327				other_established: 0usize,
4328			},
4329		));
4330		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4331
4332		// remote opens a substream, verify that peer state is updated to `Incoming`
4333		notif.on_connection_handler_event(
4334			peer,
4335			conn,
4336			NotifsHandlerOut::OpenDesiredByRemote {
4337				protocol_index: 0,
4338				handshake: vec![1, 3, 3, 7],
4339			},
4340		);
4341		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4342
4343		notif.peerset_report_connect(peer, set_id);
4344		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4345	}
4346
4347	#[test]
4348	#[cfg(debug_assertions)]
4349	fn peerset_report_disconnect_with_incoming_peer() {
4350		let (mut notif, _controller, _notif_service) = development_notifs();
4351		let peer = PeerId::random();
4352		let set_id = SetId::from(0);
4353		let conn = ConnectionId::new_unchecked(0);
4354		let connected = ConnectedPoint::Listener {
4355			local_addr: Multiaddr::empty(),
4356			send_back_addr: Multiaddr::empty(),
4357		};
4358
4359		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4360			libp2p::swarm::behaviour::ConnectionEstablished {
4361				peer_id: peer,
4362				connection_id: conn,
4363				endpoint: &connected,
4364				failed_addresses: &[],
4365				other_established: 0usize,
4366			},
4367		));
4368		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4369
4370		// remote opens a substream, verify that peer state is updated to `Incoming`
4371		notif.on_connection_handler_event(
4372			peer,
4373			conn,
4374			NotifsHandlerOut::OpenDesiredByRemote {
4375				protocol_index: 0,
4376				handshake: vec![1, 3, 3, 7],
4377			},
4378		);
4379		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4380
4381		notif.peerset_report_disconnect(peer, set_id);
4382		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4383	}
4384
4385	#[test]
4386	#[cfg(debug_assertions)]
4387	fn peerset_report_disconnect_with_incoming_peer_protocol_accepts() {
4388		let (mut notif, _controller, _notif_service) = development_notifs();
4389		let peer = PeerId::random();
4390		let set_id = SetId::from(0);
4391		let conn = ConnectionId::new_unchecked(0);
4392		let connected = ConnectedPoint::Listener {
4393			local_addr: Multiaddr::empty(),
4394			send_back_addr: Multiaddr::empty(),
4395		};
4396
4397		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4398			libp2p::swarm::behaviour::ConnectionEstablished {
4399				peer_id: peer,
4400				connection_id: conn,
4401				endpoint: &connected,
4402				failed_addresses: &[],
4403				other_established: 0usize,
4404			},
4405		));
4406		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4407
4408		// remote opens a substream, verify that peer state is updated to `Incoming`
4409		notif.on_connection_handler_event(
4410			peer,
4411			conn,
4412			NotifsHandlerOut::OpenDesiredByRemote {
4413				protocol_index: 0,
4414				handshake: vec![1, 3, 3, 7],
4415			},
4416		);
4417		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4418
4419		// `Peerset` wants to disconnect the peer but since it's still under validation,
4420		// it won't be disabled automatically
4421		notif.peerset_report_disconnect(peer, set_id);
4422
4423		let incoming_index = match notif.peers.get(&(peer, set_id)) {
4424			Some(&PeerState::Incoming { peerset_rejected, incoming_index, .. }) => {
4425				assert!(peerset_rejected);
4426				incoming_index
4427			},
4428			state => panic!("invalid state: {state:?}"),
4429		};
4430
4431		// protocol accepted peer but since `Peerset` wanted to disconnect it, the peer will be
4432		// disabled
4433		notif.protocol_report_accept(incoming_index);
4434
4435		match notif.peers.get(&(peer, set_id)) {
4436			Some(&PeerState::Disabled { .. }) => {},
4437			state => panic!("invalid state: {state:?}"),
4438		};
4439	}
4440
4441	#[test]
4442	#[cfg(debug_assertions)]
4443	fn peer_disconnected_protocol_accepts() {
4444		let (mut notif, _controller, _notif_service) = development_notifs();
4445		let peer = PeerId::random();
4446		let set_id = SetId::from(0);
4447		let conn = ConnectionId::new_unchecked(0);
4448		let connected = ConnectedPoint::Listener {
4449			local_addr: Multiaddr::empty(),
4450			send_back_addr: Multiaddr::empty(),
4451		};
4452
4453		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4454			libp2p::swarm::behaviour::ConnectionEstablished {
4455				peer_id: peer,
4456				connection_id: conn,
4457				endpoint: &connected,
4458				failed_addresses: &[],
4459				other_established: 0usize,
4460			},
4461		));
4462		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4463
4464		// remote opens a substream, verify that peer state is updated to `Incoming`
4465		notif.on_connection_handler_event(
4466			peer,
4467			conn,
4468			NotifsHandlerOut::OpenDesiredByRemote {
4469				protocol_index: 0,
4470				handshake: vec![1, 3, 3, 7],
4471			},
4472		);
4473		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4474
4475		assert!(notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4476		notif.disconnect_peer(&peer, set_id);
4477
4478		// since the connection was closed, nothing happens for the peer state because
4479		// there is nothing actionable
4480		notif.protocol_report_accept(IncomingIndex(0));
4481
4482		match notif.peers.get(&(peer, set_id)) {
4483			Some(&PeerState::Disabled { .. }) => {},
4484			state => panic!("invalid state: {state:?}"),
4485		};
4486
4487		assert!(!notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4488	}
4489
4490	#[test]
4491	#[cfg(debug_assertions)]
4492	fn connection_closed_protocol_accepts() {
4493		let (mut notif, _controller, _notif_service) = development_notifs();
4494		let peer = PeerId::random();
4495		let set_id = SetId::from(0);
4496		let conn = ConnectionId::new_unchecked(0);
4497		let connected = ConnectedPoint::Listener {
4498			local_addr: Multiaddr::empty(),
4499			send_back_addr: Multiaddr::empty(),
4500		};
4501
4502		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4503			libp2p::swarm::behaviour::ConnectionEstablished {
4504				peer_id: peer,
4505				connection_id: conn,
4506				endpoint: &connected,
4507				failed_addresses: &[],
4508				other_established: 0usize,
4509			},
4510		));
4511		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4512
4513		// remote opens a substream, verify that peer state is updated to `Incoming`
4514		notif.on_connection_handler_event(
4515			peer,
4516			conn,
4517			NotifsHandlerOut::OpenDesiredByRemote {
4518				protocol_index: 0,
4519				handshake: vec![1, 3, 3, 7],
4520			},
4521		);
4522		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4523
4524		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4525			libp2p::swarm::behaviour::ConnectionClosed {
4526				peer_id: peer,
4527				connection_id: ConnectionId::new_unchecked(0),
4528				endpoint: &connected.clone(),
4529				cause: None,
4530				remaining_established: 0usize,
4531			},
4532		));
4533
4534		// connection closed, nothing to do
4535		notif.protocol_report_accept(IncomingIndex(0));
4536
4537		match notif.peers.get(&(peer, set_id)) {
4538			None => {},
4539			state => panic!("invalid state: {state:?}"),
4540		};
4541	}
4542
4543	#[test]
4544	#[cfg(debug_assertions)]
4545	fn peer_disconnected_protocol_reject() {
4546		let (mut notif, _controller, _notif_service) = development_notifs();
4547		let peer = PeerId::random();
4548		let set_id = SetId::from(0);
4549		let conn = ConnectionId::new_unchecked(0);
4550		let connected = ConnectedPoint::Listener {
4551			local_addr: Multiaddr::empty(),
4552			send_back_addr: Multiaddr::empty(),
4553		};
4554
4555		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4556			libp2p::swarm::behaviour::ConnectionEstablished {
4557				peer_id: peer,
4558				connection_id: conn,
4559				endpoint: &connected,
4560				failed_addresses: &[],
4561				other_established: 0usize,
4562			},
4563		));
4564		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4565
4566		// remote opens a substream, verify that peer state is updated to `Incoming`
4567		notif.on_connection_handler_event(
4568			peer,
4569			conn,
4570			NotifsHandlerOut::OpenDesiredByRemote {
4571				protocol_index: 0,
4572				handshake: vec![1, 3, 3, 7],
4573			},
4574		);
4575		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4576
4577		assert!(notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4578		notif.disconnect_peer(&peer, set_id);
4579
4580		// since the connection was closed, nothing happens for the peer state because
4581		// there is nothing actionable
4582		notif.protocol_report_reject(IncomingIndex(0));
4583
4584		match notif.peers.get(&(peer, set_id)) {
4585			Some(&PeerState::Disabled { .. }) => {},
4586			state => panic!("invalid state: {state:?}"),
4587		};
4588
4589		assert!(!notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4590	}
4591
4592	#[test]
4593	#[cfg(debug_assertions)]
4594	fn connection_closed_protocol_rejects() {
4595		let (mut notif, _controller, _notif_service) = development_notifs();
4596		let peer = PeerId::random();
4597		let set_id = SetId::from(0);
4598		let conn = ConnectionId::new_unchecked(0);
4599		let connected = ConnectedPoint::Listener {
4600			local_addr: Multiaddr::empty(),
4601			send_back_addr: Multiaddr::empty(),
4602		};
4603
4604		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4605			libp2p::swarm::behaviour::ConnectionEstablished {
4606				peer_id: peer,
4607				connection_id: conn,
4608				endpoint: &connected,
4609				failed_addresses: &[],
4610				other_established: 0usize,
4611			},
4612		));
4613		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4614
4615		// remote opens a substream, verify that peer state is updated to `Incoming`
4616		notif.on_connection_handler_event(
4617			peer,
4618			conn,
4619			NotifsHandlerOut::OpenDesiredByRemote {
4620				protocol_index: 0,
4621				handshake: vec![1, 3, 3, 7],
4622			},
4623		);
4624		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4625
4626		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4627			libp2p::swarm::behaviour::ConnectionClosed {
4628				peer_id: peer,
4629				connection_id: ConnectionId::new_unchecked(0),
4630				endpoint: &connected.clone(),
4631				cause: None,
4632				remaining_established: 0usize,
4633			},
4634		));
4635
4636		// connection closed, nothing to do
4637		notif.protocol_report_reject(IncomingIndex(0));
4638
4639		match notif.peers.get(&(peer, set_id)) {
4640			None => {},
4641			state => panic!("invalid state: {state:?}"),
4642		};
4643	}
4644
4645	#[test]
4646	#[should_panic]
4647	#[cfg(debug_assertions)]
4648	fn protocol_report_accept_not_incoming_peer() {
4649		let (mut notif, _controller, _notif_service) = development_notifs();
4650		let peer = PeerId::random();
4651		let conn = ConnectionId::new_unchecked(0);
4652		let set_id = SetId::from(0);
4653		let connected = ConnectedPoint::Listener {
4654			local_addr: Multiaddr::empty(),
4655			send_back_addr: Multiaddr::empty(),
4656		};
4657		let mut conn_yielder = ConnectionYielder::new();
4658
4659		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4660			libp2p::swarm::behaviour::ConnectionEstablished {
4661				peer_id: peer,
4662				connection_id: conn,
4663				endpoint: &connected,
4664				failed_addresses: &[],
4665				other_established: 0usize,
4666			},
4667		));
4668		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4669
4670		// remote opens a substream, verify that peer state is updated to `Incoming`
4671		notif.on_connection_handler_event(
4672			peer,
4673			conn,
4674			NotifsHandlerOut::OpenDesiredByRemote {
4675				protocol_index: 0,
4676				handshake: vec![1, 3, 3, 7],
4677			},
4678		);
4679		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4680
4681		assert!(std::matches!(
4682			notif.incoming[0],
4683			IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
4684		));
4685
4686		notif.peerset_report_connect(peer, set_id);
4687		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4688
4689		let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4690		notif.on_connection_handler_event(peer, conn, event);
4691
4692		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4693		notif.incoming[0].alive = true;
4694		notif.protocol_report_accept(IncomingIndex(0));
4695	}
4696
4697	#[test]
4698	#[should_panic]
4699	#[cfg(debug_assertions)]
4700	fn inject_connection_closed_non_existent_peer() {
4701		let (mut notif, _controller, _notif_service) = development_notifs();
4702		let peer = PeerId::random();
4703		let endpoint = ConnectedPoint::Listener {
4704			local_addr: Multiaddr::empty(),
4705			send_back_addr: Multiaddr::empty(),
4706		};
4707
4708		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4709			libp2p::swarm::behaviour::ConnectionClosed {
4710				peer_id: peer,
4711				connection_id: ConnectionId::new_unchecked(0),
4712				endpoint: &endpoint.clone(),
4713				cause: None,
4714				remaining_established: 0usize,
4715			},
4716		));
4717	}
4718
4719	#[test]
4720	fn disconnect_non_existent_peer() {
4721		let (mut notif, _controller, _notif_service) = development_notifs();
4722		let peer = PeerId::random();
4723		let set_id = SetId::from(0);
4724
4725		notif.peerset_report_disconnect(peer, set_id);
4726
4727		assert!(notif.peers.is_empty());
4728		assert!(notif.incoming.is_empty());
4729	}
4730
4731	#[test]
4732	fn accept_non_existent_connection() {
4733		let (mut notif, _controller, _notif_service) = development_notifs();
4734
4735		notif.protocol_report_accept(0.into());
4736
4737		assert!(notif.peers.is_empty());
4738		assert!(notif.incoming.is_empty());
4739	}
4740
4741	#[test]
4742	fn reject_non_existent_connection() {
4743		let (mut notif, _controller, _notif_service) = development_notifs();
4744
4745		notif.protocol_report_reject(0.into());
4746
4747		assert!(notif.peers.is_empty());
4748		assert!(notif.incoming.is_empty());
4749	}
4750
4751	#[test]
4752	fn reject_non_active_connection() {
4753		let (mut notif, _controller, _notif_service) = development_notifs();
4754		let peer = PeerId::random();
4755		let conn = ConnectionId::new_unchecked(0);
4756		let set_id = SetId::from(0);
4757		let connected = ConnectedPoint::Listener {
4758			local_addr: Multiaddr::empty(),
4759			send_back_addr: Multiaddr::empty(),
4760		};
4761
4762		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4763			libp2p::swarm::behaviour::ConnectionEstablished {
4764				peer_id: peer,
4765				connection_id: conn,
4766				endpoint: &connected,
4767				failed_addresses: &[],
4768				other_established: 0usize,
4769			},
4770		));
4771		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4772
4773		// remote opens a substream, verify that peer state is updated to `Incoming`
4774		notif.on_connection_handler_event(
4775			peer,
4776			conn,
4777			NotifsHandlerOut::OpenDesiredByRemote {
4778				protocol_index: 0,
4779				handshake: vec![1, 3, 3, 7],
4780			},
4781		);
4782		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4783
4784		notif.incoming[0].alive = false;
4785		notif.protocol_report_reject(0.into());
4786
4787		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4788	}
4789
4790	#[test]
4791	#[should_panic]
4792	#[cfg(debug_assertions)]
4793	fn inject_non_existent_connection_closed_for_incoming_peer() {
4794		let (mut notif, _controller, _notif_service) = development_notifs();
4795		let peer = PeerId::random();
4796		let conn = ConnectionId::new_unchecked(0);
4797		let set_id = SetId::from(0);
4798		let connected = ConnectedPoint::Listener {
4799			local_addr: Multiaddr::empty(),
4800			send_back_addr: Multiaddr::empty(),
4801		};
4802
4803		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4804			libp2p::swarm::behaviour::ConnectionEstablished {
4805				peer_id: peer,
4806				connection_id: conn,
4807				endpoint: &connected,
4808				failed_addresses: &[],
4809				other_established: 0usize,
4810			},
4811		));
4812		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4813
4814		// remote opens a substream, verify that peer state is updated to `Incoming`
4815		notif.on_connection_handler_event(
4816			peer,
4817			conn,
4818			NotifsHandlerOut::OpenDesiredByRemote {
4819				protocol_index: 0,
4820				handshake: vec![1, 3, 3, 7],
4821			},
4822		);
4823		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4824
4825		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4826			libp2p::swarm::behaviour::ConnectionClosed {
4827				peer_id: peer,
4828				connection_id: ConnectionId::new_unchecked(1337),
4829				endpoint: &connected.clone(),
4830				cause: None,
4831				remaining_established: 0usize,
4832			},
4833		));
4834	}
4835
4836	#[test]
4837	#[should_panic]
4838	#[cfg(debug_assertions)]
4839	fn inject_non_existent_connection_closed_for_disabled_peer() {
4840		let (mut notif, _controller, _notif_service) = development_notifs();
4841		let set_id = SetId::from(0);
4842		let peer = PeerId::random();
4843		let conn = ConnectionId::new_unchecked(0);
4844		let connected = ConnectedPoint::Listener {
4845			local_addr: Multiaddr::empty(),
4846			send_back_addr: Multiaddr::empty(),
4847		};
4848
4849		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4850			libp2p::swarm::behaviour::ConnectionEstablished {
4851				peer_id: peer,
4852				connection_id: conn,
4853				endpoint: &connected,
4854				failed_addresses: &[],
4855				other_established: 0usize,
4856			},
4857		));
4858		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4859
4860		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4861			libp2p::swarm::behaviour::ConnectionClosed {
4862				peer_id: peer,
4863				connection_id: ConnectionId::new_unchecked(1337),
4864				endpoint: &connected.clone(),
4865				cause: None,
4866				remaining_established: 0usize,
4867			},
4868		));
4869	}
4870
4871	#[test]
4872	#[should_panic]
4873	#[cfg(debug_assertions)]
4874	fn inject_non_existent_connection_closed_for_disabled_pending_enable() {
4875		let (mut notif, _controller, _notif_service) = development_notifs();
4876		let set_id = SetId::from(0);
4877		let peer = PeerId::random();
4878		let conn = ConnectionId::new_unchecked(0);
4879		let connected = ConnectedPoint::Listener {
4880			local_addr: Multiaddr::empty(),
4881			send_back_addr: Multiaddr::empty(),
4882		};
4883
4884		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4885			libp2p::swarm::behaviour::ConnectionEstablished {
4886				peer_id: peer,
4887				connection_id: conn,
4888				endpoint: &connected,
4889				failed_addresses: &[],
4890				other_established: 0usize,
4891			},
4892		));
4893		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4894
4895		// manually add backoff for the entry
4896		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
4897			notif.peers.get_mut(&(peer, set_id))
4898		{
4899			*backoff_until =
4900				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
4901		}
4902
4903		// switch state to `DisabledPendingEnable`
4904		notif.peerset_report_connect(peer, set_id);
4905
4906		assert!(std::matches!(
4907			notif.peers.get(&(peer, set_id)),
4908			Some(&PeerState::DisabledPendingEnable { .. })
4909		));
4910
4911		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4912			libp2p::swarm::behaviour::ConnectionClosed {
4913				peer_id: peer,
4914				connection_id: ConnectionId::new_unchecked(1337),
4915				endpoint: &connected.clone(),
4916				cause: None,
4917				remaining_established: 0usize,
4918			},
4919		));
4920	}
4921
4922	#[test]
4923	#[should_panic]
4924	#[cfg(debug_assertions)]
4925	fn inject_connection_closed_for_incoming_peer_state_mismatch() {
4926		let (mut notif, _controller, _notif_service) = development_notifs();
4927		let peer = PeerId::random();
4928		let conn = ConnectionId::new_unchecked(0);
4929		let set_id = SetId::from(0);
4930		let connected = ConnectedPoint::Listener {
4931			local_addr: Multiaddr::empty(),
4932			send_back_addr: Multiaddr::empty(),
4933		};
4934
4935		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4936			libp2p::swarm::behaviour::ConnectionEstablished {
4937				peer_id: peer,
4938				connection_id: conn,
4939				endpoint: &connected,
4940				failed_addresses: &[],
4941				other_established: 0usize,
4942			},
4943		));
4944		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4945
4946		// remote opens a substream, verify that peer state is updated to `Incoming`
4947		notif.on_connection_handler_event(
4948			peer,
4949			conn,
4950			NotifsHandlerOut::OpenDesiredByRemote {
4951				protocol_index: 0,
4952				handshake: vec![1, 3, 3, 7],
4953			},
4954		);
4955		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4956		notif.incoming[0].alive = false;
4957
4958		notif.on_swarm_event(FromSwarm::ConnectionClosed(
4959			libp2p::swarm::behaviour::ConnectionClosed {
4960				peer_id: peer,
4961				connection_id: conn,
4962				endpoint: &connected.clone(),
4963				cause: None,
4964				remaining_established: 0usize,
4965			},
4966		));
4967	}
4968
4969	#[test]
4970	#[should_panic]
4971	#[cfg(debug_assertions)]
4972	fn inject_connection_closed_for_enabled_state_mismatch() {
4973		let (mut notif, _controller, _notif_service) = development_notifs();
4974		let peer = PeerId::random();
4975		let conn = ConnectionId::new_unchecked(0);
4976		let set_id = SetId::from(0);
4977		let connected = ConnectedPoint::Listener {
4978			local_addr: Multiaddr::empty(),
4979			send_back_addr: Multiaddr::empty(),
4980		};
4981
4982		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4983			libp2p::swarm::behaviour::ConnectionEstablished {
4984				peer_id: peer,
4985				connection_id: conn,
4986				endpoint: &connected,
4987				failed_addresses: &[],
4988				other_established: 0usize,
4989			},
4990		));
4991		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4992
4993		// remote opens a substream, verify that peer state is updated to `Incoming`
4994		notif.on_connection_handler_event(
4995			peer,
4996			conn,
4997			NotifsHandlerOut::OpenDesiredByRemote {
4998				protocol_index: 0,
4999				handshake: vec![1, 3, 3, 7],
5000			},
5001		);
5002		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
5003
5004		// attempt to connect to the peer and verify that the peer state is `Enabled`
5005		notif.peerset_report_connect(peer, set_id);
5006		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
5007
5008		notif.on_swarm_event(FromSwarm::ConnectionClosed(
5009			libp2p::swarm::behaviour::ConnectionClosed {
5010				peer_id: peer,
5011				connection_id: ConnectionId::new_unchecked(1337),
5012				endpoint: &connected.clone(),
5013				cause: None,
5014				remaining_established: 0usize,
5015			},
5016		));
5017	}
5018
5019	#[test]
5020	#[should_panic]
5021	#[cfg(debug_assertions)]
5022	fn inject_connection_closed_for_backoff_peer() {
5023		let (mut notif, _controller, _notif_service) = development_notifs();
5024		let set_id = SetId::from(0);
5025		let peer = PeerId::random();
5026		let conn = ConnectionId::new_unchecked(0);
5027		let connected = ConnectedPoint::Listener {
5028			local_addr: Multiaddr::empty(),
5029			send_back_addr: Multiaddr::empty(),
5030		};
5031
5032		notif.on_swarm_event(FromSwarm::ConnectionEstablished(
5033			libp2p::swarm::behaviour::ConnectionEstablished {
5034				peer_id: peer,
5035				connection_id: conn,
5036				endpoint: &connected,
5037				failed_addresses: &[],
5038				other_established: 0usize,
5039			},
5040		));
5041		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
5042
5043		// manually add backoff for the entry
5044		if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
5045			notif.peers.get_mut(&(peer, set_id))
5046		{
5047			*backoff_until =
5048				Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
5049		}
5050
5051		notif.on_swarm_event(FromSwarm::ConnectionClosed(
5052			libp2p::swarm::behaviour::ConnectionClosed {
5053				peer_id: peer,
5054				connection_id: conn,
5055				endpoint: &connected.clone(),
5056				cause: None,
5057				remaining_established: 0usize,
5058			},
5059		));
5060		assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
5061
5062		notif.on_swarm_event(FromSwarm::ConnectionClosed(
5063			libp2p::swarm::behaviour::ConnectionClosed {
5064				peer_id: peer,
5065				connection_id: conn,
5066				endpoint: &connected.clone(),
5067				cause: None,
5068				remaining_established: 0usize,
5069			},
5070		));
5071	}
5072
5073	#[test]
5074	#[should_panic]
5075	#[cfg(debug_assertions)]
5076	fn open_result_ok_non_existent_peer() {
5077		let (mut notif, _controller, _notif_service) = development_notifs();
5078		let conn = ConnectionId::new_unchecked(0);
5079		let mut conn_yielder = ConnectionYielder::new();
5080
5081		notif.on_connection_handler_event(
5082			PeerId::random(),
5083			conn,
5084			conn_yielder.open_substream(PeerId::random(), 0, vec![1, 2, 3, 4]),
5085		);
5086	}
5087}