libp2p_core/transport/
map.rs

1// Copyright 2017 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    connection::{ConnectedPoint, Endpoint},
23    transport::{Transport, TransportError, TransportEvent},
24};
25use futures::prelude::*;
26use multiaddr::Multiaddr;
27use std::{pin::Pin, task::Context, task::Poll};
28
29use super::ListenerId;
30
31/// See `Transport::map`.
32#[derive(Debug, Copy, Clone)]
33#[pin_project::pin_project]
34pub struct Map<T, F> {
35    #[pin]
36    transport: T,
37    fun: F,
38}
39
40impl<T, F> Map<T, F> {
41    pub(crate) fn new(transport: T, fun: F) -> Self {
42        Map { transport, fun }
43    }
44
45    pub fn inner(&self) -> &T {
46        &self.transport
47    }
48
49    pub fn inner_mut(&mut self) -> &mut T {
50        &mut self.transport
51    }
52}
53
54impl<T, F, D> Transport for Map<T, F>
55where
56    T: Transport,
57    F: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
58{
59    type Output = D;
60    type Error = T::Error;
61    type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
62    type Dial = MapFuture<T::Dial, F>;
63
64    fn listen_on(
65        &mut self,
66        id: ListenerId,
67        addr: Multiaddr,
68    ) -> Result<(), TransportError<Self::Error>> {
69        self.transport.listen_on(id, addr)
70    }
71
72    fn remove_listener(&mut self, id: ListenerId) -> bool {
73        self.transport.remove_listener(id)
74    }
75
76    fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
77        let future = self.transport.dial(addr.clone())?;
78        let p = ConnectedPoint::Dialer {
79            address: addr,
80            role_override: Endpoint::Dialer,
81        };
82        Ok(MapFuture {
83            inner: future,
84            args: Some((self.fun.clone(), p)),
85        })
86    }
87
88    fn dial_as_listener(
89        &mut self,
90        addr: Multiaddr,
91    ) -> Result<Self::Dial, TransportError<Self::Error>> {
92        let future = self.transport.dial_as_listener(addr.clone())?;
93        let p = ConnectedPoint::Dialer {
94            address: addr,
95            role_override: Endpoint::Listener,
96        };
97        Ok(MapFuture {
98            inner: future,
99            args: Some((self.fun.clone(), p)),
100        })
101    }
102
103    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
104        self.transport.address_translation(server, observed)
105    }
106
107    fn poll(
108        self: Pin<&mut Self>,
109        cx: &mut Context<'_>,
110    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
111        let this = self.project();
112        match this.transport.poll(cx) {
113            Poll::Ready(TransportEvent::Incoming {
114                listener_id,
115                upgrade,
116                local_addr,
117                send_back_addr,
118            }) => {
119                let point = ConnectedPoint::Listener {
120                    local_addr: local_addr.clone(),
121                    send_back_addr: send_back_addr.clone(),
122                };
123                Poll::Ready(TransportEvent::Incoming {
124                    listener_id,
125                    upgrade: MapFuture {
126                        inner: upgrade,
127                        args: Some((this.fun.clone(), point)),
128                    },
129                    local_addr,
130                    send_back_addr,
131                })
132            }
133            Poll::Ready(other) => {
134                let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched"));
135                Poll::Ready(mapped)
136            }
137            Poll::Pending => Poll::Pending,
138        }
139    }
140}
141
142/// Custom `Future` to avoid boxing.
143///
144/// Applies a function to the inner future's result.
145#[pin_project::pin_project]
146#[derive(Clone, Debug)]
147pub struct MapFuture<T, F> {
148    #[pin]
149    inner: T,
150    args: Option<(F, ConnectedPoint)>,
151}
152
153impl<T, A, F, B> Future for MapFuture<T, F>
154where
155    T: TryFuture<Ok = A>,
156    F: FnOnce(A, ConnectedPoint) -> B,
157{
158    type Output = Result<B, T::Error>;
159
160    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161        let this = self.project();
162        let item = match TryFuture::try_poll(this.inner, cx) {
163            Poll::Pending => return Poll::Pending,
164            Poll::Ready(Ok(v)) => v,
165            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
166        };
167        let (f, a) = this.args.take().expect("MapFuture has already finished.");
168        Poll::Ready(Ok(f(item, a)))
169    }
170}