1pub use crate::upgrade::Version;
24
25use crate::{
26 connection::ConnectedPoint,
27 muxing::{StreamMuxer, StreamMuxerBox},
28 transport::{
29 and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerId, Transport,
30 TransportError, TransportEvent,
31 },
32 upgrade::{
33 self, apply_inbound, apply_outbound, InboundConnectionUpgrade, InboundUpgradeApply,
34 OutboundConnectionUpgrade, OutboundUpgradeApply, UpgradeError,
35 },
36 Negotiated,
37};
38use futures::{prelude::*, ready};
39use libp2p_identity::PeerId;
40use multiaddr::Multiaddr;
41use std::{
42 error::Error,
43 fmt,
44 pin::Pin,
45 task::{Context, Poll},
46 time::Duration,
47};
48
49#[derive(Clone)]
69pub struct Builder<T> {
70 inner: T,
71 version: upgrade::Version,
72}
73
74impl<T> Builder<T>
75where
76 T: Transport,
77 T::Error: 'static,
78{
79 pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
81 Builder { inner, version }
82 }
83
84 pub fn authenticate<C, D, U, E>(
97 self,
98 upgrade: U,
99 ) -> Authenticated<AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>>
100 where
101 T: Transport<Output = C>,
102 C: AsyncRead + AsyncWrite + Unpin,
103 D: AsyncRead + AsyncWrite + Unpin,
104 U: InboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
105 U: OutboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
106 E: Error + 'static,
107 {
108 let version = self.version;
109 Authenticated(Builder::new(
110 self.inner.and_then(move |conn, endpoint| Authenticate {
111 inner: upgrade::apply(conn, upgrade, endpoint, version),
112 }),
113 version,
114 ))
115 }
116}
117
118#[pin_project::pin_project]
123pub struct Authenticate<C, U>
124where
125 C: AsyncRead + AsyncWrite + Unpin,
126 U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
127{
128 #[pin]
129 inner: EitherUpgrade<C, U>,
130}
131
132impl<C, U> Future for Authenticate<C, U>
133where
134 C: AsyncRead + AsyncWrite + Unpin,
135 U: InboundConnectionUpgrade<Negotiated<C>>
136 + OutboundConnectionUpgrade<
137 Negotiated<C>,
138 Output = <U as InboundConnectionUpgrade<Negotiated<C>>>::Output,
139 Error = <U as InboundConnectionUpgrade<Negotiated<C>>>::Error,
140 >,
141{
142 type Output = <EitherUpgrade<C, U> as Future>::Output;
143
144 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
145 let this = self.project();
146 Future::poll(this.inner, cx)
147 }
148}
149
150#[pin_project::pin_project]
155pub struct Multiplex<C, U>
156where
157 C: AsyncRead + AsyncWrite + Unpin,
158 U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
159{
160 peer_id: Option<PeerId>,
161 #[pin]
162 upgrade: EitherUpgrade<C, U>,
163}
164
165impl<C, U, M, E> Future for Multiplex<C, U>
166where
167 C: AsyncRead + AsyncWrite + Unpin,
168 U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
169 U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
170{
171 type Output = Result<(PeerId, M), UpgradeError<E>>;
172
173 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
174 let this = self.project();
175 let m = match ready!(Future::poll(this.upgrade, cx)) {
176 Ok(m) => m,
177 Err(err) => return Poll::Ready(Err(err)),
178 };
179 let i = this
180 .peer_id
181 .take()
182 .expect("Multiplex future polled after completion.");
183 Poll::Ready(Ok((i, m)))
184 }
185}
186
187#[derive(Clone)]
189pub struct Authenticated<T>(Builder<T>);
190
191impl<T> Authenticated<T>
192where
193 T: Transport,
194 T::Error: 'static,
195{
196 pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
207 where
208 T: Transport<Output = (PeerId, C)>,
209 C: AsyncRead + AsyncWrite + Unpin,
210 D: AsyncRead + AsyncWrite + Unpin,
211 U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
212 U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
213 E: Error + 'static,
214 {
215 Authenticated(Builder::new(
216 Upgrade::new(self.0.inner, upgrade),
217 self.0.version,
218 ))
219 }
220
221 pub fn multiplex<C, M, U, E>(
232 self,
233 upgrade: U,
234 ) -> Multiplexed<AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>>
235 where
236 T: Transport<Output = (PeerId, C)>,
237 C: AsyncRead + AsyncWrite + Unpin,
238 M: StreamMuxer,
239 U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
240 U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
241 E: Error + 'static,
242 {
243 let version = self.0.version;
244 Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
245 let upgrade = upgrade::apply(c, upgrade, endpoint, version);
246 Multiplex {
247 peer_id: Some(i),
248 upgrade,
249 }
250 }))
251 }
252
253 pub fn multiplex_ext<C, M, U, E, F>(
265 self,
266 up: F,
267 ) -> Multiplexed<AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>>
268 where
269 T: Transport<Output = (PeerId, C)>,
270 C: AsyncRead + AsyncWrite + Unpin,
271 M: StreamMuxer,
272 U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
273 U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
274 E: Error + 'static,
275 F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone,
276 {
277 let version = self.0.version;
278 Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
279 let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
280 Multiplex {
281 peer_id: Some(peer_id),
282 upgrade,
283 }
284 }))
285 }
286}
287
288#[derive(Clone)]
291#[pin_project::pin_project]
292pub struct Multiplexed<T>(#[pin] T);
293
294impl<T> Multiplexed<T> {
295 pub fn boxed<M>(self) -> super::Boxed<(PeerId, StreamMuxerBox)>
298 where
299 T: Transport<Output = (PeerId, M)> + Sized + Send + Unpin + 'static,
300 T::Dial: Send + 'static,
301 T::ListenerUpgrade: Send + 'static,
302 T::Error: Send + Sync,
303 M: StreamMuxer + Send + 'static,
304 M::Substream: Send + 'static,
305 M::Error: Send + Sync + 'static,
306 {
307 boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
308 }
309
310 pub fn timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
313 Multiplexed(TransportTimeout::new(self.0, timeout))
314 }
315
316 pub fn outbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
319 Multiplexed(TransportTimeout::with_outgoing_timeout(self.0, timeout))
320 }
321
322 pub fn inbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
325 Multiplexed(TransportTimeout::with_ingoing_timeout(self.0, timeout))
326 }
327}
328
329impl<T> Transport for Multiplexed<T>
330where
331 T: Transport,
332{
333 type Output = T::Output;
334 type Error = T::Error;
335 type ListenerUpgrade = T::ListenerUpgrade;
336 type Dial = T::Dial;
337
338 fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
339 self.0.dial(addr)
340 }
341
342 fn remove_listener(&mut self, id: ListenerId) -> bool {
343 self.0.remove_listener(id)
344 }
345
346 fn dial_as_listener(
347 &mut self,
348 addr: Multiaddr,
349 ) -> Result<Self::Dial, TransportError<Self::Error>> {
350 self.0.dial_as_listener(addr)
351 }
352
353 fn listen_on(
354 &mut self,
355 id: ListenerId,
356 addr: Multiaddr,
357 ) -> Result<(), TransportError<Self::Error>> {
358 self.0.listen_on(id, addr)
359 }
360
361 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
362 self.0.address_translation(server, observed)
363 }
364
365 fn poll(
366 self: Pin<&mut Self>,
367 cx: &mut Context<'_>,
368 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
369 self.project().0.poll(cx)
370 }
371}
372
373type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
375
376#[derive(Debug, Copy, Clone)]
380#[pin_project::pin_project]
381pub struct Upgrade<T, U> {
382 #[pin]
383 inner: T,
384 upgrade: U,
385}
386
387impl<T, U> Upgrade<T, U> {
388 pub fn new(inner: T, upgrade: U) -> Self {
389 Upgrade { inner, upgrade }
390 }
391}
392
393impl<T, C, D, U, E> Transport for Upgrade<T, U>
394where
395 T: Transport<Output = (PeerId, C)>,
396 T::Error: 'static,
397 C: AsyncRead + AsyncWrite + Unpin,
398 U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
399 U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
400 E: Error + 'static,
401{
402 type Output = (PeerId, D);
403 type Error = TransportUpgradeError<T::Error, E>;
404 type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, C>;
405 type Dial = DialUpgradeFuture<T::Dial, U, C>;
406
407 fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
408 let future = self
409 .inner
410 .dial(addr)
411 .map_err(|err| err.map(TransportUpgradeError::Transport))?;
412 Ok(DialUpgradeFuture {
413 future: Box::pin(future),
414 upgrade: future::Either::Left(Some(self.upgrade.clone())),
415 })
416 }
417
418 fn remove_listener(&mut self, id: ListenerId) -> bool {
419 self.inner.remove_listener(id)
420 }
421
422 fn dial_as_listener(
423 &mut self,
424 addr: Multiaddr,
425 ) -> Result<Self::Dial, TransportError<Self::Error>> {
426 let future = self
427 .inner
428 .dial_as_listener(addr)
429 .map_err(|err| err.map(TransportUpgradeError::Transport))?;
430 Ok(DialUpgradeFuture {
431 future: Box::pin(future),
432 upgrade: future::Either::Left(Some(self.upgrade.clone())),
433 })
434 }
435
436 fn listen_on(
437 &mut self,
438 id: ListenerId,
439 addr: Multiaddr,
440 ) -> Result<(), TransportError<Self::Error>> {
441 self.inner
442 .listen_on(id, addr)
443 .map_err(|err| err.map(TransportUpgradeError::Transport))
444 }
445
446 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
447 self.inner.address_translation(server, observed)
448 }
449
450 fn poll(
451 self: Pin<&mut Self>,
452 cx: &mut Context<'_>,
453 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
454 let this = self.project();
455 let upgrade = this.upgrade.clone();
456 this.inner.poll(cx).map(|event| {
457 event
458 .map_upgrade(move |future| ListenerUpgradeFuture {
459 future: Box::pin(future),
460 upgrade: future::Either::Left(Some(upgrade)),
461 })
462 .map_err(TransportUpgradeError::Transport)
463 })
464 }
465}
466
467#[derive(Debug)]
469pub enum TransportUpgradeError<T, U> {
470 Transport(T),
472 Upgrade(UpgradeError<U>),
474}
475
476impl<T, U> fmt::Display for TransportUpgradeError<T, U>
477where
478 T: fmt::Display,
479 U: fmt::Display,
480{
481 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
482 match self {
483 TransportUpgradeError::Transport(e) => write!(f, "Transport error: {e}"),
484 TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {e}"),
485 }
486 }
487}
488
489impl<T, U> Error for TransportUpgradeError<T, U>
490where
491 T: Error + 'static,
492 U: Error + 'static,
493{
494 fn source(&self) -> Option<&(dyn Error + 'static)> {
495 match self {
496 TransportUpgradeError::Transport(e) => Some(e),
497 TransportUpgradeError::Upgrade(e) => Some(e),
498 }
499 }
500}
501
502pub struct DialUpgradeFuture<F, U, C>
504where
505 U: OutboundConnectionUpgrade<Negotiated<C>>,
506 C: AsyncRead + AsyncWrite + Unpin,
507{
508 future: Pin<Box<F>>,
509 upgrade: future::Either<Option<U>, (PeerId, OutboundUpgradeApply<C, U>)>,
510}
511
512impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
513where
514 F: TryFuture<Ok = (PeerId, C)>,
515 C: AsyncRead + AsyncWrite + Unpin,
516 U: OutboundConnectionUpgrade<Negotiated<C>, Output = D>,
517 U::Error: Error,
518{
519 type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
520
521 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
522 let this = &mut *self;
525
526 loop {
527 this.upgrade = match this.upgrade {
528 future::Either::Left(ref mut up) => {
529 let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx)
530 .map_err(TransportUpgradeError::Transport))
531 {
532 Ok(v) => v,
533 Err(err) => return Poll::Ready(Err(err)),
534 };
535 let u = up
536 .take()
537 .expect("DialUpgradeFuture is constructed with Either::Left(Some).");
538 future::Either::Right((i, apply_outbound(c, u, upgrade::Version::V1)))
539 }
540 future::Either::Right((i, ref mut up)) => {
541 let d = match ready!(
542 Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)
543 ) {
544 Ok(d) => d,
545 Err(err) => return Poll::Ready(Err(err)),
546 };
547 return Poll::Ready(Ok((i, d)));
548 }
549 }
550 }
551 }
552}
553
554impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
555where
556 U: OutboundConnectionUpgrade<Negotiated<C>>,
557 C: AsyncRead + AsyncWrite + Unpin,
558{
559}
560
561pub struct ListenerUpgradeFuture<F, U, C>
563where
564 C: AsyncRead + AsyncWrite + Unpin,
565 U: InboundConnectionUpgrade<Negotiated<C>>,
566{
567 future: Pin<Box<F>>,
568 upgrade: future::Either<Option<U>, (PeerId, InboundUpgradeApply<C, U>)>,
569}
570
571impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
572where
573 F: TryFuture<Ok = (PeerId, C)>,
574 C: AsyncRead + AsyncWrite + Unpin,
575 U: InboundConnectionUpgrade<Negotiated<C>, Output = D>,
576 U::Error: Error,
577{
578 type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
579
580 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
581 let this = &mut *self;
584
585 loop {
586 this.upgrade = match this.upgrade {
587 future::Either::Left(ref mut up) => {
588 let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx)
589 .map_err(TransportUpgradeError::Transport))
590 {
591 Ok(v) => v,
592 Err(err) => return Poll::Ready(Err(err)),
593 };
594 let u = up
595 .take()
596 .expect("ListenerUpgradeFuture is constructed with Either::Left(Some).");
597 future::Either::Right((i, apply_inbound(c, u)))
598 }
599 future::Either::Right((i, ref mut up)) => {
600 let d = match ready!(TryFuture::try_poll(Pin::new(up), cx)
601 .map_err(TransportUpgradeError::Upgrade))
602 {
603 Ok(v) => v,
604 Err(err) => return Poll::Ready(Err(err)),
605 };
606 return Poll::Ready(Ok((i, d)));
607 }
608 }
609 }
610 }
611}
612
613impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
614where
615 C: AsyncRead + AsyncWrite + Unpin,
616 U: InboundConnectionUpgrade<Negotiated<C>>,
617{
618}