libp2p_core/transport/
map.rs1use crate::transport::DialOpts;
22use crate::{
23 connection::ConnectedPoint,
24 transport::{Transport, TransportError, TransportEvent},
25};
26use futures::prelude::*;
27use multiaddr::Multiaddr;
28use std::{pin::Pin, task::Context, task::Poll};
29
30use super::ListenerId;
31
32#[derive(Debug, Copy, Clone)]
34#[pin_project::pin_project]
35pub struct Map<T, F> {
36 #[pin]
37 transport: T,
38 fun: F,
39}
40
41impl<T, F> Map<T, F> {
42 pub(crate) fn new(transport: T, fun: F) -> Self {
43 Map { transport, fun }
44 }
45
46 pub fn inner(&self) -> &T {
47 &self.transport
48 }
49
50 pub fn inner_mut(&mut self) -> &mut T {
51 &mut self.transport
52 }
53}
54
55impl<T, F, D> Transport for Map<T, F>
56where
57 T: Transport,
58 F: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
59{
60 type Output = D;
61 type Error = T::Error;
62 type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
63 type Dial = MapFuture<T::Dial, F>;
64
65 fn listen_on(
66 &mut self,
67 id: ListenerId,
68 addr: Multiaddr,
69 ) -> Result<(), TransportError<Self::Error>> {
70 self.transport.listen_on(id, addr)
71 }
72
73 fn remove_listener(&mut self, id: ListenerId) -> bool {
74 self.transport.remove_listener(id)
75 }
76
77 fn dial(
78 &mut self,
79 addr: Multiaddr,
80 opts: DialOpts,
81 ) -> Result<Self::Dial, TransportError<Self::Error>> {
82 let future = self.transport.dial(addr.clone(), opts)?;
83 let p = ConnectedPoint::Dialer {
84 address: addr,
85 role_override: opts.role,
86 port_use: opts.port_use,
87 };
88 Ok(MapFuture {
89 inner: future,
90 args: Some((self.fun.clone(), p)),
91 })
92 }
93
94 fn poll(
95 self: Pin<&mut Self>,
96 cx: &mut Context<'_>,
97 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
98 let this = self.project();
99 match this.transport.poll(cx) {
100 Poll::Ready(TransportEvent::Incoming {
101 listener_id,
102 upgrade,
103 local_addr,
104 send_back_addr,
105 }) => {
106 let point = ConnectedPoint::Listener {
107 local_addr: local_addr.clone(),
108 send_back_addr: send_back_addr.clone(),
109 };
110 Poll::Ready(TransportEvent::Incoming {
111 listener_id,
112 upgrade: MapFuture {
113 inner: upgrade,
114 args: Some((this.fun.clone(), point)),
115 },
116 local_addr,
117 send_back_addr,
118 })
119 }
120 Poll::Ready(other) => {
121 let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched"));
122 Poll::Ready(mapped)
123 }
124 Poll::Pending => Poll::Pending,
125 }
126 }
127}
128
129#[pin_project::pin_project]
133#[derive(Clone, Debug)]
134pub struct MapFuture<T, F> {
135 #[pin]
136 inner: T,
137 args: Option<(F, ConnectedPoint)>,
138}
139
140impl<T, A, F, B> Future for MapFuture<T, F>
141where
142 T: TryFuture<Ok = A>,
143 F: FnOnce(A, ConnectedPoint) -> B,
144{
145 type Output = Result<B, T::Error>;
146
147 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
148 let this = self.project();
149 let item = match TryFuture::try_poll(this.inner, cx) {
150 Poll::Pending => return Poll::Pending,
151 Poll::Ready(Ok(v)) => v,
152 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
153 };
154 let (f, a) = this.args.take().expect("MapFuture has already finished.");
155 Poll::Ready(Ok(f(item, a)))
156 }
157}