1use crate::{
22 connection::{ConnectedPoint, Endpoint},
23 transport::{ListenerId, Transport, TransportError, TransportEvent},
24};
25use either::Either;
26use futures::prelude::*;
27use multiaddr::Multiaddr;
28use std::{error, marker::PhantomPinned, pin::Pin, task::Context, task::Poll};
29
30#[pin_project::pin_project]
32#[derive(Debug, Clone)]
33pub struct AndThen<T, C> {
34 #[pin]
35 transport: T,
36 fun: C,
37}
38
39impl<T, C> AndThen<T, C> {
40 pub(crate) fn new(transport: T, fun: C) -> Self {
41 AndThen { transport, fun }
42 }
43}
44
45impl<T, C, F, O> Transport for AndThen<T, C>
46where
47 T: Transport,
48 C: FnOnce(T::Output, ConnectedPoint) -> F + Clone,
49 F: TryFuture<Ok = O>,
50 F::Error: error::Error,
51{
52 type Output = O;
53 type Error = Either<T::Error, F::Error>;
54 type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F>;
55 type Dial = AndThenFuture<T::Dial, C, F>;
56
57 fn listen_on(
58 &mut self,
59 id: ListenerId,
60 addr: Multiaddr,
61 ) -> Result<(), TransportError<Self::Error>> {
62 self.transport
63 .listen_on(id, addr)
64 .map_err(|err| err.map(Either::Left))
65 }
66
67 fn remove_listener(&mut self, id: ListenerId) -> bool {
68 self.transport.remove_listener(id)
69 }
70
71 fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
72 let dialed_fut = self
73 .transport
74 .dial(addr.clone())
75 .map_err(|err| err.map(Either::Left))?;
76 let future = AndThenFuture {
77 inner: Either::Left(Box::pin(dialed_fut)),
78 args: Some((
79 self.fun.clone(),
80 ConnectedPoint::Dialer {
81 address: addr,
82 role_override: Endpoint::Dialer,
83 },
84 )),
85 _marker: PhantomPinned,
86 };
87 Ok(future)
88 }
89
90 fn dial_as_listener(
91 &mut self,
92 addr: Multiaddr,
93 ) -> Result<Self::Dial, TransportError<Self::Error>> {
94 let dialed_fut = self
95 .transport
96 .dial_as_listener(addr.clone())
97 .map_err(|err| err.map(Either::Left))?;
98 let future = AndThenFuture {
99 inner: Either::Left(Box::pin(dialed_fut)),
100 args: Some((
101 self.fun.clone(),
102 ConnectedPoint::Dialer {
103 address: addr,
104 role_override: Endpoint::Listener,
105 },
106 )),
107 _marker: PhantomPinned,
108 };
109 Ok(future)
110 }
111
112 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
113 self.transport.address_translation(server, observed)
114 }
115
116 fn poll(
117 self: Pin<&mut Self>,
118 cx: &mut Context<'_>,
119 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
120 let this = self.project();
121 match this.transport.poll(cx) {
122 Poll::Ready(TransportEvent::Incoming {
123 listener_id,
124 upgrade,
125 local_addr,
126 send_back_addr,
127 }) => {
128 let point = ConnectedPoint::Listener {
129 local_addr: local_addr.clone(),
130 send_back_addr: send_back_addr.clone(),
131 };
132 Poll::Ready(TransportEvent::Incoming {
133 listener_id,
134 upgrade: AndThenFuture {
135 inner: Either::Left(Box::pin(upgrade)),
136 args: Some((this.fun.clone(), point)),
137 _marker: PhantomPinned,
138 },
139 local_addr,
140 send_back_addr,
141 })
142 }
143 Poll::Ready(other) => {
144 let mapped = other
145 .map_upgrade(|_upgrade| unreachable!("case already matched"))
146 .map_err(Either::Left);
147 Poll::Ready(mapped)
148 }
149 Poll::Pending => Poll::Pending,
150 }
151 }
152}
153
154#[derive(Debug)]
158pub struct AndThenFuture<TFut, TMap, TMapOut> {
159 inner: Either<Pin<Box<TFut>>, Pin<Box<TMapOut>>>,
160 args: Option<(TMap, ConnectedPoint)>,
161 _marker: PhantomPinned,
162}
163
164impl<TFut, TMap, TMapOut> Future for AndThenFuture<TFut, TMap, TMapOut>
165where
166 TFut: TryFuture,
167 TMap: FnOnce(TFut::Ok, ConnectedPoint) -> TMapOut,
168 TMapOut: TryFuture,
169{
170 type Output = Result<TMapOut::Ok, Either<TFut::Error, TMapOut::Error>>;
171
172 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
173 loop {
174 let future = match &mut self.inner {
175 Either::Left(future) => {
176 let item = match TryFuture::try_poll(future.as_mut(), cx) {
177 Poll::Ready(Ok(v)) => v,
178 Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Left(err))),
179 Poll::Pending => return Poll::Pending,
180 };
181 let (f, a) = self
182 .args
183 .take()
184 .expect("AndThenFuture has already finished.");
185 f(item, a)
186 }
187 Either::Right(future) => {
188 return match TryFuture::try_poll(future.as_mut(), cx) {
189 Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
190 Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Right(err))),
191 Poll::Pending => Poll::Pending,
192 }
193 }
194 };
195
196 self.inner = Either::Right(Box::pin(future));
197 }
198 }
199}
200
201impl<TFut, TMap, TMapOut> Unpin for AndThenFuture<TFut, TMap, TMapOut> {}