libp2p_core/transport/
map.rs1use crate::{
22 connection::{ConnectedPoint, Endpoint},
23 transport::{Transport, TransportError, TransportEvent},
24};
25use futures::prelude::*;
26use multiaddr::Multiaddr;
27use std::{pin::Pin, task::Context, task::Poll};
28
29use super::ListenerId;
30
31#[derive(Debug, Copy, Clone)]
33#[pin_project::pin_project]
34pub struct Map<T, F> {
35 #[pin]
36 transport: T,
37 fun: F,
38}
39
40impl<T, F> Map<T, F> {
41 pub(crate) fn new(transport: T, fun: F) -> Self {
42 Map { transport, fun }
43 }
44
45 pub fn inner(&self) -> &T {
46 &self.transport
47 }
48
49 pub fn inner_mut(&mut self) -> &mut T {
50 &mut self.transport
51 }
52}
53
54impl<T, F, D> Transport for Map<T, F>
55where
56 T: Transport,
57 F: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
58{
59 type Output = D;
60 type Error = T::Error;
61 type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
62 type Dial = MapFuture<T::Dial, F>;
63
64 fn listen_on(
65 &mut self,
66 id: ListenerId,
67 addr: Multiaddr,
68 ) -> Result<(), TransportError<Self::Error>> {
69 self.transport.listen_on(id, addr)
70 }
71
72 fn remove_listener(&mut self, id: ListenerId) -> bool {
73 self.transport.remove_listener(id)
74 }
75
76 fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
77 let future = self.transport.dial(addr.clone())?;
78 let p = ConnectedPoint::Dialer {
79 address: addr,
80 role_override: Endpoint::Dialer,
81 };
82 Ok(MapFuture {
83 inner: future,
84 args: Some((self.fun.clone(), p)),
85 })
86 }
87
88 fn dial_as_listener(
89 &mut self,
90 addr: Multiaddr,
91 ) -> Result<Self::Dial, TransportError<Self::Error>> {
92 let future = self.transport.dial_as_listener(addr.clone())?;
93 let p = ConnectedPoint::Dialer {
94 address: addr,
95 role_override: Endpoint::Listener,
96 };
97 Ok(MapFuture {
98 inner: future,
99 args: Some((self.fun.clone(), p)),
100 })
101 }
102
103 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
104 self.transport.address_translation(server, observed)
105 }
106
107 fn poll(
108 self: Pin<&mut Self>,
109 cx: &mut Context<'_>,
110 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
111 let this = self.project();
112 match this.transport.poll(cx) {
113 Poll::Ready(TransportEvent::Incoming {
114 listener_id,
115 upgrade,
116 local_addr,
117 send_back_addr,
118 }) => {
119 let point = ConnectedPoint::Listener {
120 local_addr: local_addr.clone(),
121 send_back_addr: send_back_addr.clone(),
122 };
123 Poll::Ready(TransportEvent::Incoming {
124 listener_id,
125 upgrade: MapFuture {
126 inner: upgrade,
127 args: Some((this.fun.clone(), point)),
128 },
129 local_addr,
130 send_back_addr,
131 })
132 }
133 Poll::Ready(other) => {
134 let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched"));
135 Poll::Ready(mapped)
136 }
137 Poll::Pending => Poll::Pending,
138 }
139 }
140}
141
142#[pin_project::pin_project]
146#[derive(Clone, Debug)]
147pub struct MapFuture<T, F> {
148 #[pin]
149 inner: T,
150 args: Option<(F, ConnectedPoint)>,
151}
152
153impl<T, A, F, B> Future for MapFuture<T, F>
154where
155 T: TryFuture<Ok = A>,
156 F: FnOnce(A, ConnectedPoint) -> B,
157{
158 type Output = Result<B, T::Error>;
159
160 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161 let this = self.project();
162 let item = match TryFuture::try_poll(this.inner, cx) {
163 Poll::Pending => return Poll::Pending,
164 Poll::Ready(Ok(v)) => v,
165 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
166 };
167 let (f, a) = this.args.take().expect("MapFuture has already finished.");
168 Poll::Ready(Ok(f(item, a)))
169 }
170}