1use crate::{
22 connection::ConnectedPoint,
23 transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
24};
25use either::Either;
26use futures::prelude::*;
27use multiaddr::Multiaddr;
28use std::{error, marker::PhantomPinned, pin::Pin, task::Context, task::Poll};
29
30#[pin_project::pin_project]
32#[derive(Debug, Clone)]
33pub struct AndThen<T, C> {
34 #[pin]
35 transport: T,
36 fun: C,
37}
38
39impl<T, C> AndThen<T, C> {
40 pub(crate) fn new(transport: T, fun: C) -> Self {
41 AndThen { transport, fun }
42 }
43}
44
45impl<T, C, F, O> Transport for AndThen<T, C>
46where
47 T: Transport,
48 C: FnOnce(T::Output, ConnectedPoint) -> F + Clone,
49 F: TryFuture<Ok = O>,
50 F::Error: error::Error,
51{
52 type Output = O;
53 type Error = Either<T::Error, F::Error>;
54 type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F>;
55 type Dial = AndThenFuture<T::Dial, C, F>;
56
57 fn listen_on(
58 &mut self,
59 id: ListenerId,
60 addr: Multiaddr,
61 ) -> Result<(), TransportError<Self::Error>> {
62 self.transport
63 .listen_on(id, addr)
64 .map_err(|err| err.map(Either::Left))
65 }
66
67 fn remove_listener(&mut self, id: ListenerId) -> bool {
68 self.transport.remove_listener(id)
69 }
70
71 fn dial(
72 &mut self,
73 addr: Multiaddr,
74 opts: DialOpts,
75 ) -> Result<Self::Dial, TransportError<Self::Error>> {
76 let dialed_fut = self
77 .transport
78 .dial(addr.clone(), opts)
79 .map_err(|err| err.map(Either::Left))?;
80 let future = AndThenFuture {
81 inner: Either::Left(Box::pin(dialed_fut)),
82 args: Some((
83 self.fun.clone(),
84 ConnectedPoint::Dialer {
85 address: addr,
86 role_override: opts.role,
87 port_use: opts.port_use,
88 },
89 )),
90 _marker: PhantomPinned,
91 };
92 Ok(future)
93 }
94
95 fn poll(
96 self: Pin<&mut Self>,
97 cx: &mut Context<'_>,
98 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
99 let this = self.project();
100 match this.transport.poll(cx) {
101 Poll::Ready(TransportEvent::Incoming {
102 listener_id,
103 upgrade,
104 local_addr,
105 send_back_addr,
106 }) => {
107 let point = ConnectedPoint::Listener {
108 local_addr: local_addr.clone(),
109 send_back_addr: send_back_addr.clone(),
110 };
111 Poll::Ready(TransportEvent::Incoming {
112 listener_id,
113 upgrade: AndThenFuture {
114 inner: Either::Left(Box::pin(upgrade)),
115 args: Some((this.fun.clone(), point)),
116 _marker: PhantomPinned,
117 },
118 local_addr,
119 send_back_addr,
120 })
121 }
122 Poll::Ready(other) => {
123 let mapped = other
124 .map_upgrade(|_upgrade| unreachable!("case already matched"))
125 .map_err(Either::Left);
126 Poll::Ready(mapped)
127 }
128 Poll::Pending => Poll::Pending,
129 }
130 }
131}
132
133#[derive(Debug)]
137pub struct AndThenFuture<TFut, TMap, TMapOut> {
138 inner: Either<Pin<Box<TFut>>, Pin<Box<TMapOut>>>,
139 args: Option<(TMap, ConnectedPoint)>,
140 _marker: PhantomPinned,
141}
142
143impl<TFut, TMap, TMapOut> Future for AndThenFuture<TFut, TMap, TMapOut>
144where
145 TFut: TryFuture,
146 TMap: FnOnce(TFut::Ok, ConnectedPoint) -> TMapOut,
147 TMapOut: TryFuture,
148{
149 type Output = Result<TMapOut::Ok, Either<TFut::Error, TMapOut::Error>>;
150
151 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152 loop {
153 let future = match &mut self.inner {
154 Either::Left(future) => {
155 let item = match TryFuture::try_poll(future.as_mut(), cx) {
156 Poll::Ready(Ok(v)) => v,
157 Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Left(err))),
158 Poll::Pending => return Poll::Pending,
159 };
160 let (f, a) = self
161 .args
162 .take()
163 .expect("AndThenFuture has already finished.");
164 f(item, a)
165 }
166 Either::Right(future) => {
167 return match TryFuture::try_poll(future.as_mut(), cx) {
168 Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
169 Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Right(err))),
170 Poll::Pending => Poll::Pending,
171 }
172 }
173 };
174
175 self.inner = Either::Right(Box::pin(future));
176 }
177 }
178}
179
180impl<TFut, TMap, TMapOut> Unpin for AndThenFuture<TFut, TMap, TMapOut> {}