libp2p_core/transport/
map.rs

1// Copyright 2017 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::transport::DialOpts;
22use crate::{
23    connection::ConnectedPoint,
24    transport::{Transport, TransportError, TransportEvent},
25};
26use futures::prelude::*;
27use multiaddr::Multiaddr;
28use std::{pin::Pin, task::Context, task::Poll};
29
30use super::ListenerId;
31
32/// See `Transport::map`.
33#[derive(Debug, Copy, Clone)]
34#[pin_project::pin_project]
35pub struct Map<T, F> {
36    #[pin]
37    transport: T,
38    fun: F,
39}
40
41impl<T, F> Map<T, F> {
42    pub(crate) fn new(transport: T, fun: F) -> Self {
43        Map { transport, fun }
44    }
45
46    pub fn inner(&self) -> &T {
47        &self.transport
48    }
49
50    pub fn inner_mut(&mut self) -> &mut T {
51        &mut self.transport
52    }
53}
54
55impl<T, F, D> Transport for Map<T, F>
56where
57    T: Transport,
58    F: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
59{
60    type Output = D;
61    type Error = T::Error;
62    type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
63    type Dial = MapFuture<T::Dial, F>;
64
65    fn listen_on(
66        &mut self,
67        id: ListenerId,
68        addr: Multiaddr,
69    ) -> Result<(), TransportError<Self::Error>> {
70        self.transport.listen_on(id, addr)
71    }
72
73    fn remove_listener(&mut self, id: ListenerId) -> bool {
74        self.transport.remove_listener(id)
75    }
76
77    fn dial(
78        &mut self,
79        addr: Multiaddr,
80        opts: DialOpts,
81    ) -> Result<Self::Dial, TransportError<Self::Error>> {
82        let future = self.transport.dial(addr.clone(), opts)?;
83        let p = ConnectedPoint::Dialer {
84            address: addr,
85            role_override: opts.role,
86            port_use: opts.port_use,
87        };
88        Ok(MapFuture {
89            inner: future,
90            args: Some((self.fun.clone(), p)),
91        })
92    }
93
94    fn poll(
95        self: Pin<&mut Self>,
96        cx: &mut Context<'_>,
97    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
98        let this = self.project();
99        match this.transport.poll(cx) {
100            Poll::Ready(TransportEvent::Incoming {
101                listener_id,
102                upgrade,
103                local_addr,
104                send_back_addr,
105            }) => {
106                let point = ConnectedPoint::Listener {
107                    local_addr: local_addr.clone(),
108                    send_back_addr: send_back_addr.clone(),
109                };
110                Poll::Ready(TransportEvent::Incoming {
111                    listener_id,
112                    upgrade: MapFuture {
113                        inner: upgrade,
114                        args: Some((this.fun.clone(), point)),
115                    },
116                    local_addr,
117                    send_back_addr,
118                })
119            }
120            Poll::Ready(other) => {
121                let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched"));
122                Poll::Ready(mapped)
123            }
124            Poll::Pending => Poll::Pending,
125        }
126    }
127}
128
129/// Custom `Future` to avoid boxing.
130///
131/// Applies a function to the inner future's result.
132#[pin_project::pin_project]
133#[derive(Clone, Debug)]
134pub struct MapFuture<T, F> {
135    #[pin]
136    inner: T,
137    args: Option<(F, ConnectedPoint)>,
138}
139
140impl<T, A, F, B> Future for MapFuture<T, F>
141where
142    T: TryFuture<Ok = A>,
143    F: FnOnce(A, ConnectedPoint) -> B,
144{
145    type Output = Result<B, T::Error>;
146
147    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
148        let this = self.project();
149        let item = match TryFuture::try_poll(this.inner, cx) {
150            Poll::Pending => return Poll::Pending,
151            Poll::Ready(Ok(v)) => v,
152            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
153        };
154        let (f, a) = this.args.take().expect("MapFuture has already finished.");
155        Poll::Ready(Ok(f(item, a)))
156    }
157}