libp2p_core/transport/
and_then.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    connection::{ConnectedPoint, Endpoint},
23    transport::{ListenerId, Transport, TransportError, TransportEvent},
24};
25use either::Either;
26use futures::prelude::*;
27use multiaddr::Multiaddr;
28use std::{error, marker::PhantomPinned, pin::Pin, task::Context, task::Poll};
29
/// See the [`Transport::and_then`] method.
///
/// Pairs an inner transport with a function `fun`. Every dial and every
/// incoming listener upgrade produced by the inner transport is wrapped in an
/// [`AndThenFuture`] that runs `fun` on the inner future's successful output.
#[pin_project::pin_project]
#[derive(Debug, Clone)]
pub struct AndThen<T, C> {
    // The wrapped transport. Marked `#[pin]` so that `poll` can obtain a
    // pinned projection of it and poll it in place.
    #[pin]
    transport: T,
    // Function chained onto each connection; it is cloned for every dial and
    // every incoming upgrade.
    fun: C,
}
38
39impl<T, C> AndThen<T, C> {
40    pub(crate) fn new(transport: T, fun: C) -> Self {
41        AndThen { transport, fun }
42    }
43}
44
45impl<T, C, F, O> Transport for AndThen<T, C>
46where
47    T: Transport,
48    C: FnOnce(T::Output, ConnectedPoint) -> F + Clone,
49    F: TryFuture<Ok = O>,
50    F::Error: error::Error,
51{
52    type Output = O;
53    type Error = Either<T::Error, F::Error>;
54    type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F>;
55    type Dial = AndThenFuture<T::Dial, C, F>;
56
57    fn listen_on(
58        &mut self,
59        id: ListenerId,
60        addr: Multiaddr,
61    ) -> Result<(), TransportError<Self::Error>> {
62        self.transport
63            .listen_on(id, addr)
64            .map_err(|err| err.map(Either::Left))
65    }
66
67    fn remove_listener(&mut self, id: ListenerId) -> bool {
68        self.transport.remove_listener(id)
69    }
70
71    fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
72        let dialed_fut = self
73            .transport
74            .dial(addr.clone())
75            .map_err(|err| err.map(Either::Left))?;
76        let future = AndThenFuture {
77            inner: Either::Left(Box::pin(dialed_fut)),
78            args: Some((
79                self.fun.clone(),
80                ConnectedPoint::Dialer {
81                    address: addr,
82                    role_override: Endpoint::Dialer,
83                },
84            )),
85            _marker: PhantomPinned,
86        };
87        Ok(future)
88    }
89
90    fn dial_as_listener(
91        &mut self,
92        addr: Multiaddr,
93    ) -> Result<Self::Dial, TransportError<Self::Error>> {
94        let dialed_fut = self
95            .transport
96            .dial_as_listener(addr.clone())
97            .map_err(|err| err.map(Either::Left))?;
98        let future = AndThenFuture {
99            inner: Either::Left(Box::pin(dialed_fut)),
100            args: Some((
101                self.fun.clone(),
102                ConnectedPoint::Dialer {
103                    address: addr,
104                    role_override: Endpoint::Listener,
105                },
106            )),
107            _marker: PhantomPinned,
108        };
109        Ok(future)
110    }
111
112    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
113        self.transport.address_translation(server, observed)
114    }
115
116    fn poll(
117        self: Pin<&mut Self>,
118        cx: &mut Context<'_>,
119    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
120        let this = self.project();
121        match this.transport.poll(cx) {
122            Poll::Ready(TransportEvent::Incoming {
123                listener_id,
124                upgrade,
125                local_addr,
126                send_back_addr,
127            }) => {
128                let point = ConnectedPoint::Listener {
129                    local_addr: local_addr.clone(),
130                    send_back_addr: send_back_addr.clone(),
131                };
132                Poll::Ready(TransportEvent::Incoming {
133                    listener_id,
134                    upgrade: AndThenFuture {
135                        inner: Either::Left(Box::pin(upgrade)),
136                        args: Some((this.fun.clone(), point)),
137                        _marker: PhantomPinned,
138                    },
139                    local_addr,
140                    send_back_addr,
141                })
142            }
143            Poll::Ready(other) => {
144                let mapped = other
145                    .map_upgrade(|_upgrade| unreachable!("case already matched"))
146                    .map_err(Either::Left);
147                Poll::Ready(mapped)
148            }
149            Poll::Pending => Poll::Pending,
150        }
151    }
152}
153
/// Custom `Future` used as the `Dial` and `ListenerUpgrade` type of [`AndThen`].
///
/// Applies a function to the result of the inner future: the first future is
/// driven to completion, its successful output is passed to the stored
/// function together with a [`ConnectedPoint`], and the future returned by
/// that function is then driven to completion. Both futures are stored
/// heap-pinned as `Pin<Box<_>>`.
#[derive(Debug)]
pub struct AndThenFuture<TFut, TMap, TMapOut> {
    // `Left` while the original future is still running, `Right` once the
    // mapping function has been called and its future is being driven.
    inner: Either<Pin<Box<TFut>>, Pin<Box<TMapOut>>>,
    // The mapping function and its endpoint argument; taken (left as `None`)
    // when the first future succeeds.
    args: Option<(TMap, ConnectedPoint)>,
    _marker: PhantomPinned,
}
163
164impl<TFut, TMap, TMapOut> Future for AndThenFuture<TFut, TMap, TMapOut>
165where
166    TFut: TryFuture,
167    TMap: FnOnce(TFut::Ok, ConnectedPoint) -> TMapOut,
168    TMapOut: TryFuture,
169{
170    type Output = Result<TMapOut::Ok, Either<TFut::Error, TMapOut::Error>>;
171
172    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
173        loop {
174            let future = match &mut self.inner {
175                Either::Left(future) => {
176                    let item = match TryFuture::try_poll(future.as_mut(), cx) {
177                        Poll::Ready(Ok(v)) => v,
178                        Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Left(err))),
179                        Poll::Pending => return Poll::Pending,
180                    };
181                    let (f, a) = self
182                        .args
183                        .take()
184                        .expect("AndThenFuture has already finished.");
185                    f(item, a)
186                }
187                Either::Right(future) => {
188                    return match TryFuture::try_poll(future.as_mut(), cx) {
189                        Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
190                        Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Right(err))),
191                        Poll::Pending => Poll::Pending,
192                    }
193                }
194            };
195
196            self.inner = Either::Right(Box::pin(future));
197        }
198    }
199}
200
// `AndThenFuture` is declared `Unpin` for all type parameters, even though it
// carries a `PhantomPinned` marker. Its `poll` implementation accesses the
// fields through `Pin<&mut Self>` directly, and both inner futures are stored
// as `Pin<Box<_>>`.
impl<TFut, TMap, TMapOut> Unpin for AndThenFuture<TFut, TMap, TMapOut> {}