libp2p_core/transport/
boxed.rs1use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};
22use futures::{prelude::*, stream::FusedStream};
23use multiaddr::Multiaddr;
24use std::{
25 error::Error,
26 fmt, io,
27 pin::Pin,
28 task::{Context, Poll},
29};
30
31pub(crate) fn boxed<T>(transport: T) -> Boxed<T::Output>
33where
34 T: Transport + Send + Unpin + 'static,
35 T::Error: Send + Sync,
36 T::Dial: Send + 'static,
37 T::ListenerUpgrade: Send + 'static,
38{
39 Boxed {
40 inner: Box::new(transport) as Box<_>,
41 }
42}
43
/// A transport whose concrete type has been erased: it is stored as a boxed
/// `Abstract<O>` trait object, so only the output type `O` remains visible.
44pub struct Boxed<O> {
    // The wrapped transport, reachable only through the `Abstract<O>` methods.
48 inner: Box<dyn Abstract<O> + Send + Unpin>,
49}
50
/// Heap-allocated, pinned, `Send` future produced when dialing; it resolves to
/// the output `O` or an `io::Error`.
type Dial<O> = Pin<Box<dyn Future<Output = Result<O, io::Error>> + Send + 'static>>;
/// Heap-allocated, pinned, `Send` future for a listener upgrade; it resolves to
/// the output `O` or an `io::Error`.
type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = Result<O, io::Error>> + Send + 'static>>;
53
/// Private, type-erased counterpart of `Transport` used as `dyn Abstract<O>`
/// inside `Boxed`: errors are fixed to `io::Error` and the dial / upgrade
/// futures are the boxed `Dial<O>` / `ListenerUpgrade<O>` aliases.
54trait Abstract<O> {
    /// Counterpart of `Transport::listen_on`, with the error reported as `io::Error`.
55 fn listen_on(
56 &mut self,
57 id: ListenerId,
58 addr: Multiaddr,
59 ) -> Result<(), TransportError<io::Error>>;
    /// Counterpart of `Transport::remove_listener`; the `bool` result is passed through.
60 fn remove_listener(&mut self, id: ListenerId) -> bool;
    /// Counterpart of `Transport::dial`, returning the dial future boxed as `Dial<O>`.
61 fn dial(
62 &mut self,
63 addr: Multiaddr,
64 opts: DialOpts,
65 ) -> Result<Dial<O>, TransportError<io::Error>>;
    /// Counterpart of `Transport::poll`, with upgrades boxed as `ListenerUpgrade<O>`.
66 fn poll(
67 self: Pin<&mut Self>,
68 cx: &mut Context<'_>,
69 ) -> Poll<TransportEvent<ListenerUpgrade<O>, io::Error>>;
70}
71
72impl<T, O> Abstract<O> for T
73where
74 T: Transport<Output = O> + 'static,
75 T::Error: Send + Sync,
76 T::Dial: Send + 'static,
77 T::ListenerUpgrade: Send + 'static,
78{
79 fn listen_on(
80 &mut self,
81 id: ListenerId,
82 addr: Multiaddr,
83 ) -> Result<(), TransportError<io::Error>> {
84 Transport::listen_on(self, id, addr).map_err(|e| e.map(box_err))
85 }
86
87 fn remove_listener(&mut self, id: ListenerId) -> bool {
88 Transport::remove_listener(self, id)
89 }
90
91 fn dial(
92 &mut self,
93 addr: Multiaddr,
94 opts: DialOpts,
95 ) -> Result<Dial<O>, TransportError<io::Error>> {
96 let fut = Transport::dial(self, addr, opts)
97 .map(|r| r.map_err(box_err))
98 .map_err(|e| e.map(box_err))?;
99 Ok(Box::pin(fut) as Dial<_>)
100 }
101
102 fn poll(
103 self: Pin<&mut Self>,
104 cx: &mut Context<'_>,
105 ) -> Poll<TransportEvent<ListenerUpgrade<O>, io::Error>> {
106 self.poll(cx).map(|event| {
107 event
108 .map_upgrade(|upgrade| {
109 let up = upgrade.map_err(box_err);
110 Box::pin(up) as ListenerUpgrade<O>
111 })
112 .map_err(box_err)
113 })
114 }
115}
116
117impl<O> fmt::Debug for Boxed<O> {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 write!(f, "BoxedTransport")
120 }
121}
122
123impl<O> Transport for Boxed<O> {
124 type Output = O;
125 type Error = io::Error;
126 type ListenerUpgrade = ListenerUpgrade<O>;
127 type Dial = Dial<O>;
128
129 fn listen_on(
130 &mut self,
131 id: ListenerId,
132 addr: Multiaddr,
133 ) -> Result<(), TransportError<Self::Error>> {
134 self.inner.listen_on(id, addr)
135 }
136
137 fn remove_listener(&mut self, id: ListenerId) -> bool {
138 self.inner.remove_listener(id)
139 }
140
141 fn dial(
142 &mut self,
143 addr: Multiaddr,
144 opts: DialOpts,
145 ) -> Result<Self::Dial, TransportError<Self::Error>> {
146 self.inner.dial(addr, opts)
147 }
148
149 fn poll(
150 mut self: Pin<&mut Self>,
151 cx: &mut Context<'_>,
152 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
153 Pin::new(self.inner.as_mut()).poll(cx)
154 }
155}
156
157impl<O> Stream for Boxed<O> {
158 type Item = TransportEvent<ListenerUpgrade<O>, io::Error>;
159
160 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161 Transport::poll(self, cx).map(Some)
162 }
163}
164
/// `Boxed` never reports termination: its `Stream` implementation wraps every
/// ready event in `Some`, so `is_terminated` is unconditionally `false`.
165impl<O> FusedStream for Boxed<O> {
166 fn is_terminated(&self) -> bool {
167 false
168 }
169}
170
/// Wraps an arbitrary error in an [`io::Error`] of kind
/// [`io::ErrorKind::Other`], keeping the original error as its payload.
fn box_err<E>(e: E) -> io::Error
where
    E: Error + Send + Sync + 'static,
{
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, e)
}