libp2p_core/transport.rs
1// Copyright 2017-2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Connection-oriented communication channels.
22//!
23//! The main entity of this module is the [`Transport`] trait, which provides an
24//! interface for establishing connections with other nodes, thereby negotiating
25//! any desired protocols. The rest of the module defines combinators for
26//! modifying a transport through composition with other transports or protocol upgrades.
27
28use futures::prelude::*;
29use multiaddr::Multiaddr;
30use std::{
31 error::Error,
32 fmt,
33 pin::Pin,
34 sync::atomic::{AtomicUsize, Ordering},
35 task::{Context, Poll},
36};
37
38pub mod and_then;
39pub mod choice;
40pub mod dummy;
41pub mod global_only;
42pub mod map;
43pub mod map_err;
44pub mod memory;
45pub mod timeout;
46pub mod upgrade;
47
48mod boxed;
49mod optional;
50
51use crate::ConnectedPoint;
52
53pub use self::boxed::Boxed;
54pub use self::choice::OrTransport;
55pub use self::memory::MemoryTransport;
56pub use self::optional::OptionalTransport;
57pub use self::upgrade::Upgrade;
58
/// Counter from which [`ListenerId::next`] draws its values.
///
/// It starts at 1 and is advanced by one on every call, so the first id handed out is 1.
static NEXT_LISTENER_ID: AtomicUsize = AtomicUsize::new(1);
60
61/// A transport provides connection-oriented communication between two peers
62/// through ordered streams of data (i.e. connections).
63///
64/// Connections are established either by [listening](Transport::listen_on)
65/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
66/// obtains a connection by listening is often referred to as the *listener* and the
67/// peer that initiated the connection through dialing as the *dialer*, in
68/// contrast to the traditional roles of *server* and *client*.
69///
70/// Most transports also provide a form of reliable delivery on the established
71/// connections but the precise semantics of these guarantees depend on the
72/// specific transport.
73///
74/// This trait is implemented for concrete connection-oriented transport protocols
75/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
76/// functionality to the dialing or listening process (e.g. name resolution via
77/// the DNS).
78///
79/// Additional protocols can be layered on top of the connections established
80/// by a [`Transport`] through an upgrade mechanism that is initiated via
81/// [`upgrade`](Transport::upgrade).
82///
83/// Note for implementors: Futures returned by [`Transport::dial`] should only
84/// do work once polled for the first time. E.g. in the case of TCP, connecting
85/// to the remote should not happen immediately on [`Transport::dial`] but only
86/// once the returned [`Future`] is polled. The caller of [`Transport::dial`]
87/// may call the method multiple times with a set of addresses, racing a subset
88/// of the returned dials to success concurrently.
89pub trait Transport {
90 /// The result of a connection setup process, including protocol upgrades.
91 ///
92 /// Typically the output contains at least a handle to a data stream (i.e. a
93 /// connection or a substream multiplexer on top of a connection) that
94 /// provides APIs for sending and receiving data through the connection.
95 type Output;
96
97 /// An error that occurred during connection setup.
98 type Error: Error;
99
100 /// A pending [`Output`](Transport::Output) for an inbound connection,
101 /// obtained from the [`Transport`] stream.
102 ///
103 /// After a connection has been accepted by the transport, it may need to go through
104 /// asynchronous post-processing (i.e. protocol upgrade negotiations). Such
105 /// post-processing should not block the `Listener` from producing the next
106 /// connection, hence further connection setup proceeds asynchronously.
107 /// Once a `ListenerUpgrade` future resolves it yields the [`Output`](Transport::Output)
108 /// of the connection setup process.
109 type ListenerUpgrade: Future<Output = Result<Self::Output, Self::Error>>;
110
111 /// A pending [`Output`](Transport::Output) for an outbound connection,
112 /// obtained from [dialing](Transport::dial).
113 type Dial: Future<Output = Result<Self::Output, Self::Error>>;
114
115 /// Listens on the given [`Multiaddr`] for inbound connections with a provided [`ListenerId`].
116 fn listen_on(
117 &mut self,
118 id: ListenerId,
119 addr: Multiaddr,
120 ) -> Result<(), TransportError<Self::Error>>;
121
122 /// Remove a listener.
123 ///
124 /// Return `true` if there was a listener with this Id, `false`
125 /// otherwise.
126 fn remove_listener(&mut self, id: ListenerId) -> bool;
127
128 /// Dials the given [`Multiaddr`], returning a future for a pending outbound connection.
129 ///
130 /// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
131 /// try an alternative [`Transport`], if available.
132 fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>;
133
134 /// As [`Transport::dial`] but has the local node act as a listener on the outgoing connection.
135 ///
136 /// This option is needed for NAT and firewall hole punching.
137 ///
138 /// See [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) for related option.
139 fn dial_as_listener(
140 &mut self,
141 addr: Multiaddr,
142 ) -> Result<Self::Dial, TransportError<Self::Error>>;
143
144 /// Poll for [`TransportEvent`]s.
145 ///
146 /// A [`TransportEvent::Incoming`] should be produced whenever a connection is received at the lowest
147 /// level of the transport stack. The item must be a [`ListenerUpgrade`](Transport::ListenerUpgrade)
148 /// future that resolves to an [`Output`](Transport::Output) value once all protocol upgrades have
149 /// been applied.
150 ///
151 /// Transports are expected to produce [`TransportEvent::Incoming`] events only for
152 /// listen addresses which have previously been announced via
153 /// a [`TransportEvent::NewAddress`] event and which have not been invalidated by
154 /// an [`TransportEvent::AddressExpired`] event yet.
155 fn poll(
156 self: Pin<&mut Self>,
157 cx: &mut Context<'_>,
158 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>;
159
160 /// Performs a transport-specific mapping of an address `observed` by a remote onto a
161 /// local `listen` address to yield an address for the local node that may be reachable
162 /// for other peers.
163 ///
164 /// This is relevant for transports where Network Address Translation (NAT) can occur
165 /// so that e.g. the peer is observed at a different IP than the IP of the local
166 /// listening address. See also [`address_translation`][crate::address_translation].
167 ///
168 /// Within [`libp2p::Swarm`](<https://docs.rs/libp2p/latest/libp2p/struct.Swarm.html>) this is
169 /// used when extending the listening addresses of the local peer with external addresses
170 /// observed by remote peers.
171 /// On transports where this is not relevant (i.e. no NATs are present) `None` should be
172 /// returned for the sake of de-duplication.
173 ///
174 /// Note: if the listen or observed address is not a valid address of this transport,
175 /// `None` should be returned as well.
176 fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
177
178 /// Boxes the transport, including custom transport errors.
179 fn boxed(self) -> boxed::Boxed<Self::Output>
180 where
181 Self: Sized + Send + Unpin + 'static,
182 Self::Dial: Send + 'static,
183 Self::ListenerUpgrade: Send + 'static,
184 Self::Error: Send + Sync,
185 {
186 boxed::boxed(self)
187 }
188
189 /// Applies a function on the connections created by the transport.
190 fn map<F, O>(self, f: F) -> map::Map<Self, F>
191 where
192 Self: Sized,
193 F: FnOnce(Self::Output, ConnectedPoint) -> O,
194 {
195 map::Map::new(self, f)
196 }
197
198 /// Applies a function on the errors generated by the futures of the transport.
199 fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
200 where
201 Self: Sized,
202 F: FnOnce(Self::Error) -> E,
203 {
204 map_err::MapErr::new(self, f)
205 }
206
207 /// Adds a fallback transport that is used when encountering errors
208 /// while establishing inbound or outbound connections.
209 ///
210 /// The returned transport will act like `self`, except that if `listen_on` or `dial`
211 /// return an error then `other` will be tried.
212 fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
213 where
214 Self: Sized,
215 U: Transport,
216 <U as Transport>::Error: 'static,
217 {
218 OrTransport::new(self, other)
219 }
220
221 /// Applies a function producing an asynchronous result to every connection
222 /// created by this transport.
223 ///
224 /// This function can be used for ad-hoc protocol upgrades or
225 /// for processing or adapting the output for following configurations.
226 ///
227 /// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
228 fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
229 where
230 Self: Sized,
231 C: FnOnce(Self::Output, ConnectedPoint) -> F,
232 F: TryFuture<Ok = O>,
233 <F as TryFuture>::Error: Error + 'static,
234 {
235 and_then::AndThen::new(self, f)
236 }
237
238 /// Begins a series of protocol upgrades via an
239 /// [`upgrade::Builder`](upgrade::Builder).
240 fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
241 where
242 Self: Sized,
243 Self::Error: 'static,
244 {
245 upgrade::Builder::new(self, version)
246 }
247}
248
249/// The ID of a single listener.
250#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
251pub struct ListenerId(usize);
252
253impl ListenerId {
254 #[deprecated(note = "Renamed to ` ListenerId::next`.")]
255 #[allow(clippy::new_without_default)]
256 /// Creates a new `ListenerId`.
257 pub fn new() -> Self {
258 ListenerId::next()
259 }
260
261 /// Creates a new `ListenerId`.
262 pub fn next() -> Self {
263 ListenerId(NEXT_LISTENER_ID.fetch_add(1, Ordering::SeqCst))
264 }
265
266 #[deprecated(note = "Use ` ListenerId::next` instead.")]
267 #[allow(clippy::should_implement_trait)]
268 pub fn default() -> Self {
269 Self::next()
270 }
271}
272
/// Event produced by [`Transport`]s.
///
/// `TUpgr` is the type of the pending upgrade carried by
/// [`Incoming`](TransportEvent::Incoming); `TErr` is the error type carried by
/// [`ListenerClosed`](TransportEvent::ListenerClosed) and
/// [`ListenerError`](TransportEvent::ListenerError).
pub enum TransportEvent<TUpgr, TErr> {
    /// A new address is being listened on.
    NewAddress {
        /// The listener that is listening on the new address.
        listener_id: ListenerId,
        /// The new address that is being listened on.
        listen_addr: Multiaddr,
    },
    /// An address is no longer being listened on.
    AddressExpired {
        /// The listener that is no longer listening on the address.
        listener_id: ListenerId,
        /// The address that is no longer being listened on.
        listen_addr: Multiaddr,
    },
    /// A connection is incoming on one of the listeners.
    Incoming {
        /// The listener that produced the upgrade.
        listener_id: ListenerId,
        /// The produced upgrade.
        upgrade: TUpgr,
        /// Local connection address.
        local_addr: Multiaddr,
        /// Address used to send back data to the incoming client.
        send_back_addr: Multiaddr,
    },
    /// A listener closed.
    ListenerClosed {
        /// The ID of the listener that closed.
        listener_id: ListenerId,
        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
        /// if the stream produced an error.
        reason: Result<(), TErr>,
    },
    /// A listener errored.
    ///
    /// The listener will continue to be polled for new events and the event
    /// is for informational purposes only.
    ListenerError {
        /// The ID of the listener that errored.
        listener_id: ListenerId,
        /// The error value.
        error: TErr,
    },
}
319
320impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
321 /// In case this [`TransportEvent`] is an upgrade, apply the given function
322 /// to the upgrade and produce another transport event based the the function's result.
323 pub fn map_upgrade<U>(self, map: impl FnOnce(TUpgr) -> U) -> TransportEvent<U, TErr> {
324 match self {
325 TransportEvent::Incoming {
326 listener_id,
327 upgrade,
328 local_addr,
329 send_back_addr,
330 } => TransportEvent::Incoming {
331 listener_id,
332 upgrade: map(upgrade),
333 local_addr,
334 send_back_addr,
335 },
336 TransportEvent::NewAddress {
337 listen_addr,
338 listener_id,
339 } => TransportEvent::NewAddress {
340 listen_addr,
341 listener_id,
342 },
343 TransportEvent::AddressExpired {
344 listen_addr,
345 listener_id,
346 } => TransportEvent::AddressExpired {
347 listen_addr,
348 listener_id,
349 },
350 TransportEvent::ListenerError { listener_id, error } => {
351 TransportEvent::ListenerError { listener_id, error }
352 }
353 TransportEvent::ListenerClosed {
354 listener_id,
355 reason,
356 } => TransportEvent::ListenerClosed {
357 listener_id,
358 reason,
359 },
360 }
361 }
362
363 /// In case this [`TransportEvent`] is an [`ListenerError`](TransportEvent::ListenerError),
364 /// or [`ListenerClosed`](TransportEvent::ListenerClosed) apply the given function to the
365 /// error and produce another transport event based on the function's result.
366 pub fn map_err<E>(self, map_err: impl FnOnce(TErr) -> E) -> TransportEvent<TUpgr, E> {
367 match self {
368 TransportEvent::Incoming {
369 listener_id,
370 upgrade,
371 local_addr,
372 send_back_addr,
373 } => TransportEvent::Incoming {
374 listener_id,
375 upgrade,
376 local_addr,
377 send_back_addr,
378 },
379 TransportEvent::NewAddress {
380 listen_addr,
381 listener_id,
382 } => TransportEvent::NewAddress {
383 listen_addr,
384 listener_id,
385 },
386 TransportEvent::AddressExpired {
387 listen_addr,
388 listener_id,
389 } => TransportEvent::AddressExpired {
390 listen_addr,
391 listener_id,
392 },
393 TransportEvent::ListenerError { listener_id, error } => TransportEvent::ListenerError {
394 listener_id,
395 error: map_err(error),
396 },
397 TransportEvent::ListenerClosed {
398 listener_id,
399 reason,
400 } => TransportEvent::ListenerClosed {
401 listener_id,
402 reason: reason.map_err(map_err),
403 },
404 }
405 }
406
407 /// Returns `true` if this is an [`Incoming`](TransportEvent::Incoming) transport event.
408 pub fn is_upgrade(&self) -> bool {
409 matches!(self, TransportEvent::Incoming { .. })
410 }
411
412 /// Try to turn this transport event into the upgrade parts of the
413 /// incoming connection.
414 ///
415 /// Returns `None` if the event is not actually an incoming connection,
416 /// otherwise the upgrade and the remote address.
417 pub fn into_incoming(self) -> Option<(TUpgr, Multiaddr)> {
418 if let TransportEvent::Incoming {
419 upgrade,
420 send_back_addr,
421 ..
422 } = self
423 {
424 Some((upgrade, send_back_addr))
425 } else {
426 None
427 }
428 }
429
430 /// Returns `true` if this is a [`TransportEvent::NewAddress`].
431 pub fn is_new_address(&self) -> bool {
432 matches!(self, TransportEvent::NewAddress { .. })
433 }
434
435 /// Try to turn this transport event into the new `Multiaddr`.
436 ///
437 /// Returns `None` if the event is not actually a [`TransportEvent::NewAddress`],
438 /// otherwise the address.
439 pub fn into_new_address(self) -> Option<Multiaddr> {
440 if let TransportEvent::NewAddress { listen_addr, .. } = self {
441 Some(listen_addr)
442 } else {
443 None
444 }
445 }
446
447 /// Returns `true` if this is an [`TransportEvent::AddressExpired`].
448 pub fn is_address_expired(&self) -> bool {
449 matches!(self, TransportEvent::AddressExpired { .. })
450 }
451
452 /// Try to turn this transport event into the expire `Multiaddr`.
453 ///
454 /// Returns `None` if the event is not actually a [`TransportEvent::AddressExpired`],
455 /// otherwise the address.
456 pub fn into_address_expired(self) -> Option<Multiaddr> {
457 if let TransportEvent::AddressExpired { listen_addr, .. } = self {
458 Some(listen_addr)
459 } else {
460 None
461 }
462 }
463
464 /// Returns `true` if this is an [`TransportEvent::ListenerError`] transport event.
465 pub fn is_listener_error(&self) -> bool {
466 matches!(self, TransportEvent::ListenerError { .. })
467 }
468
469 /// Try to turn this transport event into the listener error.
470 ///
471 /// Returns `None` if the event is not actually a [`TransportEvent::ListenerError`]`,
472 /// otherwise the error.
473 pub fn into_listener_error(self) -> Option<TErr> {
474 if let TransportEvent::ListenerError { error, .. } = self {
475 Some(error)
476 } else {
477 None
478 }
479 }
480}
481
482impl<TUpgr, TErr: fmt::Debug> fmt::Debug for TransportEvent<TUpgr, TErr> {
483 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
484 match self {
485 TransportEvent::NewAddress {
486 listener_id,
487 listen_addr,
488 } => f
489 .debug_struct("TransportEvent::NewAddress")
490 .field("listener_id", listener_id)
491 .field("listen_addr", listen_addr)
492 .finish(),
493 TransportEvent::AddressExpired {
494 listener_id,
495 listen_addr,
496 } => f
497 .debug_struct("TransportEvent::AddressExpired")
498 .field("listener_id", listener_id)
499 .field("listen_addr", listen_addr)
500 .finish(),
501 TransportEvent::Incoming {
502 listener_id,
503 local_addr,
504 ..
505 } => f
506 .debug_struct("TransportEvent::Incoming")
507 .field("listener_id", listener_id)
508 .field("local_addr", local_addr)
509 .finish(),
510 TransportEvent::ListenerClosed {
511 listener_id,
512 reason,
513 } => f
514 .debug_struct("TransportEvent::Closed")
515 .field("listener_id", listener_id)
516 .field("reason", reason)
517 .finish(),
518 TransportEvent::ListenerError { listener_id, error } => f
519 .debug_struct("TransportEvent::ListenerError")
520 .field("listener_id", listener_id)
521 .field("error", error)
522 .finish(),
523 }
524 }
525}
526
/// An error during [dialing][Transport::dial] or [listening][Transport::listen_on]
/// on a [`Transport`].
///
/// `TErr` is the transport-specific error type wrapped by
/// [`TransportError::Other`].
#[derive(Debug, Clone)]
pub enum TransportError<TErr> {
    /// The [`Multiaddr`] passed as parameter is not supported.
    ///
    /// The variant carries that same address, handing it back to the caller.
    MultiaddrNotSupported(Multiaddr),

    /// Any other error that a [`Transport`] may produce.
    Other(TErr),
}
539
540impl<TErr> TransportError<TErr> {
541 /// Applies a function to the the error in [`TransportError::Other`].
542 pub fn map<TNewErr>(self, map: impl FnOnce(TErr) -> TNewErr) -> TransportError<TNewErr> {
543 match self {
544 TransportError::MultiaddrNotSupported(addr) => {
545 TransportError::MultiaddrNotSupported(addr)
546 }
547 TransportError::Other(err) => TransportError::Other(map(err)),
548 }
549 }
550}
551
552impl<TErr> fmt::Display for TransportError<TErr>
553where
554 TErr: fmt::Display,
555{
556 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
557 match self {
558 TransportError::MultiaddrNotSupported(addr) => {
559 write!(f, "Multiaddr is not supported: {addr}")
560 }
561 TransportError::Other(_) => Ok(()),
562 }
563 }
564}
565
566impl<TErr> Error for TransportError<TErr>
567where
568 TErr: Error + 'static,
569{
570 fn source(&self) -> Option<&(dyn Error + 'static)> {
571 match self {
572 TransportError::MultiaddrNotSupported(_) => None,
573 TransportError::Other(err) => Some(err),
574 }
575 }
576}