libp2p_core/transport/
and_then.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    connection::ConnectedPoint,
23    transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
24};
25use either::Either;
26use futures::prelude::*;
27use multiaddr::Multiaddr;
28use std::{error, marker::PhantomPinned, pin::Pin, task::Context, task::Poll};
29
30/// See the [`Transport::and_then`] method.
31#[pin_project::pin_project]
32#[derive(Debug, Clone)]
33pub struct AndThen<T, C> {
34    #[pin]
35    transport: T,
36    fun: C,
37}
38
39impl<T, C> AndThen<T, C> {
40    pub(crate) fn new(transport: T, fun: C) -> Self {
41        AndThen { transport, fun }
42    }
43}
44
45impl<T, C, F, O> Transport for AndThen<T, C>
46where
47    T: Transport,
48    C: FnOnce(T::Output, ConnectedPoint) -> F + Clone,
49    F: TryFuture<Ok = O>,
50    F::Error: error::Error,
51{
52    type Output = O;
53    type Error = Either<T::Error, F::Error>;
54    type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F>;
55    type Dial = AndThenFuture<T::Dial, C, F>;
56
57    fn listen_on(
58        &mut self,
59        id: ListenerId,
60        addr: Multiaddr,
61    ) -> Result<(), TransportError<Self::Error>> {
62        self.transport
63            .listen_on(id, addr)
64            .map_err(|err| err.map(Either::Left))
65    }
66
67    fn remove_listener(&mut self, id: ListenerId) -> bool {
68        self.transport.remove_listener(id)
69    }
70
71    fn dial(
72        &mut self,
73        addr: Multiaddr,
74        opts: DialOpts,
75    ) -> Result<Self::Dial, TransportError<Self::Error>> {
76        let dialed_fut = self
77            .transport
78            .dial(addr.clone(), opts)
79            .map_err(|err| err.map(Either::Left))?;
80        let future = AndThenFuture {
81            inner: Either::Left(Box::pin(dialed_fut)),
82            args: Some((
83                self.fun.clone(),
84                ConnectedPoint::Dialer {
85                    address: addr,
86                    role_override: opts.role,
87                    port_use: opts.port_use,
88                },
89            )),
90            _marker: PhantomPinned,
91        };
92        Ok(future)
93    }
94
95    fn poll(
96        self: Pin<&mut Self>,
97        cx: &mut Context<'_>,
98    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
99        let this = self.project();
100        match this.transport.poll(cx) {
101            Poll::Ready(TransportEvent::Incoming {
102                listener_id,
103                upgrade,
104                local_addr,
105                send_back_addr,
106            }) => {
107                let point = ConnectedPoint::Listener {
108                    local_addr: local_addr.clone(),
109                    send_back_addr: send_back_addr.clone(),
110                };
111                Poll::Ready(TransportEvent::Incoming {
112                    listener_id,
113                    upgrade: AndThenFuture {
114                        inner: Either::Left(Box::pin(upgrade)),
115                        args: Some((this.fun.clone(), point)),
116                        _marker: PhantomPinned,
117                    },
118                    local_addr,
119                    send_back_addr,
120                })
121            }
122            Poll::Ready(other) => {
123                let mapped = other
124                    .map_upgrade(|_upgrade| unreachable!("case already matched"))
125                    .map_err(Either::Left);
126                Poll::Ready(mapped)
127            }
128            Poll::Pending => Poll::Pending,
129        }
130    }
131}
132
133/// Custom `Future` to avoid boxing.
134///
135/// Applies a function to the result of the inner future.
136#[derive(Debug)]
137pub struct AndThenFuture<TFut, TMap, TMapOut> {
138    inner: Either<Pin<Box<TFut>>, Pin<Box<TMapOut>>>,
139    args: Option<(TMap, ConnectedPoint)>,
140    _marker: PhantomPinned,
141}
142
143impl<TFut, TMap, TMapOut> Future for AndThenFuture<TFut, TMap, TMapOut>
144where
145    TFut: TryFuture,
146    TMap: FnOnce(TFut::Ok, ConnectedPoint) -> TMapOut,
147    TMapOut: TryFuture,
148{
149    type Output = Result<TMapOut::Ok, Either<TFut::Error, TMapOut::Error>>;
150
151    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152        loop {
153            let future = match &mut self.inner {
154                Either::Left(future) => {
155                    let item = match TryFuture::try_poll(future.as_mut(), cx) {
156                        Poll::Ready(Ok(v)) => v,
157                        Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Left(err))),
158                        Poll::Pending => return Poll::Pending,
159                    };
160                    let (f, a) = self
161                        .args
162                        .take()
163                        .expect("AndThenFuture has already finished.");
164                    f(item, a)
165                }
166                Either::Right(future) => {
167                    return match TryFuture::try_poll(future.as_mut(), cx) {
168                        Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
169                        Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Right(err))),
170                        Poll::Pending => Poll::Pending,
171                    }
172                }
173            };
174
175            self.inner = Either::Right(Box::pin(future));
176        }
177    }
178}
179
180impl<TFut, TMap, TMapOut> Unpin for AndThenFuture<TFut, TMap, TMapOut> {}