libp2p_core/
transport.rs

1// Copyright 2017-2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Connection-oriented communication channels.
22//!
23//! The main entity of this module is the [`Transport`] trait, which provides an
24//! interface for establishing connections with other nodes, thereby negotiating
25//! any desired protocols. The rest of the module defines combinators for
26//! modifying a transport through composition with other transports or protocol upgrades.
27
28use futures::prelude::*;
29use multiaddr::Multiaddr;
30use std::{
31    error::Error,
32    fmt,
33    pin::Pin,
34    sync::atomic::{AtomicUsize, Ordering},
35    task::{Context, Poll},
36};
37
38pub mod and_then;
39pub mod choice;
40pub mod dummy;
41pub mod global_only;
42pub mod map;
43pub mod map_err;
44pub mod memory;
45pub mod timeout;
46pub mod upgrade;
47
48mod boxed;
49mod optional;
50
51use crate::ConnectedPoint;
52
53pub use self::boxed::Boxed;
54pub use self::choice::OrTransport;
55pub use self::memory::MemoryTransport;
56pub use self::optional::OptionalTransport;
57pub use self::upgrade::Upgrade;
58
59static NEXT_LISTENER_ID: AtomicUsize = AtomicUsize::new(1);
60
61/// A transport provides connection-oriented communication between two peers
62/// through ordered streams of data (i.e. connections).
63///
64/// Connections are established either by [listening](Transport::listen_on)
65/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
66/// obtains a connection by listening is often referred to as the *listener* and the
67/// peer that initiated the connection through dialing as the *dialer*, in
68/// contrast to the traditional roles of *server* and *client*.
69///
70/// Most transports also provide a form of reliable delivery on the established
71/// connections but the precise semantics of these guarantees depend on the
72/// specific transport.
73///
74/// This trait is implemented for concrete connection-oriented transport protocols
75/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
76/// functionality to the dialing or listening process (e.g. name resolution via
77/// the DNS).
78///
79/// Additional protocols can be layered on top of the connections established
80/// by a [`Transport`] through an upgrade mechanism that is initiated via
81/// [`upgrade`](Transport::upgrade).
82///
83/// Note for implementors: Futures returned by [`Transport::dial`] should only
84/// do work once polled for the first time. E.g. in the case of TCP, connecting
85/// to the remote should not happen immediately on [`Transport::dial`] but only
86/// once the returned [`Future`] is polled. The caller of [`Transport::dial`]
87/// may call the method multiple times with a set of addresses, racing a subset
88/// of the returned dials to success concurrently.
89pub trait Transport {
90    /// The result of a connection setup process, including protocol upgrades.
91    ///
92    /// Typically the output contains at least a handle to a data stream (i.e. a
93    /// connection or a substream multiplexer on top of a connection) that
94    /// provides APIs for sending and receiving data through the connection.
95    type Output;
96
97    /// An error that occurred during connection setup.
98    type Error: Error;
99
100    /// A pending [`Output`](Transport::Output) for an inbound connection,
101    /// obtained from the [`Transport`] stream.
102    ///
103    /// After a connection has been accepted by the transport, it may need to go through
104    /// asynchronous post-processing (i.e. protocol upgrade negotiations). Such
105    /// post-processing should not block the `Listener` from producing the next
106    /// connection, hence further connection setup proceeds asynchronously.
107    /// Once a `ListenerUpgrade` future resolves it yields the [`Output`](Transport::Output)
108    /// of the connection setup process.
109    type ListenerUpgrade: Future<Output = Result<Self::Output, Self::Error>>;
110
111    /// A pending [`Output`](Transport::Output) for an outbound connection,
112    /// obtained from [dialing](Transport::dial).
113    type Dial: Future<Output = Result<Self::Output, Self::Error>>;
114
115    /// Listens on the given [`Multiaddr`] for inbound connections with a provided [`ListenerId`].
116    fn listen_on(
117        &mut self,
118        id: ListenerId,
119        addr: Multiaddr,
120    ) -> Result<(), TransportError<Self::Error>>;
121
122    /// Remove a listener.
123    ///
124    /// Return `true` if there was a listener with this Id, `false`
125    /// otherwise.
126    fn remove_listener(&mut self, id: ListenerId) -> bool;
127
128    /// Dials the given [`Multiaddr`], returning a future for a pending outbound connection.
129    ///
130    /// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
131    /// try an alternative [`Transport`], if available.
132    fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>;
133
134    /// As [`Transport::dial`] but has the local node act as a listener on the outgoing connection.
135    ///
136    /// This option is needed for NAT and firewall hole punching.
137    ///
138    /// See [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) for related option.
139    fn dial_as_listener(
140        &mut self,
141        addr: Multiaddr,
142    ) -> Result<Self::Dial, TransportError<Self::Error>>;
143
144    /// Poll for [`TransportEvent`]s.
145    ///
146    /// A [`TransportEvent::Incoming`] should be produced whenever a connection is received at the lowest
147    /// level of the transport stack. The item must be a [`ListenerUpgrade`](Transport::ListenerUpgrade)
148    /// future that resolves to an [`Output`](Transport::Output) value once all protocol upgrades have
149    /// been applied.
150    ///
151    /// Transports are expected to produce [`TransportEvent::Incoming`] events only for
152    /// listen addresses which have previously been announced via
153    /// a [`TransportEvent::NewAddress`] event and which have not been invalidated by
154    /// an [`TransportEvent::AddressExpired`] event yet.
155    fn poll(
156        self: Pin<&mut Self>,
157        cx: &mut Context<'_>,
158    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>;
159
160    /// Performs a transport-specific mapping of an address `observed` by a remote onto a
161    /// local `listen` address to yield an address for the local node that may be reachable
162    /// for other peers.
163    ///
164    /// This is relevant for transports where Network Address Translation (NAT) can occur
165    /// so that e.g. the peer is observed at a different IP than the IP of the local
166    /// listening address. See also [`address_translation`][crate::address_translation].
167    ///
168    /// Within [`libp2p::Swarm`](<https://docs.rs/libp2p/latest/libp2p/struct.Swarm.html>) this is
169    /// used when extending the listening addresses of the local peer with external addresses
170    /// observed by remote peers.
171    /// On transports where this is not relevant (i.e. no NATs are present) `None` should be
172    /// returned for the sake of de-duplication.
173    ///
174    /// Note: if the listen or observed address is not a valid address of this transport,
175    /// `None` should be returned as well.
176    fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
177
178    /// Boxes the transport, including custom transport errors.
179    fn boxed(self) -> boxed::Boxed<Self::Output>
180    where
181        Self: Sized + Send + Unpin + 'static,
182        Self::Dial: Send + 'static,
183        Self::ListenerUpgrade: Send + 'static,
184        Self::Error: Send + Sync,
185    {
186        boxed::boxed(self)
187    }
188
189    /// Applies a function on the connections created by the transport.
190    fn map<F, O>(self, f: F) -> map::Map<Self, F>
191    where
192        Self: Sized,
193        F: FnOnce(Self::Output, ConnectedPoint) -> O,
194    {
195        map::Map::new(self, f)
196    }
197
198    /// Applies a function on the errors generated by the futures of the transport.
199    fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
200    where
201        Self: Sized,
202        F: FnOnce(Self::Error) -> E,
203    {
204        map_err::MapErr::new(self, f)
205    }
206
207    /// Adds a fallback transport that is used when encountering errors
208    /// while establishing inbound or outbound connections.
209    ///
210    /// The returned transport will act like `self`, except that if `listen_on` or `dial`
211    /// return an error then `other` will be tried.
212    fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
213    where
214        Self: Sized,
215        U: Transport,
216        <U as Transport>::Error: 'static,
217    {
218        OrTransport::new(self, other)
219    }
220
221    /// Applies a function producing an asynchronous result to every connection
222    /// created by this transport.
223    ///
224    /// This function can be used for ad-hoc protocol upgrades or
225    /// for processing or adapting the output for following configurations.
226    ///
227    /// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
228    fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
229    where
230        Self: Sized,
231        C: FnOnce(Self::Output, ConnectedPoint) -> F,
232        F: TryFuture<Ok = O>,
233        <F as TryFuture>::Error: Error + 'static,
234    {
235        and_then::AndThen::new(self, f)
236    }
237
238    /// Begins a series of protocol upgrades via an
239    /// [`upgrade::Builder`](upgrade::Builder).
240    fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
241    where
242        Self: Sized,
243        Self::Error: 'static,
244    {
245        upgrade::Builder::new(self, version)
246    }
247}
248
249/// The ID of a single listener.
250#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
251pub struct ListenerId(usize);
252
253impl ListenerId {
254    #[deprecated(note = "Renamed to ` ListenerId::next`.")]
255    #[allow(clippy::new_without_default)]
256    /// Creates a new `ListenerId`.
257    pub fn new() -> Self {
258        ListenerId::next()
259    }
260
261    /// Creates a new `ListenerId`.
262    pub fn next() -> Self {
263        ListenerId(NEXT_LISTENER_ID.fetch_add(1, Ordering::SeqCst))
264    }
265
266    #[deprecated(note = "Use ` ListenerId::next` instead.")]
267    #[allow(clippy::should_implement_trait)]
268    pub fn default() -> Self {
269        Self::next()
270    }
271}
272
273/// Event produced by [`Transport`]s.
274pub enum TransportEvent<TUpgr, TErr> {
275    /// A new address is being listened on.
276    NewAddress {
277        /// The listener that is listening on the new address.
278        listener_id: ListenerId,
279        /// The new address that is being listened on.
280        listen_addr: Multiaddr,
281    },
282    /// An address is no longer being listened on.
283    AddressExpired {
284        /// The listener that is no longer listening on the address.
285        listener_id: ListenerId,
286        /// The new address that is being listened on.
287        listen_addr: Multiaddr,
288    },
289    /// A connection is incoming on one of the listeners.
290    Incoming {
291        /// The listener that produced the upgrade.
292        listener_id: ListenerId,
293        /// The produced upgrade.
294        upgrade: TUpgr,
295        /// Local connection address.
296        local_addr: Multiaddr,
297        /// Address used to send back data to the incoming client.
298        send_back_addr: Multiaddr,
299    },
300    /// A listener closed.
301    ListenerClosed {
302        /// The ID of the listener that closed.
303        listener_id: ListenerId,
304        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
305        /// if the stream produced an error.
306        reason: Result<(), TErr>,
307    },
308    /// A listener errored.
309    ///
310    /// The listener will continue to be polled for new events and the event
311    /// is for informational purposes only.
312    ListenerError {
313        /// The ID of the listener that errored.
314        listener_id: ListenerId,
315        /// The error value.
316        error: TErr,
317    },
318}
319
320impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
321    /// In case this [`TransportEvent`] is an upgrade, apply the given function
322    /// to the upgrade and produce another transport event based the the function's result.
323    pub fn map_upgrade<U>(self, map: impl FnOnce(TUpgr) -> U) -> TransportEvent<U, TErr> {
324        match self {
325            TransportEvent::Incoming {
326                listener_id,
327                upgrade,
328                local_addr,
329                send_back_addr,
330            } => TransportEvent::Incoming {
331                listener_id,
332                upgrade: map(upgrade),
333                local_addr,
334                send_back_addr,
335            },
336            TransportEvent::NewAddress {
337                listen_addr,
338                listener_id,
339            } => TransportEvent::NewAddress {
340                listen_addr,
341                listener_id,
342            },
343            TransportEvent::AddressExpired {
344                listen_addr,
345                listener_id,
346            } => TransportEvent::AddressExpired {
347                listen_addr,
348                listener_id,
349            },
350            TransportEvent::ListenerError { listener_id, error } => {
351                TransportEvent::ListenerError { listener_id, error }
352            }
353            TransportEvent::ListenerClosed {
354                listener_id,
355                reason,
356            } => TransportEvent::ListenerClosed {
357                listener_id,
358                reason,
359            },
360        }
361    }
362
363    /// In case this [`TransportEvent`] is an [`ListenerError`](TransportEvent::ListenerError),
364    /// or [`ListenerClosed`](TransportEvent::ListenerClosed) apply the given function to the
365    /// error and produce another transport event based on the function's result.
366    pub fn map_err<E>(self, map_err: impl FnOnce(TErr) -> E) -> TransportEvent<TUpgr, E> {
367        match self {
368            TransportEvent::Incoming {
369                listener_id,
370                upgrade,
371                local_addr,
372                send_back_addr,
373            } => TransportEvent::Incoming {
374                listener_id,
375                upgrade,
376                local_addr,
377                send_back_addr,
378            },
379            TransportEvent::NewAddress {
380                listen_addr,
381                listener_id,
382            } => TransportEvent::NewAddress {
383                listen_addr,
384                listener_id,
385            },
386            TransportEvent::AddressExpired {
387                listen_addr,
388                listener_id,
389            } => TransportEvent::AddressExpired {
390                listen_addr,
391                listener_id,
392            },
393            TransportEvent::ListenerError { listener_id, error } => TransportEvent::ListenerError {
394                listener_id,
395                error: map_err(error),
396            },
397            TransportEvent::ListenerClosed {
398                listener_id,
399                reason,
400            } => TransportEvent::ListenerClosed {
401                listener_id,
402                reason: reason.map_err(map_err),
403            },
404        }
405    }
406
407    /// Returns `true` if this is an [`Incoming`](TransportEvent::Incoming) transport event.
408    pub fn is_upgrade(&self) -> bool {
409        matches!(self, TransportEvent::Incoming { .. })
410    }
411
412    /// Try to turn this transport event into the upgrade parts of the
413    /// incoming connection.
414    ///
415    /// Returns `None` if the event is not actually an incoming connection,
416    /// otherwise the upgrade and the remote address.
417    pub fn into_incoming(self) -> Option<(TUpgr, Multiaddr)> {
418        if let TransportEvent::Incoming {
419            upgrade,
420            send_back_addr,
421            ..
422        } = self
423        {
424            Some((upgrade, send_back_addr))
425        } else {
426            None
427        }
428    }
429
430    /// Returns `true` if this is a [`TransportEvent::NewAddress`].
431    pub fn is_new_address(&self) -> bool {
432        matches!(self, TransportEvent::NewAddress { .. })
433    }
434
435    /// Try to turn this transport event into the new `Multiaddr`.
436    ///
437    /// Returns `None` if the event is not actually a [`TransportEvent::NewAddress`],
438    /// otherwise the address.
439    pub fn into_new_address(self) -> Option<Multiaddr> {
440        if let TransportEvent::NewAddress { listen_addr, .. } = self {
441            Some(listen_addr)
442        } else {
443            None
444        }
445    }
446
447    /// Returns `true` if this is an [`TransportEvent::AddressExpired`].
448    pub fn is_address_expired(&self) -> bool {
449        matches!(self, TransportEvent::AddressExpired { .. })
450    }
451
452    /// Try to turn this transport event into the expire `Multiaddr`.
453    ///
454    /// Returns `None` if the event is not actually a [`TransportEvent::AddressExpired`],
455    /// otherwise the address.
456    pub fn into_address_expired(self) -> Option<Multiaddr> {
457        if let TransportEvent::AddressExpired { listen_addr, .. } = self {
458            Some(listen_addr)
459        } else {
460            None
461        }
462    }
463
464    /// Returns `true` if this is an [`TransportEvent::ListenerError`] transport event.
465    pub fn is_listener_error(&self) -> bool {
466        matches!(self, TransportEvent::ListenerError { .. })
467    }
468
469    /// Try to turn this transport event into the listener error.
470    ///
471    /// Returns `None` if the event is not actually a [`TransportEvent::ListenerError`]`,
472    /// otherwise the error.
473    pub fn into_listener_error(self) -> Option<TErr> {
474        if let TransportEvent::ListenerError { error, .. } = self {
475            Some(error)
476        } else {
477            None
478        }
479    }
480}
481
482impl<TUpgr, TErr: fmt::Debug> fmt::Debug for TransportEvent<TUpgr, TErr> {
483    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
484        match self {
485            TransportEvent::NewAddress {
486                listener_id,
487                listen_addr,
488            } => f
489                .debug_struct("TransportEvent::NewAddress")
490                .field("listener_id", listener_id)
491                .field("listen_addr", listen_addr)
492                .finish(),
493            TransportEvent::AddressExpired {
494                listener_id,
495                listen_addr,
496            } => f
497                .debug_struct("TransportEvent::AddressExpired")
498                .field("listener_id", listener_id)
499                .field("listen_addr", listen_addr)
500                .finish(),
501            TransportEvent::Incoming {
502                listener_id,
503                local_addr,
504                ..
505            } => f
506                .debug_struct("TransportEvent::Incoming")
507                .field("listener_id", listener_id)
508                .field("local_addr", local_addr)
509                .finish(),
510            TransportEvent::ListenerClosed {
511                listener_id,
512                reason,
513            } => f
514                .debug_struct("TransportEvent::Closed")
515                .field("listener_id", listener_id)
516                .field("reason", reason)
517                .finish(),
518            TransportEvent::ListenerError { listener_id, error } => f
519                .debug_struct("TransportEvent::ListenerError")
520                .field("listener_id", listener_id)
521                .field("error", error)
522                .finish(),
523        }
524    }
525}
526
527/// An error during [dialing][Transport::dial] or [listening][Transport::listen_on]
528/// on a [`Transport`].
529#[derive(Debug, Clone)]
530pub enum TransportError<TErr> {
531    /// The [`Multiaddr`] passed as parameter is not supported.
532    ///
533    /// Contains back the same address.
534    MultiaddrNotSupported(Multiaddr),
535
536    /// Any other error that a [`Transport`] may produce.
537    Other(TErr),
538}
539
540impl<TErr> TransportError<TErr> {
541    /// Applies a function to the the error in [`TransportError::Other`].
542    pub fn map<TNewErr>(self, map: impl FnOnce(TErr) -> TNewErr) -> TransportError<TNewErr> {
543        match self {
544            TransportError::MultiaddrNotSupported(addr) => {
545                TransportError::MultiaddrNotSupported(addr)
546            }
547            TransportError::Other(err) => TransportError::Other(map(err)),
548        }
549    }
550}
551
552impl<TErr> fmt::Display for TransportError<TErr>
553where
554    TErr: fmt::Display,
555{
556    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
557        match self {
558            TransportError::MultiaddrNotSupported(addr) => {
559                write!(f, "Multiaddr is not supported: {addr}")
560            }
561            TransportError::Other(_) => Ok(()),
562        }
563    }
564}
565
566impl<TErr> Error for TransportError<TErr>
567where
568    TErr: Error + 'static,
569{
570    fn source(&self) -> Option<&(dyn Error + 'static)> {
571        match self {
572            TransportError::MultiaddrNotSupported(_) => None,
573            TransportError::Other(err) => Some(err),
574        }
575    }
576}