libp2p_swarm/
handler.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Once a connection to a remote peer is established, a [`ConnectionHandler`] negotiates
22//! and handles one or more specific protocols on the connection.
23//!
24//! Protocols are negotiated and used on individual substreams of the connection. Thus a
25//! [`ConnectionHandler`] defines the inbound and outbound upgrades to apply when creating a new
26//! inbound or outbound substream, respectively, and is notified by a [`Swarm`](crate::Swarm) when
27//! these upgrades have been successfully applied, including the final output of the upgrade. A
28//! [`ConnectionHandler`] can then continue communicating with the peer over the substream using the
29//! negotiated protocol(s).
30//!
31//! Two [`ConnectionHandler`]s can be composed with [`ConnectionHandler::select()`]
32//! in order to build a new handler supporting the combined set of protocols,
33//! with methods being dispatched to the appropriate handler according to the
34//! used protocol(s) determined by the associated types of the handlers.
35//!
36//! > **Note**: A [`ConnectionHandler`] handles one or more protocols in the context of a single
37//! >           connection with a remote. In order to handle a protocol that requires knowledge of
38//! >           the network as a whole, see the
39//! >           [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) trait.
40
41pub mod either;
42mod map_in;
43mod map_out;
44pub mod multi;
45mod one_shot;
46mod pending;
47mod select;
48
49pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
50pub use map_in::MapInEvent;
51pub use map_out::MapOutEvent;
52pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
53pub use pending::PendingConnectionHandler;
54pub use select::ConnectionHandlerSelect;
55
56use crate::StreamProtocol;
57use ::either::Either;
58use instant::Instant;
59use libp2p_core::Multiaddr;
60use once_cell::sync::Lazy;
61use smallvec::SmallVec;
62use std::collections::hash_map::RandomState;
63use std::collections::hash_set::{Difference, Intersection};
64use std::collections::HashSet;
65use std::iter::Peekable;
66use std::{cmp::Ordering, error, fmt, io, task::Context, task::Poll, time::Duration};
67
68/// A handler for a set of protocols used on a connection with a remote.
69///
70/// This trait should be implemented for a type that maintains the state for
71/// the execution of a specific protocol with a remote.
72///
73/// # Handling a protocol
74///
75/// Communication with a remote over a set of protocols is initiated in one of two ways:
76///
77///   1. Dialing by initiating a new outbound substream. In order to do so,
78///      [`ConnectionHandler::poll()`] must return an [`ConnectionHandlerEvent::OutboundSubstreamRequest`],
79///      providing an instance of [`libp2p_core::upgrade::OutboundUpgrade`] that is used to negotiate the
80///      protocol(s). Upon success, [`ConnectionHandler::on_connection_event`] is called with
81///      [`ConnectionEvent::FullyNegotiatedOutbound`] translating the final output of the upgrade.
82///
83///   2. Listening by accepting a new inbound substream. When a new inbound substream
84///      is created on a connection, [`ConnectionHandler::listen_protocol`] is called
85///      to obtain an instance of [`libp2p_core::upgrade::InboundUpgrade`] that is used to
86///      negotiate the protocol(s). Upon success,
87///      [`ConnectionHandler::on_connection_event`] is called with [`ConnectionEvent::FullyNegotiatedInbound`]
88///      translating the final output of the upgrade.
89///
90///
91/// # Connection Keep-Alive
92///
93/// A [`ConnectionHandler`] can influence the lifetime of the underlying connection
94/// through [`ConnectionHandler::connection_keep_alive`]. That is, the protocol
95/// implemented by the handler can include conditions for terminating the connection.
96/// The lifetime of successfully negotiated substreams is fully controlled by the handler.
97///
98/// Implementors of this trait should keep in mind that the connection can be closed at any time.
99/// When a connection is closed gracefully, the substreams used by the handler may still
100/// continue reading data until the remote closes its side of the connection.
101pub trait ConnectionHandler: Send + 'static {
102    /// A type representing the message(s) a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) can send to a [`ConnectionHandler`] via [`ToSwarm::NotifyHandler`](crate::behaviour::ToSwarm::NotifyHandler)
103    type FromBehaviour: fmt::Debug + Send + 'static;
104    /// A type representing message(s) a [`ConnectionHandler`] can send to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via [`ConnectionHandlerEvent::NotifyBehaviour`].
105    type ToBehaviour: fmt::Debug + Send + 'static;
106    /// The type of errors returned by [`ConnectionHandler::poll`].
107    #[deprecated(
108        note = "Will be removed together with `ConnectionHandlerEvent::Close`. See <https://github.com/libp2p/rust-libp2p/issues/3591> for details."
109    )]
110    type Error: error::Error + fmt::Debug + Send + 'static;
111    /// The inbound upgrade for the protocol(s) used by the handler.
112    type InboundProtocol: InboundUpgradeSend;
113    /// The outbound upgrade for the protocol(s) used by the handler.
114    type OutboundProtocol: OutboundUpgradeSend;
115    /// The type of additional information returned from `listen_protocol`.
116    type InboundOpenInfo: Send + 'static;
117    /// The type of additional information passed to an `OutboundSubstreamRequest`.
118    type OutboundOpenInfo: Send + 'static;
119
120    /// The [`InboundUpgrade`](libp2p_core::upgrade::InboundUpgrade) to apply on inbound
121    /// substreams to negotiate the desired protocols.
122    ///
123    /// > **Note**: The returned `InboundUpgrade` should always accept all the generally
124    /// >           supported protocols, even if in a specific context a particular one is
125    /// >           not supported, (eg. when only allowing one substream at a time for a protocol).
126    /// >           This allows a remote to put the list of supported protocols in a cache.
127    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo>;
128
129    /// Returns until when the connection should be kept alive.
130    ///
131    /// This method is called by the `Swarm` after each invocation of
132    /// [`ConnectionHandler::poll`] to determine if the connection and the associated
133    /// [`ConnectionHandler`]s should be kept alive as far as this handler is concerned
134    /// and if so, for how long.
135    ///
136    /// Returning [`KeepAlive::No`] indicates that the connection should be
137    /// closed and this handler destroyed immediately.
138    ///
139    /// Returning [`KeepAlive::Until`] indicates that the connection may be closed
140    /// and this handler destroyed after the specified `Instant`.
141    ///
142    /// Returning [`KeepAlive::Yes`] indicates that the connection should
143    /// be kept alive until the next call to this method.
144    ///
145    /// > **Note**: The connection is always closed and the handler destroyed
146    /// > when [`ConnectionHandler::poll`] returns an error. Furthermore, the
147    /// > connection may be closed for reasons outside of the control
148    /// > of the handler.
149    fn connection_keep_alive(&self) -> KeepAlive;
150
151    /// Should behave like `Stream::poll()`.
152    #[allow(deprecated)]
153    fn poll(
154        &mut self,
155        cx: &mut Context<'_>,
156    ) -> Poll<
157        ConnectionHandlerEvent<
158            Self::OutboundProtocol,
159            Self::OutboundOpenInfo,
160            Self::ToBehaviour,
161            Self::Error,
162        >,
163    >;
164
165    /// Adds a closure that turns the input event into something else.
166    fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
167    where
168        Self: Sized,
169        TMap: Fn(&TNewIn) -> Option<&Self::FromBehaviour>,
170    {
171        MapInEvent::new(self, map)
172    }
173
174    /// Adds a closure that turns the output event into something else.
175    fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
176    where
177        Self: Sized,
178        TMap: FnMut(Self::ToBehaviour) -> TNewOut,
179    {
180        MapOutEvent::new(self, map)
181    }
182
183    /// Creates a new [`ConnectionHandler`] that selects either this handler or
184    /// `other` by delegating methods calls appropriately.
185    ///
186    /// > **Note**: The largest `KeepAlive` returned by the two handlers takes precedence,
187    /// > i.e. is returned from [`ConnectionHandler::connection_keep_alive`] by the returned
188    /// > handler.
189    fn select<TProto2>(self, other: TProto2) -> ConnectionHandlerSelect<Self, TProto2>
190    where
191        Self: Sized,
192    {
193        ConnectionHandlerSelect::new(self, other)
194    }
195
196    /// Informs the handler about an event from the [`NetworkBehaviour`](super::NetworkBehaviour).
197    fn on_behaviour_event(&mut self, _event: Self::FromBehaviour);
198
199    fn on_connection_event(
200        &mut self,
201        event: ConnectionEvent<
202            Self::InboundProtocol,
203            Self::OutboundProtocol,
204            Self::InboundOpenInfo,
205            Self::OutboundOpenInfo,
206        >,
207    );
208}
209
210/// Enumeration with the list of the possible stream events
211/// to pass to [`on_connection_event`](ConnectionHandler::on_connection_event).
212pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> {
213    /// Informs the handler about the output of a successful upgrade on a new inbound substream.
214    FullyNegotiatedInbound(FullyNegotiatedInbound<IP, IOI>),
215    /// Informs the handler about the output of a successful upgrade on a new outbound stream.
216    FullyNegotiatedOutbound(FullyNegotiatedOutbound<OP, OOI>),
217    /// Informs the handler about a change in the address of the remote.
218    AddressChange(AddressChange<'a>),
219    /// Informs the handler that upgrading an outbound substream to the given protocol has failed.
220    DialUpgradeError(DialUpgradeError<OOI, OP>),
221    /// Informs the handler that upgrading an inbound substream to the given protocol has failed.
222    ListenUpgradeError(ListenUpgradeError<IOI, IP>),
223    /// The local [`ConnectionHandler`] added or removed support for one or more protocols.
224    LocalProtocolsChange(ProtocolsChange<'a>),
225    /// The remote [`ConnectionHandler`] now supports a different set of protocols.
226    RemoteProtocolsChange(ProtocolsChange<'a>),
227}
228
229impl<'a, IP, OP, IOI, OOI> fmt::Debug for ConnectionEvent<'a, IP, OP, IOI, OOI>
230where
231    IP: InboundUpgradeSend + fmt::Debug,
232    IP::Output: fmt::Debug,
233    IP::Error: fmt::Debug,
234    OP: OutboundUpgradeSend + fmt::Debug,
235    OP::Output: fmt::Debug,
236    OP::Error: fmt::Debug,
237    IOI: fmt::Debug,
238    OOI: fmt::Debug,
239{
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        match self {
242            ConnectionEvent::FullyNegotiatedInbound(v) => {
243                f.debug_tuple("FullyNegotiatedInbound").field(v).finish()
244            }
245            ConnectionEvent::FullyNegotiatedOutbound(v) => {
246                f.debug_tuple("FullyNegotiatedOutbound").field(v).finish()
247            }
248            ConnectionEvent::AddressChange(v) => f.debug_tuple("AddressChange").field(v).finish(),
249            ConnectionEvent::DialUpgradeError(v) => {
250                f.debug_tuple("DialUpgradeError").field(v).finish()
251            }
252            ConnectionEvent::ListenUpgradeError(v) => {
253                f.debug_tuple("ListenUpgradeError").field(v).finish()
254            }
255            ConnectionEvent::LocalProtocolsChange(v) => {
256                f.debug_tuple("LocalProtocolsChange").field(v).finish()
257            }
258            ConnectionEvent::RemoteProtocolsChange(v) => {
259                f.debug_tuple("RemoteProtocolsChange").field(v).finish()
260            }
261        }
262    }
263}
264
265impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI>
266    ConnectionEvent<'a, IP, OP, IOI, OOI>
267{
268    /// Whether the event concerns an outbound stream.
269    pub fn is_outbound(&self) -> bool {
270        match self {
271            ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::FullyNegotiatedOutbound(_) => {
272                true
273            }
274            ConnectionEvent::FullyNegotiatedInbound(_)
275            | ConnectionEvent::AddressChange(_)
276            | ConnectionEvent::LocalProtocolsChange(_)
277            | ConnectionEvent::RemoteProtocolsChange(_)
278            | ConnectionEvent::ListenUpgradeError(_) => false,
279        }
280    }
281
282    /// Whether the event concerns an inbound stream.
283    pub fn is_inbound(&self) -> bool {
284        match self {
285            ConnectionEvent::FullyNegotiatedInbound(_) | ConnectionEvent::ListenUpgradeError(_) => {
286                true
287            }
288            ConnectionEvent::FullyNegotiatedOutbound(_)
289            | ConnectionEvent::AddressChange(_)
290            | ConnectionEvent::LocalProtocolsChange(_)
291            | ConnectionEvent::RemoteProtocolsChange(_)
292            | ConnectionEvent::DialUpgradeError(_) => false,
293        }
294    }
295}
296
297/// [`ConnectionEvent`] variant that informs the handler about
298/// the output of a successful upgrade on a new inbound substream.
299///
300/// Note that it is up to the [`ConnectionHandler`] implementation to manage the lifetime of the
301/// negotiated inbound substreams. E.g. the implementation has to enforce a limit on the number
302/// of simultaneously open negotiated inbound substreams. In other words it is up to the
303/// [`ConnectionHandler`] implementation to stop a malicious remote node to open and keep alive
304/// an excessive amount of inbound substreams.
305#[derive(Debug)]
306pub struct FullyNegotiatedInbound<IP: InboundUpgradeSend, IOI> {
307    pub protocol: IP::Output,
308    pub info: IOI,
309}
310
311/// [`ConnectionEvent`] variant that informs the handler about successful upgrade on a new outbound stream.
312///
313/// The `protocol` field is the information that was previously passed to
314/// [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
315#[derive(Debug)]
316pub struct FullyNegotiatedOutbound<OP: OutboundUpgradeSend, OOI> {
317    pub protocol: OP::Output,
318    pub info: OOI,
319}
320
321/// [`ConnectionEvent`] variant that informs the handler about a change in the address of the remote.
322#[derive(Debug)]
323pub struct AddressChange<'a> {
324    pub new_address: &'a Multiaddr,
325}
326
327/// [`ConnectionEvent`] variant that informs the handler about a change in the protocols supported on the connection.
328#[derive(Debug, Clone)]
329pub enum ProtocolsChange<'a> {
330    Added(ProtocolsAdded<'a>),
331    Removed(ProtocolsRemoved<'a>),
332}
333
334impl<'a> ProtocolsChange<'a> {
335    /// Compute the [`ProtocolsChange`] that results from adding `to_add` to `existing_protocols`.
336    ///
337    /// Returns `None` if the change is a no-op, i.e. `to_add` is a subset of `existing_protocols`.
338    pub(crate) fn add(
339        existing_protocols: &'a HashSet<StreamProtocol>,
340        to_add: &'a HashSet<StreamProtocol>,
341    ) -> Option<Self> {
342        let mut actually_added_protocols = to_add.difference(existing_protocols).peekable();
343
344        actually_added_protocols.peek()?;
345
346        Some(ProtocolsChange::Added(ProtocolsAdded {
347            protocols: actually_added_protocols,
348        }))
349    }
350
351    /// Compute the [`ProtocolsChange`] that results from removing `to_remove` from `existing_protocols`.
352    ///
353    /// Returns `None` if the change is a no-op, i.e. none of the protocols in `to_remove` are in `existing_protocols`.
354    pub(crate) fn remove(
355        existing_protocols: &'a HashSet<StreamProtocol>,
356        to_remove: &'a HashSet<StreamProtocol>,
357    ) -> Option<Self> {
358        let mut actually_removed_protocols = existing_protocols.intersection(to_remove).peekable();
359
360        actually_removed_protocols.peek()?;
361
362        Some(ProtocolsChange::Removed(ProtocolsRemoved {
363            protocols: Either::Right(actually_removed_protocols),
364        }))
365    }
366
367    /// Compute the [`ProtocolsChange`]s required to go from `existing_protocols` to `new_protocols`.
368    pub(crate) fn from_full_sets(
369        existing_protocols: &'a HashSet<StreamProtocol>,
370        new_protocols: &'a HashSet<StreamProtocol>,
371    ) -> SmallVec<[Self; 2]> {
372        if existing_protocols == new_protocols {
373            return SmallVec::new();
374        }
375
376        let mut changes = SmallVec::new();
377
378        let mut added_protocols = new_protocols.difference(existing_protocols).peekable();
379        let mut removed_protocols = existing_protocols.difference(new_protocols).peekable();
380
381        if added_protocols.peek().is_some() {
382            changes.push(ProtocolsChange::Added(ProtocolsAdded {
383                protocols: added_protocols,
384            }));
385        }
386
387        if removed_protocols.peek().is_some() {
388            changes.push(ProtocolsChange::Removed(ProtocolsRemoved {
389                protocols: Either::Left(removed_protocols),
390            }));
391        }
392
393        changes
394    }
395}
396
397/// An [`Iterator`] over all protocols that have been added.
398#[derive(Debug, Clone)]
399pub struct ProtocolsAdded<'a> {
400    protocols: Peekable<Difference<'a, StreamProtocol, RandomState>>,
401}
402
403impl<'a> ProtocolsAdded<'a> {
404    pub(crate) fn from_set(protocols: &'a HashSet<StreamProtocol, RandomState>) -> Self {
405        ProtocolsAdded {
406            protocols: protocols.difference(&EMPTY_HASHSET).peekable(),
407        }
408    }
409}
410
411/// An [`Iterator`] over all protocols that have been removed.
412#[derive(Debug, Clone)]
413pub struct ProtocolsRemoved<'a> {
414    protocols: Either<
415        Peekable<Difference<'a, StreamProtocol, RandomState>>,
416        Peekable<Intersection<'a, StreamProtocol, RandomState>>,
417    >,
418}
419
420impl<'a> ProtocolsRemoved<'a> {
421    #[cfg(test)]
422    pub(crate) fn from_set(protocols: &'a HashSet<StreamProtocol, RandomState>) -> Self {
423        ProtocolsRemoved {
424            protocols: Either::Left(protocols.difference(&EMPTY_HASHSET).peekable()),
425        }
426    }
427}
428
429impl<'a> Iterator for ProtocolsAdded<'a> {
430    type Item = &'a StreamProtocol;
431    fn next(&mut self) -> Option<Self::Item> {
432        self.protocols.next()
433    }
434}
435
436impl<'a> Iterator for ProtocolsRemoved<'a> {
437    type Item = &'a StreamProtocol;
438    fn next(&mut self) -> Option<Self::Item> {
439        self.protocols.next()
440    }
441}
442
443/// [`ConnectionEvent`] variant that informs the handler
444/// that upgrading an outbound substream to the given protocol has failed.
445#[derive(Debug)]
446pub struct DialUpgradeError<OOI, OP: OutboundUpgradeSend> {
447    pub info: OOI,
448    pub error: StreamUpgradeError<OP::Error>,
449}
450
451/// [`ConnectionEvent`] variant that informs the handler
452/// that upgrading an inbound substream to the given protocol has failed.
453#[derive(Debug)]
454pub struct ListenUpgradeError<IOI, IP: InboundUpgradeSend> {
455    pub info: IOI,
456    pub error: IP::Error,
457}
458
459/// Configuration of inbound or outbound substream protocol(s)
460/// for a [`ConnectionHandler`].
461///
462/// The inbound substream protocol(s) are defined by [`ConnectionHandler::listen_protocol`]
463/// and the outbound substream protocol(s) by [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
464#[derive(Copy, Clone, Debug, PartialEq, Eq)]
465pub struct SubstreamProtocol<TUpgrade, TInfo> {
466    upgrade: TUpgrade,
467    info: TInfo,
468    timeout: Duration,
469}
470
471impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
472    /// Create a new `SubstreamProtocol` from the given upgrade.
473    ///
474    /// The default timeout for applying the given upgrade on a substream is
475    /// 10 seconds.
476    pub fn new(upgrade: TUpgrade, info: TInfo) -> Self {
477        SubstreamProtocol {
478            upgrade,
479            info,
480            timeout: Duration::from_secs(10),
481        }
482    }
483
484    /// Maps a function over the protocol upgrade.
485    pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TInfo>
486    where
487        F: FnOnce(TUpgrade) -> U,
488    {
489        SubstreamProtocol {
490            upgrade: f(self.upgrade),
491            info: self.info,
492            timeout: self.timeout,
493        }
494    }
495
496    /// Maps a function over the protocol info.
497    pub fn map_info<U, F>(self, f: F) -> SubstreamProtocol<TUpgrade, U>
498    where
499        F: FnOnce(TInfo) -> U,
500    {
501        SubstreamProtocol {
502            upgrade: self.upgrade,
503            info: f(self.info),
504            timeout: self.timeout,
505        }
506    }
507
508    /// Sets a new timeout for the protocol upgrade.
509    pub fn with_timeout(mut self, timeout: Duration) -> Self {
510        self.timeout = timeout;
511        self
512    }
513
514    /// Borrows the contained protocol upgrade.
515    pub fn upgrade(&self) -> &TUpgrade {
516        &self.upgrade
517    }
518
519    /// Borrows the contained protocol info.
520    pub fn info(&self) -> &TInfo {
521        &self.info
522    }
523
524    /// Borrows the timeout for the protocol upgrade.
525    pub fn timeout(&self) -> &Duration {
526        &self.timeout
527    }
528
529    /// Converts the substream protocol configuration into the contained upgrade.
530    pub fn into_upgrade(self) -> (TUpgrade, TInfo) {
531        (self.upgrade, self.info)
532    }
533}
534
535/// Event produced by a handler.
536#[derive(Debug, Clone, PartialEq, Eq)]
537pub enum ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr> {
538    /// Request a new outbound substream to be opened with the remote.
539    OutboundSubstreamRequest {
540        /// The protocol(s) to apply on the substream.
541        protocol: SubstreamProtocol<TConnectionUpgrade, TOutboundOpenInfo>,
542    },
543
544    /// Close the connection for the given reason.
545    ///
546    /// Note this will affect all [`ConnectionHandler`]s handling this
547    /// connection, in other words it will close the connection for all
548    /// [`ConnectionHandler`]s. To signal that one has no more need for the
549    /// connection, while allowing other [`ConnectionHandler`]s to continue using
550    /// the connection, return [`KeepAlive::No`] in
551    /// [`ConnectionHandler::connection_keep_alive`].
552    #[deprecated(
553        note = "To close a connection, use `ToSwarm::CloseConnection` or `Swarm::close_connection`. See <https://github.com/libp2p/rust-libp2p/issues/3591> for more details."
554    )]
555    Close(TErr),
556    /// We learned something about the protocols supported by the remote.
557    ReportRemoteProtocols(ProtocolSupport),
558
559    /// Event that is sent to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour).
560    NotifyBehaviour(TCustom),
561}
562
563#[derive(Debug, Clone, PartialEq, Eq)]
564pub enum ProtocolSupport {
565    /// The remote now supports these additional protocols.
566    Added(HashSet<StreamProtocol>),
567    /// The remote no longer supports these protocols.
568    Removed(HashSet<StreamProtocol>),
569}
570
571/// Event produced by a handler.
572impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
573    ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
574{
575    /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
576    /// `TOutboundOpenInfo` to something else.
577    pub fn map_outbound_open_info<F, I>(
578        self,
579        map: F,
580    ) -> ConnectionHandlerEvent<TConnectionUpgrade, I, TCustom, TErr>
581    where
582        F: FnOnce(TOutboundOpenInfo) -> I,
583    {
584        match self {
585            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
586                ConnectionHandlerEvent::OutboundSubstreamRequest {
587                    protocol: protocol.map_info(map),
588                }
589            }
590            ConnectionHandlerEvent::NotifyBehaviour(val) => {
591                ConnectionHandlerEvent::NotifyBehaviour(val)
592            }
593            #[allow(deprecated)]
594            ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val),
595            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
596                ConnectionHandlerEvent::ReportRemoteProtocols(support)
597            }
598        }
599    }
600
601    /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
602    /// to something else.
603    pub fn map_protocol<F, I>(
604        self,
605        map: F,
606    ) -> ConnectionHandlerEvent<I, TOutboundOpenInfo, TCustom, TErr>
607    where
608        F: FnOnce(TConnectionUpgrade) -> I,
609    {
610        match self {
611            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
612                ConnectionHandlerEvent::OutboundSubstreamRequest {
613                    protocol: protocol.map_upgrade(map),
614                }
615            }
616            ConnectionHandlerEvent::NotifyBehaviour(val) => {
617                ConnectionHandlerEvent::NotifyBehaviour(val)
618            }
619            #[allow(deprecated)]
620            ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val),
621            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
622                ConnectionHandlerEvent::ReportRemoteProtocols(support)
623            }
624        }
625    }
626
627    /// If this is a `Custom` event, maps the content to something else.
628    pub fn map_custom<F, I>(
629        self,
630        map: F,
631    ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, I, TErr>
632    where
633        F: FnOnce(TCustom) -> I,
634    {
635        match self {
636            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
637                ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
638            }
639            ConnectionHandlerEvent::NotifyBehaviour(val) => {
640                ConnectionHandlerEvent::NotifyBehaviour(map(val))
641            }
642            #[allow(deprecated)]
643            ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val),
644            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
645                ConnectionHandlerEvent::ReportRemoteProtocols(support)
646            }
647        }
648    }
649
650    /// If this is a `Close` event, maps the content to something else.
651    pub fn map_close<F, I>(
652        self,
653        map: F,
654    ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom, I>
655    where
656        F: FnOnce(TErr) -> I,
657    {
658        match self {
659            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
660                ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
661            }
662            ConnectionHandlerEvent::NotifyBehaviour(val) => {
663                ConnectionHandlerEvent::NotifyBehaviour(val)
664            }
665            #[allow(deprecated)]
666            ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(map(val)),
667            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
668                ConnectionHandlerEvent::ReportRemoteProtocols(support)
669            }
670        }
671    }
672}
673
674#[deprecated(note = "Renamed to `StreamUpgradeError`")]
675pub type ConnectionHandlerUpgrErr<TUpgrErr> = StreamUpgradeError<TUpgrErr>;
676
677/// Error that can happen on an outbound substream opening attempt.
678#[derive(Debug)]
679pub enum StreamUpgradeError<TUpgrErr> {
680    /// The opening attempt timed out before the negotiation was fully completed.
681    Timeout,
682    /// The upgrade produced an error.
683    Apply(TUpgrErr),
684    /// No protocol could be agreed upon.
685    NegotiationFailed,
686    /// An IO or otherwise unrecoverable error happened.
687    Io(io::Error),
688}
689
690impl<TUpgrErr> StreamUpgradeError<TUpgrErr> {
691    /// Map the inner [`StreamUpgradeError`] type.
692    pub fn map_upgrade_err<F, E>(self, f: F) -> StreamUpgradeError<E>
693    where
694        F: FnOnce(TUpgrErr) -> E,
695    {
696        match self {
697            StreamUpgradeError::Timeout => StreamUpgradeError::Timeout,
698            StreamUpgradeError::Apply(e) => StreamUpgradeError::Apply(f(e)),
699            StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
700            StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e),
701        }
702    }
703}
704
705impl<TUpgrErr> fmt::Display for StreamUpgradeError<TUpgrErr>
706where
707    TUpgrErr: error::Error + 'static,
708{
709    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
710        match self {
711            StreamUpgradeError::Timeout => {
712                write!(f, "Timeout error while opening a substream")
713            }
714            StreamUpgradeError::Apply(err) => {
715                write!(f, "Apply: ")?;
716                crate::print_error_chain(f, err)
717            }
718            StreamUpgradeError::NegotiationFailed => {
719                write!(f, "no protocols could be agreed upon")
720            }
721            StreamUpgradeError::Io(e) => {
722                write!(f, "IO error: ")?;
723                crate::print_error_chain(f, e)
724            }
725        }
726    }
727}
728
729impl<TUpgrErr> error::Error for StreamUpgradeError<TUpgrErr>
730where
731    TUpgrErr: error::Error + 'static,
732{
733    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
734        None
735    }
736}
737
738/// How long the connection should be kept alive.
739#[derive(Debug, Copy, Clone, PartialEq, Eq)]
740pub enum KeepAlive {
741    /// If nothing new happens, the connection should be closed at the given `Instant`.
742    #[deprecated(
743        note = "Use `swarm::Config::with_idle_connection_timeout` instead. See <https://github.com/libp2p/rust-libp2p/issues/3844> for details."
744    )]
745    Until(Instant),
746    /// Keep the connection alive.
747    Yes,
748    /// Close the connection as soon as possible.
749    No,
750}
751
752impl KeepAlive {
753    /// Returns true for `Yes`, false otherwise.
754    pub fn is_yes(&self) -> bool {
755        matches!(*self, KeepAlive::Yes)
756    }
757}
758
759impl PartialOrd for KeepAlive {
760    fn partial_cmp(&self, other: &KeepAlive) -> Option<Ordering> {
761        Some(self.cmp(other))
762    }
763}
764
765#[allow(deprecated)]
766impl Ord for KeepAlive {
767    fn cmp(&self, other: &KeepAlive) -> Ordering {
768        use self::KeepAlive::*;
769
770        match (self, other) {
771            (No, No) | (Yes, Yes) => Ordering::Equal,
772            (No, _) | (_, Yes) => Ordering::Less,
773            (_, No) | (Yes, _) => Ordering::Greater,
774            (Until(t1), Until(t2)) => t1.cmp(t2),
775        }
776    }
777}
778
779#[cfg(test)]
780impl quickcheck::Arbitrary for KeepAlive {
781    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
782        match quickcheck::GenRange::gen_range(g, 1u8..4) {
783            1 =>
784            {
785                #[allow(deprecated)]
786                KeepAlive::Until(
787                    Instant::now()
788                        .checked_add(Duration::arbitrary(g))
789                        .unwrap_or(Instant::now()),
790                )
791            }
792            2 => KeepAlive::Yes,
793            3 => KeepAlive::No,
794            _ => unreachable!(),
795        }
796    }
797}
798
799/// A statically declared, empty [`HashSet`] allows us to work around borrow-checker rules for
800/// [`ProtocolsAdded::from_set`]. The lifetimes don't work unless we have a [`HashSet`] with a `'static' lifetime.
801static EMPTY_HASHSET: Lazy<HashSet<StreamProtocol>> = Lazy::new(HashSet::new);