libp2p_swarm/
handler.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Once a connection to a remote peer is established, a [`ConnectionHandler`] negotiates
22//! and handles one or more specific protocols on the connection.
23//!
24//! Protocols are negotiated and used on individual substreams of the connection. Thus a
25//! [`ConnectionHandler`] defines the inbound and outbound upgrades to apply when creating a new
26//! inbound or outbound substream, respectively, and is notified by a [`Swarm`](crate::Swarm) when
27//! these upgrades have been successfully applied, including the final output of the upgrade. A
28//! [`ConnectionHandler`] can then continue communicating with the peer over the substream using the
29//! negotiated protocol(s).
30//!
31//! Two [`ConnectionHandler`]s can be composed with [`ConnectionHandler::select()`]
32//! in order to build a new handler supporting the combined set of protocols,
33//! with methods being dispatched to the appropriate handler according to the
34//! used protocol(s) determined by the associated types of the handlers.
35//!
36//! > **Note**: A [`ConnectionHandler`] handles one or more protocols in the context of a single
37//! >           connection with a remote. In order to handle a protocol that requires knowledge of
38//! >           the network as a whole, see the
39//! >           [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) trait.
40
41pub mod either;
42mod map_in;
43mod map_out;
44pub mod multi;
45mod one_shot;
46mod pending;
47mod select;
48
49use crate::connection::AsStrHashEq;
50pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
51pub use map_in::MapInEvent;
52pub use map_out::MapOutEvent;
53pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
54pub use pending::PendingConnectionHandler;
55pub use select::ConnectionHandlerSelect;
56use smallvec::SmallVec;
57
58use crate::StreamProtocol;
59use core::slice;
60use libp2p_core::Multiaddr;
61use std::collections::{HashMap, HashSet};
62use std::{error, fmt, io, task::Context, task::Poll, time::Duration};
63
64/// A handler for a set of protocols used on a connection with a remote.
65///
66/// This trait should be implemented for a type that maintains the state for
67/// the execution of a specific protocol with a remote.
68///
69/// # Handling a protocol
70///
71/// Communication with a remote over a set of protocols is initiated in one of two ways:
72///
73///   1. Dialing by initiating a new outbound substream. In order to do so,
74///      [`ConnectionHandler::poll()`] must return an [`ConnectionHandlerEvent::OutboundSubstreamRequest`],
75///      providing an instance of [`libp2p_core::upgrade::OutboundUpgrade`] that is used to negotiate the
76///      protocol(s). Upon success, [`ConnectionHandler::on_connection_event`] is called with
77///      [`ConnectionEvent::FullyNegotiatedOutbound`] translating the final output of the upgrade.
78///
79///   2. Listening by accepting a new inbound substream. When a new inbound substream
80///      is created on a connection, [`ConnectionHandler::listen_protocol`] is called
81///      to obtain an instance of [`libp2p_core::upgrade::InboundUpgrade`] that is used to
82///      negotiate the protocol(s). Upon success,
83///      [`ConnectionHandler::on_connection_event`] is called with [`ConnectionEvent::FullyNegotiatedInbound`]
84///      translating the final output of the upgrade.
85///
86///
87/// # Connection Keep-Alive
88///
89/// A [`ConnectionHandler`] can influence the lifetime of the underlying connection
90/// through [`ConnectionHandler::connection_keep_alive`]. That is, the protocol
91/// implemented by the handler can include conditions for terminating the connection.
92/// The lifetime of successfully negotiated substreams is fully controlled by the handler.
93///
94/// Implementors of this trait should keep in mind that the connection can be closed at any time.
95/// When a connection is closed gracefully, the substreams used by the handler may still
96/// continue reading data until the remote closes its side of the connection.
pub trait ConnectionHandler: Send + 'static {
    /// A type representing the message(s) a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour)
    /// can send to a [`ConnectionHandler`] via
    /// [`ToSwarm::NotifyHandler`](crate::behaviour::ToSwarm::NotifyHandler).
    type FromBehaviour: fmt::Debug + Send + 'static;
    /// A type representing message(s) a [`ConnectionHandler`] can send to a
    /// [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via
    /// [`ConnectionHandlerEvent::NotifyBehaviour`].
    type ToBehaviour: fmt::Debug + Send + 'static;
    /// The inbound upgrade for the protocol(s) used by the handler.
    type InboundProtocol: InboundUpgradeSend;
    /// The outbound upgrade for the protocol(s) used by the handler.
    type OutboundProtocol: OutboundUpgradeSend;
    /// The type of additional information returned from `listen_protocol`.
    type InboundOpenInfo: Send + 'static;
    /// The type of additional information passed to an `OutboundSubstreamRequest`.
    type OutboundOpenInfo: Send + 'static;

    /// The [`InboundUpgrade`](libp2p_core::upgrade::InboundUpgrade) to apply on inbound
    /// substreams to negotiate the desired protocols.
    ///
    /// The returned [`SubstreamProtocol`] bundles the upgrade with a value of
    /// [`ConnectionHandler::InboundOpenInfo`] and an upgrade timeout.
    ///
    /// > **Note**: The returned `InboundUpgrade` should always accept all the generally
    /// >           supported protocols, even if in a specific context a particular one is
    /// >           not supported, (eg. when only allowing one substream at a time for a protocol).
    /// >           This allows a remote to put the list of supported protocols in a cache.
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo>;

    /// Returns whether the connection should be kept alive.
    ///
    /// The default implementation returns `false`.
    ///
    /// ## Keep alive algorithm
    ///
    /// A connection is always kept alive:
    ///
    /// - Whilst a [`ConnectionHandler`] returns [`Poll::Ready`].
    /// - We are negotiating inbound or outbound streams.
    /// - There are active [`Stream`](crate::Stream)s on the connection.
    ///
    /// The combination of the above means that _most_ protocols will not need to override this method.
    /// This method is only invoked when all of the above are `false`, i.e. when the connection is entirely idle.
    ///
    /// ## Exceptions
    ///
    /// - Protocols like [circuit-relay v2](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md) need to keep a connection alive beyond these circumstances and can thus override this method.
    /// - Protocols like [ping](https://github.com/libp2p/specs/blob/master/ping/ping.md) **don't** want to keep a connection alive despite active streams.
    ///
    /// In that case, protocol authors can use [`Stream::ignore_for_keep_alive`](crate::Stream::ignore_for_keep_alive) to opt-out a particular stream from the keep-alive algorithm.
    fn connection_keep_alive(&self) -> bool {
        false
    }

    /// Should behave like `Stream::poll()`.
    ///
    /// A ready result carries a [`ConnectionHandlerEvent`]: a request for a new outbound
    /// substream, a report about the protocols supported by the remote, or an event for the
    /// [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour).
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<
        ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
    >;

    /// Gracefully close the [`ConnectionHandler`].
    ///
    /// The contract for this function is equivalent to a [`Stream`](futures::Stream).
    /// When a connection is being shut down, we will first poll this function to completion.
    /// Following that, the physical connection will be shut down.
    ///
    /// This is also called when the shutdown was initiated due to an error on the connection.
    /// We therefore cannot guarantee that performing IO within here will succeed.
    ///
    /// To signal completion, [`Poll::Ready(None)`] should be returned.
    /// The default implementation completes immediately without emitting any event.
    ///
    /// Implementations MUST have a [`fuse`](futures::StreamExt::fuse)-like behaviour.
    /// That is, [`Poll::Ready(None)`] MUST be returned on repeated calls to [`ConnectionHandler::poll_close`].
    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
        Poll::Ready(None)
    }

    /// Adds a closure that turns the input event into something else.
    ///
    /// The closure receives a reference to the new input event type and may return `None`;
    /// the resulting handler is a [`MapInEvent`] wrapping `self`.
    fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
    where
        Self: Sized,
        TMap: Fn(&TNewIn) -> Option<&Self::FromBehaviour>,
    {
        MapInEvent::new(self, map)
    }

    /// Adds a closure that turns the output event into something else.
    ///
    /// The resulting handler is a [`MapOutEvent`] wrapping `self`.
    fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
    where
        Self: Sized,
        TMap: FnMut(Self::ToBehaviour) -> TNewOut,
    {
        MapOutEvent::new(self, map)
    }

    /// Creates a new [`ConnectionHandler`] that selects either this handler or
    /// `other` by delegating methods calls appropriately.
    fn select<TProto2>(self, other: TProto2) -> ConnectionHandlerSelect<Self, TProto2>
    where
        Self: Sized,
    {
        ConnectionHandlerSelect::new(self, other)
    }

    /// Informs the handler about an event from the [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour).
    fn on_behaviour_event(&mut self, _event: Self::FromBehaviour);

    /// Informs the handler about an event on the connection.
    ///
    /// See [`ConnectionEvent`] for the possible events: fully negotiated inbound or outbound
    /// streams, failed upgrades, a change of the remote's address and changes to the locally
    /// or remotely supported protocols.
    fn on_connection_event(
        &mut self,
        event: ConnectionEvent<
            Self::InboundProtocol,
            Self::OutboundProtocol,
            Self::InboundOpenInfo,
            Self::OutboundOpenInfo,
        >,
    );
}
208
/// Enumeration with the list of the possible stream events
/// to pass to [`on_connection_event`](ConnectionHandler::on_connection_event).
///
/// The type parameters are the inbound upgrade (`IP`), the outbound upgrade (`OP`) and the
/// additional information attached to inbound (`IOI`) and outbound (`OOI`) substreams.
/// The lifetime `'a` is that of the data borrowed by the address and protocol change variants.
#[non_exhaustive]
pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> {
    /// Informs the handler about the output of a successful upgrade on a new inbound substream.
    FullyNegotiatedInbound(FullyNegotiatedInbound<IP, IOI>),
    /// Informs the handler about the output of a successful upgrade on a new outbound stream.
    FullyNegotiatedOutbound(FullyNegotiatedOutbound<OP, OOI>),
    /// Informs the handler about a change in the address of the remote.
    AddressChange(AddressChange<'a>),
    /// Informs the handler that upgrading an outbound substream to the given protocol has failed.
    DialUpgradeError(DialUpgradeError<OOI, OP>),
    /// Informs the handler that upgrading an inbound substream to the given protocol has failed.
    ListenUpgradeError(ListenUpgradeError<IOI, IP>),
    /// The local [`ConnectionHandler`] added or removed support for one or more protocols.
    LocalProtocolsChange(ProtocolsChange<'a>),
    /// The remote [`ConnectionHandler`] now supports a different set of protocols.
    RemoteProtocolsChange(ProtocolsChange<'a>),
}
228
229impl<'a, IP, OP, IOI, OOI> fmt::Debug for ConnectionEvent<'a, IP, OP, IOI, OOI>
230where
231    IP: InboundUpgradeSend + fmt::Debug,
232    IP::Output: fmt::Debug,
233    IP::Error: fmt::Debug,
234    OP: OutboundUpgradeSend + fmt::Debug,
235    OP::Output: fmt::Debug,
236    OP::Error: fmt::Debug,
237    IOI: fmt::Debug,
238    OOI: fmt::Debug,
239{
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        match self {
242            ConnectionEvent::FullyNegotiatedInbound(v) => {
243                f.debug_tuple("FullyNegotiatedInbound").field(v).finish()
244            }
245            ConnectionEvent::FullyNegotiatedOutbound(v) => {
246                f.debug_tuple("FullyNegotiatedOutbound").field(v).finish()
247            }
248            ConnectionEvent::AddressChange(v) => f.debug_tuple("AddressChange").field(v).finish(),
249            ConnectionEvent::DialUpgradeError(v) => {
250                f.debug_tuple("DialUpgradeError").field(v).finish()
251            }
252            ConnectionEvent::ListenUpgradeError(v) => {
253                f.debug_tuple("ListenUpgradeError").field(v).finish()
254            }
255            ConnectionEvent::LocalProtocolsChange(v) => {
256                f.debug_tuple("LocalProtocolsChange").field(v).finish()
257            }
258            ConnectionEvent::RemoteProtocolsChange(v) => {
259                f.debug_tuple("RemoteProtocolsChange").field(v).finish()
260            }
261        }
262    }
263}
264
265impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI>
266    ConnectionEvent<'a, IP, OP, IOI, OOI>
267{
268    /// Whether the event concerns an outbound stream.
269    pub fn is_outbound(&self) -> bool {
270        match self {
271            ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::FullyNegotiatedOutbound(_) => {
272                true
273            }
274            ConnectionEvent::FullyNegotiatedInbound(_)
275            | ConnectionEvent::AddressChange(_)
276            | ConnectionEvent::LocalProtocolsChange(_)
277            | ConnectionEvent::RemoteProtocolsChange(_)
278            | ConnectionEvent::ListenUpgradeError(_) => false,
279        }
280    }
281
282    /// Whether the event concerns an inbound stream.
283    pub fn is_inbound(&self) -> bool {
284        match self {
285            ConnectionEvent::FullyNegotiatedInbound(_) | ConnectionEvent::ListenUpgradeError(_) => {
286                true
287            }
288            ConnectionEvent::FullyNegotiatedOutbound(_)
289            | ConnectionEvent::AddressChange(_)
290            | ConnectionEvent::LocalProtocolsChange(_)
291            | ConnectionEvent::RemoteProtocolsChange(_)
292            | ConnectionEvent::DialUpgradeError(_) => false,
293        }
294    }
295}
296
/// [`ConnectionEvent`] variant that informs the handler about
/// the output of a successful upgrade on a new inbound substream.
///
/// Note that it is up to the [`ConnectionHandler`] implementation to manage the lifetime of the
/// negotiated inbound substreams. E.g. the implementation has to enforce a limit on the number
/// of simultaneously open negotiated inbound substreams. In other words it is up to the
/// [`ConnectionHandler`] implementation to stop a malicious remote node from opening and keeping
/// alive an excessive amount of inbound substreams.
#[derive(Debug)]
pub struct FullyNegotiatedInbound<IP: InboundUpgradeSend, IOI> {
    /// The output of the inbound upgrade.
    pub protocol: IP::Output,
    /// The additional information attached to the inbound substream
    /// (see [`ConnectionHandler::InboundOpenInfo`]).
    pub info: IOI,
}
310
/// [`ConnectionEvent`] variant that informs the handler about successful upgrade on a new outbound stream.
///
/// The `protocol` field holds the output of the outbound upgrade. The `info` field has the
/// type of the additional information carried by
/// [`ConnectionHandlerEvent::OutboundSubstreamRequest`]; presumably it is the very value that
/// was passed with the request (NOTE(review): confirm against the connection driver).
#[derive(Debug)]
pub struct FullyNegotiatedOutbound<OP: OutboundUpgradeSend, OOI> {
    /// The output of the outbound upgrade.
    pub protocol: OP::Output,
    /// The additional information attached to the outbound substream
    /// (see [`ConnectionHandler::OutboundOpenInfo`]).
    pub info: OOI,
}
320
/// [`ConnectionEvent`] variant that informs the handler about a change in the address of the remote.
#[derive(Debug)]
pub struct AddressChange<'a> {
    /// The new address of the remote, borrowed for the duration of the event.
    pub new_address: &'a Multiaddr,
}
326
/// [`ConnectionEvent`] variant that informs the handler about a change in the protocols supported on the connection.
#[derive(Debug, Clone)]
pub enum ProtocolsChange<'a> {
    /// Support for the contained protocols has been added.
    Added(ProtocolsAdded<'a>),
    /// Support for the contained protocols has been removed.
    Removed(ProtocolsRemoved<'a>),
}
333
334impl<'a> ProtocolsChange<'a> {
335    /// Compute the protocol change for the initial set of protocols.
336    pub(crate) fn from_initial_protocols<'b, T: AsRef<str> + 'b>(
337        new_protocols: impl IntoIterator<Item = &'b T>,
338        buffer: &'a mut Vec<StreamProtocol>,
339    ) -> Self {
340        buffer.clear();
341        buffer.extend(
342            new_protocols
343                .into_iter()
344                .filter_map(|i| StreamProtocol::try_from_owned(i.as_ref().to_owned()).ok()),
345        );
346
347        ProtocolsChange::Added(ProtocolsAdded {
348            protocols: buffer.iter(),
349        })
350    }
351
352    /// Compute the [`ProtocolsChange`] that results from adding `to_add` to `existing_protocols`.
353    ///
354    /// Returns `None` if the change is a no-op, i.e. `to_add` is a subset of `existing_protocols`.
355    pub(crate) fn add(
356        existing_protocols: &HashSet<StreamProtocol>,
357        to_add: HashSet<StreamProtocol>,
358        buffer: &'a mut Vec<StreamProtocol>,
359    ) -> Option<Self> {
360        buffer.clear();
361        buffer.extend(
362            to_add
363                .into_iter()
364                .filter(|i| !existing_protocols.contains(i)),
365        );
366
367        if buffer.is_empty() {
368            return None;
369        }
370
371        Some(Self::Added(ProtocolsAdded {
372            protocols: buffer.iter(),
373        }))
374    }
375
376    /// Compute the [`ProtocolsChange`] that results from removing `to_remove` from `existing_protocols`. Removes the protocols from `existing_protocols`.
377    ///
378    /// Returns `None` if the change is a no-op, i.e. none of the protocols in `to_remove` are in `existing_protocols`.
379    pub(crate) fn remove(
380        existing_protocols: &mut HashSet<StreamProtocol>,
381        to_remove: HashSet<StreamProtocol>,
382        buffer: &'a mut Vec<StreamProtocol>,
383    ) -> Option<Self> {
384        buffer.clear();
385        buffer.extend(
386            to_remove
387                .into_iter()
388                .filter_map(|i| existing_protocols.take(&i)),
389        );
390
391        if buffer.is_empty() {
392            return None;
393        }
394
395        Some(Self::Removed(ProtocolsRemoved {
396            protocols: buffer.iter(),
397        }))
398    }
399
400    /// Compute the [`ProtocolsChange`]s required to go from `existing_protocols` to `new_protocols`.
401    pub(crate) fn from_full_sets<T: AsRef<str>>(
402        existing_protocols: &mut HashMap<AsStrHashEq<T>, bool>,
403        new_protocols: impl IntoIterator<Item = T>,
404        buffer: &'a mut Vec<StreamProtocol>,
405    ) -> SmallVec<[Self; 2]> {
406        buffer.clear();
407
408        // Initially, set the boolean for all protocols to `false`, meaning "not visited".
409        for v in existing_protocols.values_mut() {
410            *v = false;
411        }
412
413        let mut new_protocol_count = 0; // We can only iterate `new_protocols` once, so keep track of its length separately.
414        for new_protocol in new_protocols {
415            existing_protocols
416                .entry(AsStrHashEq(new_protocol))
417                .and_modify(|v| *v = true) // Mark protocol as visited (i.e. we still support it)
418                .or_insert_with_key(|k| {
419                    // Encountered a previously unsupported protocol, remember it in `buffer`.
420                    buffer.extend(StreamProtocol::try_from_owned(k.0.as_ref().to_owned()).ok());
421                    true
422                });
423            new_protocol_count += 1;
424        }
425
426        if new_protocol_count == existing_protocols.len() && buffer.is_empty() {
427            return SmallVec::new();
428        }
429
430        let num_new_protocols = buffer.len();
431        // Drain all protocols that we haven't visited.
432        // For existing protocols that are not in `new_protocols`, the boolean will be false, meaning we need to remove it.
433        existing_protocols.retain(|p, &mut is_supported| {
434            if !is_supported {
435                buffer.extend(StreamProtocol::try_from_owned(p.0.as_ref().to_owned()).ok());
436            }
437
438            is_supported
439        });
440
441        let (added, removed) = buffer.split_at(num_new_protocols);
442        let mut changes = SmallVec::new();
443        if !added.is_empty() {
444            changes.push(ProtocolsChange::Added(ProtocolsAdded {
445                protocols: added.iter(),
446            }));
447        }
448        if !removed.is_empty() {
449            changes.push(ProtocolsChange::Removed(ProtocolsRemoved {
450                protocols: removed.iter(),
451            }));
452        }
453        changes
454    }
455}
456
/// An [`Iterator`] over all protocols that have been added.
#[derive(Debug, Clone)]
pub struct ProtocolsAdded<'a> {
    /// Iterator over the borrowed slice of added protocols.
    pub(crate) protocols: slice::Iter<'a, StreamProtocol>,
}
462
/// An [`Iterator`] over all protocols that have been removed.
#[derive(Debug, Clone)]
pub struct ProtocolsRemoved<'a> {
    /// Iterator over the borrowed slice of removed protocols.
    pub(crate) protocols: slice::Iter<'a, StreamProtocol>,
}
468
469impl<'a> Iterator for ProtocolsAdded<'a> {
470    type Item = &'a StreamProtocol;
471    fn next(&mut self) -> Option<Self::Item> {
472        self.protocols.next()
473    }
474}
475
476impl<'a> Iterator for ProtocolsRemoved<'a> {
477    type Item = &'a StreamProtocol;
478    fn next(&mut self) -> Option<Self::Item> {
479        self.protocols.next()
480    }
481}
482
/// [`ConnectionEvent`] variant that informs the handler
/// that upgrading an outbound substream to the given protocol has failed.
#[derive(Debug)]
pub struct DialUpgradeError<OOI, OP: OutboundUpgradeSend> {
    /// The additional information attached to the outbound substream
    /// (see [`ConnectionHandler::OutboundOpenInfo`]).
    pub info: OOI,
    /// The reason the outbound upgrade failed.
    pub error: StreamUpgradeError<OP::Error>,
}
490
/// [`ConnectionEvent`] variant that informs the handler
/// that upgrading an inbound substream to the given protocol has failed.
#[derive(Debug)]
pub struct ListenUpgradeError<IOI, IP: InboundUpgradeSend> {
    /// The additional information attached to the inbound substream
    /// (see [`ConnectionHandler::InboundOpenInfo`]).
    pub info: IOI,
    /// The error produced by the inbound upgrade.
    pub error: IP::Error,
}
498
/// Configuration of inbound or outbound substream protocol(s)
/// for a [`ConnectionHandler`].
///
/// It bundles the upgrade to apply, a piece of user-defined information that travels with
/// the substream, and the timeout for applying the upgrade.
///
/// The inbound substream protocol(s) are defined by [`ConnectionHandler::listen_protocol`]
/// and the outbound substream protocol(s) by [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct SubstreamProtocol<TUpgrade, TInfo> {
    upgrade: TUpgrade,
    info: TInfo,
    timeout: Duration,
}

impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
    /// Create a new `SubstreamProtocol` from the given upgrade.
    ///
    /// The default timeout for applying the given upgrade on a substream is
    /// 10 seconds.
    pub fn new(upgrade: TUpgrade, info: TInfo) -> Self {
        let timeout = Duration::from_secs(10);
        Self {
            upgrade,
            info,
            timeout,
        }
    }

    /// Maps a function over the protocol upgrade, keeping info and timeout as they are.
    pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TInfo>
    where
        F: FnOnce(TUpgrade) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;

        SubstreamProtocol {
            upgrade: f(upgrade),
            info,
            timeout,
        }
    }

    /// Maps a function over the protocol info, keeping upgrade and timeout as they are.
    pub fn map_info<U, F>(self, f: F) -> SubstreamProtocol<TUpgrade, U>
    where
        F: FnOnce(TInfo) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;

        SubstreamProtocol {
            upgrade,
            info: f(info),
            timeout,
        }
    }

    /// Sets a new timeout for the protocol upgrade.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Borrows the contained protocol upgrade.
    pub fn upgrade(&self) -> &TUpgrade {
        let Self { upgrade, .. } = self;
        upgrade
    }

    /// Borrows the contained protocol info.
    pub fn info(&self) -> &TInfo {
        let Self { info, .. } = self;
        info
    }

    /// Borrows the timeout for the protocol upgrade.
    pub fn timeout(&self) -> &Duration {
        let Self { timeout, .. } = self;
        timeout
    }

    /// Consumes the configuration and returns the contained upgrade together with its info;
    /// the timeout is discarded.
    pub fn into_upgrade(self) -> (TUpgrade, TInfo) {
        let Self { upgrade, info, .. } = self;
        (upgrade, info)
    }
}
574
/// Event produced by a handler.
///
/// `TConnectionUpgrade` is the upgrade to apply on a requested outbound substream,
/// `TOutboundOpenInfo` the additional information attached to that request and `TCustom`
/// the type of event sent to the behaviour.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom> {
    /// Request a new outbound substream to be opened with the remote.
    OutboundSubstreamRequest {
        /// The protocol(s) to apply on the substream.
        protocol: SubstreamProtocol<TConnectionUpgrade, TOutboundOpenInfo>,
    },
    /// We learned something about the protocols supported by the remote.
    ReportRemoteProtocols(ProtocolSupport),

    /// Event that is sent to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour).
    NotifyBehaviour(TCustom),
}
590
/// A change in the set of protocols supported by the remote, as reported by a handler
/// through [`ConnectionHandlerEvent::ReportRemoteProtocols`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProtocolSupport {
    /// The remote now supports these additional protocols.
    Added(HashSet<StreamProtocol>),
    /// The remote no longer supports these protocols.
    Removed(HashSet<StreamProtocol>),
}
598
599/// Event produced by a handler.
600impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
601    ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
602{
603    /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
604    /// `TOutboundOpenInfo` to something else.
605    pub fn map_outbound_open_info<F, I>(
606        self,
607        map: F,
608    ) -> ConnectionHandlerEvent<TConnectionUpgrade, I, TCustom>
609    where
610        F: FnOnce(TOutboundOpenInfo) -> I,
611    {
612        match self {
613            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
614                ConnectionHandlerEvent::OutboundSubstreamRequest {
615                    protocol: protocol.map_info(map),
616                }
617            }
618            ConnectionHandlerEvent::NotifyBehaviour(val) => {
619                ConnectionHandlerEvent::NotifyBehaviour(val)
620            }
621            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
622                ConnectionHandlerEvent::ReportRemoteProtocols(support)
623            }
624        }
625    }
626
627    /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
628    /// to something else.
629    pub fn map_protocol<F, I>(self, map: F) -> ConnectionHandlerEvent<I, TOutboundOpenInfo, TCustom>
630    where
631        F: FnOnce(TConnectionUpgrade) -> I,
632    {
633        match self {
634            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
635                ConnectionHandlerEvent::OutboundSubstreamRequest {
636                    protocol: protocol.map_upgrade(map),
637                }
638            }
639            ConnectionHandlerEvent::NotifyBehaviour(val) => {
640                ConnectionHandlerEvent::NotifyBehaviour(val)
641            }
642            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
643                ConnectionHandlerEvent::ReportRemoteProtocols(support)
644            }
645        }
646    }
647
648    /// If this is a `Custom` event, maps the content to something else.
649    pub fn map_custom<F, I>(
650        self,
651        map: F,
652    ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, I>
653    where
654        F: FnOnce(TCustom) -> I,
655    {
656        match self {
657            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
658                ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
659            }
660            ConnectionHandlerEvent::NotifyBehaviour(val) => {
661                ConnectionHandlerEvent::NotifyBehaviour(map(val))
662            }
663            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
664                ConnectionHandlerEvent::ReportRemoteProtocols(support)
665            }
666        }
667    }
668}
669
/// Error that can happen on an outbound substream opening attempt.
#[derive(Debug)]
pub enum StreamUpgradeError<TUpgrErr> {
    /// The opening attempt timed out before the negotiation was fully completed.
    Timeout,
    /// The upgrade produced an error.
    Apply(TUpgrErr),
    /// No protocol could be agreed upon.
    NegotiationFailed,
    /// An IO or otherwise unrecoverable error happened.
    Io(io::Error),
}

impl<TUpgrErr> StreamUpgradeError<TUpgrErr> {
    /// Converts the upgrade error carried by [`StreamUpgradeError::Apply`] with `f`.
    ///
    /// All other variants are passed through untouched and `f` is not called for them.
    pub fn map_upgrade_err<F, E>(self, f: F) -> StreamUpgradeError<E>
    where
        F: FnOnce(TUpgrErr) -> E,
    {
        use StreamUpgradeError::{Apply, Io, NegotiationFailed, Timeout};

        match self {
            Apply(upgrade_err) => Apply(f(upgrade_err)),
            Io(io_err) => Io(io_err),
            Timeout => Timeout,
            NegotiationFailed => NegotiationFailed,
        }
    }
}
697
698impl<TUpgrErr> fmt::Display for StreamUpgradeError<TUpgrErr>
699where
700    TUpgrErr: error::Error + 'static,
701{
702    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
703        match self {
704            StreamUpgradeError::Timeout => {
705                write!(f, "Timeout error while opening a substream")
706            }
707            StreamUpgradeError::Apply(err) => {
708                write!(f, "Apply: ")?;
709                crate::print_error_chain(f, err)
710            }
711            StreamUpgradeError::NegotiationFailed => {
712                write!(f, "no protocols could be agreed upon")
713            }
714            StreamUpgradeError::Io(e) => {
715                write!(f, "IO error: ")?;
716                crate::print_error_chain(f, e)
717            }
718        }
719    }
720}
721
/// Makes [`StreamUpgradeError`] usable as a standard error type.
impl<TUpgrErr> error::Error for StreamUpgradeError<TUpgrErr>
where
    TUpgrErr: error::Error + 'static,
{
    /// Always returns `None`, even for the variants that wrap another error.
    ///
    /// NOTE(review): the `Display` impl already passes the wrapped error to
    /// `crate::print_error_chain`; presumably exposing it here as well would make the
    /// chain show up twice — confirm before changing.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        None
    }
}
730
731#[cfg(test)]
732mod test {
733    use super::*;
734
735    fn protocol_set_of(s: &'static str) -> HashSet<StreamProtocol> {
736        s.split_whitespace()
737            .map(|p| StreamProtocol::try_from_owned(format!("/{p}")).unwrap())
738            .collect()
739    }
740
741    fn test_remove(
742        existing: &mut HashSet<StreamProtocol>,
743        to_remove: HashSet<StreamProtocol>,
744    ) -> HashSet<StreamProtocol> {
745        ProtocolsChange::remove(existing, to_remove, &mut Vec::new())
746            .into_iter()
747            .flat_map(|c| match c {
748                ProtocolsChange::Added(_) => panic!("unexpected added"),
749                ProtocolsChange::Removed(r) => r.cloned(),
750            })
751            .collect::<HashSet<_>>()
752    }
753
754    #[test]
755    fn test_protocol_remove_subset() {
756        let mut existing = protocol_set_of("a b c");
757        let to_remove = protocol_set_of("a b");
758
759        let change = test_remove(&mut existing, to_remove);
760
761        assert_eq!(existing, protocol_set_of("c"));
762        assert_eq!(change, protocol_set_of("a b"));
763    }
764
765    #[test]
766    fn test_protocol_remove_all() {
767        let mut existing = protocol_set_of("a b c");
768        let to_remove = protocol_set_of("a b c");
769
770        let change = test_remove(&mut existing, to_remove);
771
772        assert_eq!(existing, protocol_set_of(""));
773        assert_eq!(change, protocol_set_of("a b c"));
774    }
775
776    #[test]
777    fn test_protocol_remove_superset() {
778        let mut existing = protocol_set_of("a b c");
779        let to_remove = protocol_set_of("a b c d");
780
781        let change = test_remove(&mut existing, to_remove);
782
783        assert_eq!(existing, protocol_set_of(""));
784        assert_eq!(change, protocol_set_of("a b c"));
785    }
786
787    #[test]
788    fn test_protocol_remove_none() {
789        let mut existing = protocol_set_of("a b c");
790        let to_remove = protocol_set_of("d");
791
792        let change = test_remove(&mut existing, to_remove);
793
794        assert_eq!(existing, protocol_set_of("a b c"));
795        assert_eq!(change, protocol_set_of(""));
796    }
797
798    #[test]
799    fn test_protocol_remove_none_from_empty() {
800        let mut existing = protocol_set_of("");
801        let to_remove = protocol_set_of("d");
802
803        let change = test_remove(&mut existing, to_remove);
804
805        assert_eq!(existing, protocol_set_of(""));
806        assert_eq!(change, protocol_set_of(""));
807    }
808
809    fn test_from_full_sets(
810        existing: HashSet<StreamProtocol>,
811        new: HashSet<StreamProtocol>,
812    ) -> [HashSet<StreamProtocol>; 2] {
813        let mut buffer = Vec::new();
814        let mut existing = existing
815            .iter()
816            .map(|p| (AsStrHashEq(p.as_ref()), true))
817            .collect::<HashMap<_, _>>();
818
819        let changes = ProtocolsChange::from_full_sets(
820            &mut existing,
821            new.iter().map(AsRef::as_ref),
822            &mut buffer,
823        );
824
825        let mut added_changes = HashSet::new();
826        let mut removed_changes = HashSet::new();
827
828        for change in changes {
829            match change {
830                ProtocolsChange::Added(a) => {
831                    added_changes.extend(a.cloned());
832                }
833                ProtocolsChange::Removed(r) => {
834                    removed_changes.extend(r.cloned());
835                }
836            }
837        }
838
839        [removed_changes, added_changes]
840    }
841
842    #[test]
843    fn test_from_full_stes_subset() {
844        let existing = protocol_set_of("a b c");
845        let new = protocol_set_of("a b");
846
847        let [removed_changes, added_changes] = test_from_full_sets(existing, new);
848
849        assert_eq!(added_changes, protocol_set_of(""));
850        assert_eq!(removed_changes, protocol_set_of("c"));
851    }
852
853    #[test]
854    fn test_from_full_sets_superset() {
855        let existing = protocol_set_of("a b");
856        let new = protocol_set_of("a b c");
857
858        let [removed_changes, added_changes] = test_from_full_sets(existing, new);
859
860        assert_eq!(added_changes, protocol_set_of("c"));
861        assert_eq!(removed_changes, protocol_set_of(""));
862    }
863
864    #[test]
865    fn test_from_full_sets_intersection() {
866        let existing = protocol_set_of("a b c");
867        let new = protocol_set_of("b c d");
868
869        let [removed_changes, added_changes] = test_from_full_sets(existing, new);
870
871        assert_eq!(added_changes, protocol_set_of("d"));
872        assert_eq!(removed_changes, protocol_set_of("a"));
873    }
874
875    #[test]
876    fn test_from_full_sets_disjoint() {
877        let existing = protocol_set_of("a b c");
878        let new = protocol_set_of("d e f");
879
880        let [removed_changes, added_changes] = test_from_full_sets(existing, new);
881
882        assert_eq!(added_changes, protocol_set_of("d e f"));
883        assert_eq!(removed_changes, protocol_set_of("a b c"));
884    }
885
886    #[test]
887    fn test_from_full_sets_empty() {
888        let existing = protocol_set_of("");
889        let new = protocol_set_of("");
890
891        let [removed_changes, added_changes] = test_from_full_sets(existing, new);
892
893        assert_eq!(added_changes, protocol_set_of(""));
894        assert_eq!(removed_changes, protocol_set_of(""));
895    }
896}