libp2p_core/transport/
upgrade.rs

1// Copyright 2017-2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Configuration of transport protocol upgrades.
22
23pub use crate::upgrade::Version;
24
25use crate::{
26    connection::ConnectedPoint,
27    muxing::{StreamMuxer, StreamMuxerBox},
28    transport::{
29        and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerId, Transport,
30        TransportError, TransportEvent,
31    },
32    upgrade::{
33        self, apply_inbound, apply_outbound, InboundConnectionUpgrade, InboundUpgradeApply,
34        OutboundConnectionUpgrade, OutboundUpgradeApply, UpgradeError,
35    },
36    Negotiated,
37};
38use futures::{prelude::*, ready};
39use libp2p_identity::PeerId;
40use multiaddr::Multiaddr;
41use std::{
42    error::Error,
43    fmt,
44    pin::Pin,
45    task::{Context, Poll},
46    time::Duration,
47};
48
49/// A `Builder` facilitates upgrading of a [`Transport`] for use with
50/// a `Swarm`.
51///
52/// The upgrade process is defined by the following stages:
53///
54///    [`authenticate`](Builder::authenticate)`{1}`
55/// -> [`apply`](Authenticated::apply)`{*}`
56/// -> [`multiplex`](Authenticated::multiplex)`{1}`
57///
58/// It thus enforces the following invariants on every transport
59/// obtained from [`multiplex`](Authenticated::multiplex):
60///
61///   1. The transport must be [authenticated](Builder::authenticate)
62///      and [multiplexed](Authenticated::multiplex).
63///   2. Authentication must precede the negotiation of a multiplexer.
64///   3. Applying a multiplexer is the last step in the upgrade process.
65///   4. The [`Transport::Output`] conforms to the requirements of a `Swarm`,
66///      namely a tuple of a [`PeerId`] (from the authentication upgrade) and a
67///      [`StreamMuxer`] (from the multiplexing upgrade).
68#[derive(Clone)]
69pub struct Builder<T> {
70    inner: T,
71    version: upgrade::Version,
72}
73
74impl<T> Builder<T>
75where
76    T: Transport,
77    T::Error: 'static,
78{
79    /// Creates a `Builder` over the given (base) `Transport`.
80    pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
81        Builder { inner, version }
82    }
83
84    /// Upgrades the transport to perform authentication of the remote.
85    ///
86    /// The supplied upgrade receives the I/O resource `C` and must
87    /// produce a pair `(PeerId, D)`, where `D` is a new I/O resource.
88    /// The upgrade must thus at a minimum identify the remote, which typically
89    /// involves the use of a cryptographic authentication protocol in the
90    /// context of establishing a secure channel.
91    ///
92    /// ## Transitions
93    ///
94    ///   * I/O upgrade: `C -> (PeerId, D)`.
95    ///   * Transport output: `C -> (PeerId, D)`
96    pub fn authenticate<C, D, U, E>(
97        self,
98        upgrade: U,
99    ) -> Authenticated<AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>>
100    where
101        T: Transport<Output = C>,
102        C: AsyncRead + AsyncWrite + Unpin,
103        D: AsyncRead + AsyncWrite + Unpin,
104        U: InboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
105        U: OutboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
106        E: Error + 'static,
107    {
108        let version = self.version;
109        Authenticated(Builder::new(
110            self.inner.and_then(move |conn, endpoint| Authenticate {
111                inner: upgrade::apply(conn, upgrade, endpoint, version),
112            }),
113            version,
114        ))
115    }
116}
117
118/// An upgrade that authenticates the remote peer, typically
119/// in the context of negotiating a secure channel.
120///
121/// Configured through [`Builder::authenticate`].
122#[pin_project::pin_project]
123pub struct Authenticate<C, U>
124where
125    C: AsyncRead + AsyncWrite + Unpin,
126    U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
127{
128    #[pin]
129    inner: EitherUpgrade<C, U>,
130}
131
132impl<C, U> Future for Authenticate<C, U>
133where
134    C: AsyncRead + AsyncWrite + Unpin,
135    U: InboundConnectionUpgrade<Negotiated<C>>
136        + OutboundConnectionUpgrade<
137            Negotiated<C>,
138            Output = <U as InboundConnectionUpgrade<Negotiated<C>>>::Output,
139            Error = <U as InboundConnectionUpgrade<Negotiated<C>>>::Error,
140        >,
141{
142    type Output = <EitherUpgrade<C, U> as Future>::Output;
143
144    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
145        let this = self.project();
146        Future::poll(this.inner, cx)
147    }
148}
149
150/// An upgrade that negotiates a (sub)stream multiplexer on
151/// top of an authenticated transport.
152///
153/// Configured through [`Authenticated::multiplex`].
154#[pin_project::pin_project]
155pub struct Multiplex<C, U>
156where
157    C: AsyncRead + AsyncWrite + Unpin,
158    U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
159{
160    peer_id: Option<PeerId>,
161    #[pin]
162    upgrade: EitherUpgrade<C, U>,
163}
164
165impl<C, U, M, E> Future for Multiplex<C, U>
166where
167    C: AsyncRead + AsyncWrite + Unpin,
168    U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
169    U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
170{
171    type Output = Result<(PeerId, M), UpgradeError<E>>;
172
173    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
174        let this = self.project();
175        let m = match ready!(Future::poll(this.upgrade, cx)) {
176            Ok(m) => m,
177            Err(err) => return Poll::Ready(Err(err)),
178        };
179        let i = this
180            .peer_id
181            .take()
182            .expect("Multiplex future polled after completion.");
183        Poll::Ready(Ok((i, m)))
184    }
185}
186
187/// An transport with peer authentication, obtained from [`Builder::authenticate`].
188#[derive(Clone)]
189pub struct Authenticated<T>(Builder<T>);
190
191impl<T> Authenticated<T>
192where
193    T: Transport,
194    T::Error: 'static,
195{
196    /// Applies an arbitrary upgrade.
197    ///
198    /// The upgrade receives the I/O resource (i.e. connection) `C` and
199    /// must produce a new I/O resource `D`. Any number of such upgrades
200    /// can be performed.
201    ///
202    /// ## Transitions
203    ///
204    ///   * I/O upgrade: `C -> D`.
205    ///   * Transport output: `(PeerId, C) -> (PeerId, D)`.
206    pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
207    where
208        T: Transport<Output = (PeerId, C)>,
209        C: AsyncRead + AsyncWrite + Unpin,
210        D: AsyncRead + AsyncWrite + Unpin,
211        U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
212        U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
213        E: Error + 'static,
214    {
215        Authenticated(Builder::new(
216            Upgrade::new(self.0.inner, upgrade),
217            self.0.version,
218        ))
219    }
220
221    /// Upgrades the transport with a (sub)stream multiplexer.
222    ///
223    /// The supplied upgrade receives the I/O resource `C` and must
224    /// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
225    /// This ends the (regular) transport upgrade process.
226    ///
227    /// ## Transitions
228    ///
229    ///   * I/O upgrade: `C -> M`.
230    ///   * Transport output: `(PeerId, C) -> (PeerId, M)`.
231    pub fn multiplex<C, M, U, E>(
232        self,
233        upgrade: U,
234    ) -> Multiplexed<AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>>
235    where
236        T: Transport<Output = (PeerId, C)>,
237        C: AsyncRead + AsyncWrite + Unpin,
238        M: StreamMuxer,
239        U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
240        U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
241        E: Error + 'static,
242    {
243        let version = self.0.version;
244        Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
245            let upgrade = upgrade::apply(c, upgrade, endpoint, version);
246            Multiplex {
247                peer_id: Some(i),
248                upgrade,
249            }
250        }))
251    }
252
253    /// Like [`Authenticated::multiplex`] but accepts a function which returns the upgrade.
254    ///
255    /// The supplied function is applied to [`PeerId`] and [`ConnectedPoint`]
256    /// and returns an upgrade which receives the I/O resource `C` and must
257    /// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
258    /// This ends the (regular) transport upgrade process.
259    ///
260    /// ## Transitions
261    ///
262    ///   * I/O upgrade: `C -> M`.
263    ///   * Transport output: `(PeerId, C) -> (PeerId, M)`.
264    pub fn multiplex_ext<C, M, U, E, F>(
265        self,
266        up: F,
267    ) -> Multiplexed<AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>>
268    where
269        T: Transport<Output = (PeerId, C)>,
270        C: AsyncRead + AsyncWrite + Unpin,
271        M: StreamMuxer,
272        U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
273        U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
274        E: Error + 'static,
275        F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone,
276    {
277        let version = self.0.version;
278        Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
279            let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
280            Multiplex {
281                peer_id: Some(peer_id),
282                upgrade,
283            }
284        }))
285    }
286}
287
288/// A authenticated and multiplexed transport, obtained from
289/// [`Authenticated::multiplex`].
290#[derive(Clone)]
291#[pin_project::pin_project]
292pub struct Multiplexed<T>(#[pin] T);
293
294impl<T> Multiplexed<T> {
295    /// Boxes the authenticated, multiplexed transport, including
296    /// the [`StreamMuxer`] and custom transport errors.
297    pub fn boxed<M>(self) -> super::Boxed<(PeerId, StreamMuxerBox)>
298    where
299        T: Transport<Output = (PeerId, M)> + Sized + Send + Unpin + 'static,
300        T::Dial: Send + 'static,
301        T::ListenerUpgrade: Send + 'static,
302        T::Error: Send + Sync,
303        M: StreamMuxer + Send + 'static,
304        M::Substream: Send + 'static,
305        M::Error: Send + Sync + 'static,
306    {
307        boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
308    }
309
310    /// Adds a timeout to the setup and protocol upgrade process for all
311    /// inbound and outbound connections established through the transport.
312    pub fn timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
313        Multiplexed(TransportTimeout::new(self.0, timeout))
314    }
315
316    /// Adds a timeout to the setup and protocol upgrade process for all
317    /// outbound connections established through the transport.
318    pub fn outbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
319        Multiplexed(TransportTimeout::with_outgoing_timeout(self.0, timeout))
320    }
321
322    /// Adds a timeout to the setup and protocol upgrade process for all
323    /// inbound connections established through the transport.
324    pub fn inbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
325        Multiplexed(TransportTimeout::with_ingoing_timeout(self.0, timeout))
326    }
327}
328
329impl<T> Transport for Multiplexed<T>
330where
331    T: Transport,
332{
333    type Output = T::Output;
334    type Error = T::Error;
335    type ListenerUpgrade = T::ListenerUpgrade;
336    type Dial = T::Dial;
337
338    fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
339        self.0.dial(addr)
340    }
341
342    fn remove_listener(&mut self, id: ListenerId) -> bool {
343        self.0.remove_listener(id)
344    }
345
346    fn dial_as_listener(
347        &mut self,
348        addr: Multiaddr,
349    ) -> Result<Self::Dial, TransportError<Self::Error>> {
350        self.0.dial_as_listener(addr)
351    }
352
353    fn listen_on(
354        &mut self,
355        id: ListenerId,
356        addr: Multiaddr,
357    ) -> Result<(), TransportError<Self::Error>> {
358        self.0.listen_on(id, addr)
359    }
360
361    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
362        self.0.address_translation(server, observed)
363    }
364
365    fn poll(
366        self: Pin<&mut Self>,
367        cx: &mut Context<'_>,
368    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
369        self.project().0.poll(cx)
370    }
371}
372
373/// An inbound or outbound upgrade.
374type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
375
376/// A custom upgrade on an [`Authenticated`] transport.
377///
378/// See [`Transport::upgrade`]
379#[derive(Debug, Copy, Clone)]
380#[pin_project::pin_project]
381pub struct Upgrade<T, U> {
382    #[pin]
383    inner: T,
384    upgrade: U,
385}
386
387impl<T, U> Upgrade<T, U> {
388    pub fn new(inner: T, upgrade: U) -> Self {
389        Upgrade { inner, upgrade }
390    }
391}
392
393impl<T, C, D, U, E> Transport for Upgrade<T, U>
394where
395    T: Transport<Output = (PeerId, C)>,
396    T::Error: 'static,
397    C: AsyncRead + AsyncWrite + Unpin,
398    U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
399    U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
400    E: Error + 'static,
401{
402    type Output = (PeerId, D);
403    type Error = TransportUpgradeError<T::Error, E>;
404    type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, C>;
405    type Dial = DialUpgradeFuture<T::Dial, U, C>;
406
407    fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
408        let future = self
409            .inner
410            .dial(addr)
411            .map_err(|err| err.map(TransportUpgradeError::Transport))?;
412        Ok(DialUpgradeFuture {
413            future: Box::pin(future),
414            upgrade: future::Either::Left(Some(self.upgrade.clone())),
415        })
416    }
417
418    fn remove_listener(&mut self, id: ListenerId) -> bool {
419        self.inner.remove_listener(id)
420    }
421
422    fn dial_as_listener(
423        &mut self,
424        addr: Multiaddr,
425    ) -> Result<Self::Dial, TransportError<Self::Error>> {
426        let future = self
427            .inner
428            .dial_as_listener(addr)
429            .map_err(|err| err.map(TransportUpgradeError::Transport))?;
430        Ok(DialUpgradeFuture {
431            future: Box::pin(future),
432            upgrade: future::Either::Left(Some(self.upgrade.clone())),
433        })
434    }
435
436    fn listen_on(
437        &mut self,
438        id: ListenerId,
439        addr: Multiaddr,
440    ) -> Result<(), TransportError<Self::Error>> {
441        self.inner
442            .listen_on(id, addr)
443            .map_err(|err| err.map(TransportUpgradeError::Transport))
444    }
445
446    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
447        self.inner.address_translation(server, observed)
448    }
449
450    fn poll(
451        self: Pin<&mut Self>,
452        cx: &mut Context<'_>,
453    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
454        let this = self.project();
455        let upgrade = this.upgrade.clone();
456        this.inner.poll(cx).map(|event| {
457            event
458                .map_upgrade(move |future| ListenerUpgradeFuture {
459                    future: Box::pin(future),
460                    upgrade: future::Either::Left(Some(upgrade)),
461                })
462                .map_err(TransportUpgradeError::Transport)
463        })
464    }
465}
466
467/// Errors produced by a transport upgrade.
468#[derive(Debug)]
469pub enum TransportUpgradeError<T, U> {
470    /// Error in the transport.
471    Transport(T),
472    /// Error while upgrading to a protocol.
473    Upgrade(UpgradeError<U>),
474}
475
476impl<T, U> fmt::Display for TransportUpgradeError<T, U>
477where
478    T: fmt::Display,
479    U: fmt::Display,
480{
481    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
482        match self {
483            TransportUpgradeError::Transport(e) => write!(f, "Transport error: {e}"),
484            TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {e}"),
485        }
486    }
487}
488
489impl<T, U> Error for TransportUpgradeError<T, U>
490where
491    T: Error + 'static,
492    U: Error + 'static,
493{
494    fn source(&self) -> Option<&(dyn Error + 'static)> {
495        match self {
496            TransportUpgradeError::Transport(e) => Some(e),
497            TransportUpgradeError::Upgrade(e) => Some(e),
498        }
499    }
500}
501
502/// The [`Transport::Dial`] future of an [`Upgrade`]d transport.
503pub struct DialUpgradeFuture<F, U, C>
504where
505    U: OutboundConnectionUpgrade<Negotiated<C>>,
506    C: AsyncRead + AsyncWrite + Unpin,
507{
508    future: Pin<Box<F>>,
509    upgrade: future::Either<Option<U>, (PeerId, OutboundUpgradeApply<C, U>)>,
510}
511
512impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
513where
514    F: TryFuture<Ok = (PeerId, C)>,
515    C: AsyncRead + AsyncWrite + Unpin,
516    U: OutboundConnectionUpgrade<Negotiated<C>, Output = D>,
517    U::Error: Error,
518{
519    type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
520
521    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
522        // We use a `this` variable because the compiler can't mutably borrow multiple times
523        // accross a `Deref`.
524        let this = &mut *self;
525
526        loop {
527            this.upgrade = match this.upgrade {
528                future::Either::Left(ref mut up) => {
529                    let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx)
530                        .map_err(TransportUpgradeError::Transport))
531                    {
532                        Ok(v) => v,
533                        Err(err) => return Poll::Ready(Err(err)),
534                    };
535                    let u = up
536                        .take()
537                        .expect("DialUpgradeFuture is constructed with Either::Left(Some).");
538                    future::Either::Right((i, apply_outbound(c, u, upgrade::Version::V1)))
539                }
540                future::Either::Right((i, ref mut up)) => {
541                    let d = match ready!(
542                        Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)
543                    ) {
544                        Ok(d) => d,
545                        Err(err) => return Poll::Ready(Err(err)),
546                    };
547                    return Poll::Ready(Ok((i, d)));
548                }
549            }
550        }
551    }
552}
553
554impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
555where
556    U: OutboundConnectionUpgrade<Negotiated<C>>,
557    C: AsyncRead + AsyncWrite + Unpin,
558{
559}
560
561/// The [`Transport::ListenerUpgrade`] future of an [`Upgrade`]d transport.
562pub struct ListenerUpgradeFuture<F, U, C>
563where
564    C: AsyncRead + AsyncWrite + Unpin,
565    U: InboundConnectionUpgrade<Negotiated<C>>,
566{
567    future: Pin<Box<F>>,
568    upgrade: future::Either<Option<U>, (PeerId, InboundUpgradeApply<C, U>)>,
569}
570
571impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
572where
573    F: TryFuture<Ok = (PeerId, C)>,
574    C: AsyncRead + AsyncWrite + Unpin,
575    U: InboundConnectionUpgrade<Negotiated<C>, Output = D>,
576    U::Error: Error,
577{
578    type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
579
580    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
581        // We use a `this` variable because the compiler can't mutably borrow multiple times
582        // accross a `Deref`.
583        let this = &mut *self;
584
585        loop {
586            this.upgrade = match this.upgrade {
587                future::Either::Left(ref mut up) => {
588                    let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx)
589                        .map_err(TransportUpgradeError::Transport))
590                    {
591                        Ok(v) => v,
592                        Err(err) => return Poll::Ready(Err(err)),
593                    };
594                    let u = up
595                        .take()
596                        .expect("ListenerUpgradeFuture is constructed with Either::Left(Some).");
597                    future::Either::Right((i, apply_inbound(c, u)))
598                }
599                future::Either::Right((i, ref mut up)) => {
600                    let d = match ready!(TryFuture::try_poll(Pin::new(up), cx)
601                        .map_err(TransportUpgradeError::Upgrade))
602                    {
603                        Ok(v) => v,
604                        Err(err) => return Poll::Ready(Err(err)),
605                    };
606                    return Poll::Ready(Ok((i, d)));
607                }
608            }
609        }
610    }
611}
612
613impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
614where
615    C: AsyncRead + AsyncWrite + Unpin,
616    U: InboundConnectionUpgrade<Negotiated<C>>,
617{
618}