litep2p/transport/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Transport protocol implementations provided by [`Litep2p`](`crate::Litep2p`).
22
23use crate::{error::DialError, transport::manager::TransportHandle, types::ConnectionId, PeerId};
24
25use futures::{future::BoxFuture, Stream};
26use hickory_resolver::TokioResolver;
27use multiaddr::Multiaddr;
28
29use std::{fmt::Debug, sync::Arc, time::Duration};
30
31pub(crate) mod common;
32#[cfg(feature = "quic")]
33pub mod quic;
34pub mod tcp;
35#[cfg(feature = "webrtc")]
36pub mod webrtc;
37#[cfg(feature = "websocket")]
38pub mod websocket;
39
40#[cfg(test)]
41pub(crate) mod dummy;
42
43pub(crate) mod manager;
44
45pub use manager::limits::{ConnectionLimitsConfig, ConnectionLimitsError};
46
/// Timeout for opening a connection (10 seconds).
pub(crate) const CONNECTION_OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// Timeout for opening a substream (5 seconds).
pub(crate) const SUBSTREAM_OPEN_TIMEOUT: Duration = Duration::from_secs(5);

/// Timeout for a connection that is waiting for new substreams (5 seconds).
pub(crate) const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(5);

/// Maximum number of parallel dial attempts.
pub(crate) const MAX_PARALLEL_DIALS: usize = 8;

/// Multiplier applied to `connection_open_timeout` to derive the overall dial deadline.
///
/// When dialing multiple addresses concurrently, the total time allowed is
/// `DIAL_DEADLINE_MULTIPLIER * connection_open_timeout`. This gives enough time
/// to cycle through addresses without stalling indefinitely.
///
/// NOTE(review): `connection_open_timeout` is presumably the configured per-connection
/// timeout (defaulting to [`CONNECTION_OPEN_TIMEOUT`]) — confirm against the call sites.
pub(crate) const DIAL_DEADLINE_MULTIPLIER: u32 = 2;
65
/// Connection endpoint.
///
/// Describes the local role in a connection (dialer for outbound, listener for
/// inbound) together with the address and [`ConnectionId`] associated with it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint {
    /// Successfully established outbound connection.
    Dialer {
        /// Address that was dialed.
        address: Multiaddr,

        /// Connection ID.
        connection_id: ConnectionId,
    },

    /// Successfully established inbound connection.
    Listener {
        /// Local connection address.
        address: Multiaddr,

        /// Connection ID.
        connection_id: ConnectionId,
    },
}
87
88impl Endpoint {
89    /// Get `Multiaddr` of the [`Endpoint`].
90    pub fn address(&self) -> &Multiaddr {
91        match self {
92            Self::Dialer { address, .. } => address,
93            Self::Listener { address, .. } => address,
94        }
95    }
96
97    /// Crate dialer.
98    pub(crate) fn dialer(address: Multiaddr, connection_id: ConnectionId) -> Self {
99        Endpoint::Dialer {
100            address,
101            connection_id,
102        }
103    }
104
105    /// Create listener.
106    pub(crate) fn listener(address: Multiaddr, connection_id: ConnectionId) -> Self {
107        Endpoint::Listener {
108            address,
109            connection_id,
110        }
111    }
112
113    /// Get `ConnectionId` of the `Endpoint`.
114    pub fn connection_id(&self) -> ConnectionId {
115        match self {
116            Self::Dialer { connection_id, .. } => *connection_id,
117            Self::Listener { connection_id, .. } => *connection_id,
118        }
119    }
120
121    /// Is this a listener endpoint?
122    pub fn is_listener(&self) -> bool {
123        std::matches!(self, Self::Listener { .. })
124    }
125}
126
/// Transport event.
///
/// Events reported by a transport about the lifecycle of its connections:
/// pending inbound connections, opened/established/closed connections and
/// dial/open failures.
#[derive(Debug)]
pub(crate) enum TransportEvent {
    /// Fully negotiated connection established to remote peer.
    ConnectionEstablished {
        /// Peer ID.
        peer: PeerId,

        /// Endpoint.
        endpoint: Endpoint,
    },

    /// Inbound connection is pending.
    ///
    /// NOTE(review): presumably resolved via `Transport::accept_pending()` or
    /// `Transport::reject_pending()` — confirm against the transport manager.
    PendingInboundConnection {
        /// Connection ID.
        connection_id: ConnectionId,
    },

    /// Connection opened to remote but not yet negotiated.
    ConnectionOpened {
        /// Connection ID.
        connection_id: ConnectionId,

        /// Address that was dialed.
        address: Multiaddr,

        /// Errors from unsuccessful dial attempts.
        errors: Vec<(Multiaddr, DialError)>,
    },

    /// Connection closed to remote peer.
    // NOTE(review): the `unused` allowance suggests this variant or its fields are not
    // used in every build configuration — confirm before removing the attribute.
    #[allow(unused)]
    ConnectionClosed {
        /// Peer ID.
        peer: PeerId,

        /// Connection ID.
        connection_id: ConnectionId,
    },

    /// Failed to dial remote peer.
    DialFailure {
        /// Connection ID.
        connection_id: ConnectionId,

        /// Dialed address.
        address: Multiaddr,

        /// Error.
        error: DialError,
    },

    /// Open failure for an unnegotiated set of connections.
    OpenFailure {
        /// Connection ID.
        connection_id: ConnectionId,

        /// Errors, one `(address, error)` pair per failed attempt.
        errors: Vec<(Multiaddr, DialError)>,
    },
}
187
/// Constructor interface for transports.
///
/// Ties a transport's configuration type to the [`Transport`] implementation it is
/// associated with and defines how a transport object is created.
pub(crate) trait TransportBuilder {
    /// Transport-specific configuration passed to [`TransportBuilder::new()`].
    type Config: Debug;

    /// The [`Transport`] implementation associated with this builder.
    type Transport: Transport;

    /// Create new [`Transport`] object.
    ///
    /// Takes the transport's handle (`context`), its configuration and a shared DNS
    /// resolver, and returns the constructed object together with a list of addresses.
    ///
    /// NOTE(review): the returned `Vec<Multiaddr>` is presumably the set of listen
    /// addresses the transport bound to — confirm against the implementations.
    fn new(
        context: TransportHandle,
        config: Self::Config,
        resolver: Arc<TokioResolver>,
    ) -> crate::Result<(Self, Vec<Multiaddr>)>
    where
        Self: Sized;
}
201
/// Interface every transport implements.
///
/// A transport is driven through the methods below and is itself a [`Stream`];
/// the stream's item type is not constrained by this trait.
///
/// NOTE(review): implementations presumably yield [`TransportEvent`]s from the
/// stream — confirm against the transport manager.
pub(crate) trait Transport: Stream + Unpin + Send {
    /// Dial `address` and negotiate connection.
    fn dial(&mut self, connection_id: ConnectionId, address: Multiaddr) -> crate::Result<()>;

    /// Accept negotiated connection.
    ///
    /// Returns a future that completes when the connection has been fully established
    /// and all installed protocols have been notified via their event channels.
    /// This ensures that by the time the caller receives a ConnectionEstablished event,
    /// protocols are ready to handle substream operations.
    fn accept(
        &mut self,
        connection_id: ConnectionId,
    ) -> crate::Result<BoxFuture<'static, crate::Result<()>>>;

    /// Accept pending connection.
    fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()>;

    /// Reject pending connection.
    fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()>;

    /// Reject negotiated connection.
    fn reject(&mut self, connection_id: ConnectionId) -> crate::Result<()>;

    /// Attempt to open connection to remote peer over one or more addresses.
    ///
    /// Unlike [`Transport::dial()`], this only opens the connection; negotiation is
    /// started separately with [`Transport::negotiate()`].
    fn open(&mut self, connection_id: ConnectionId, addresses: Vec<Multiaddr>)
        -> crate::Result<()>;

    /// Negotiate opened connection.
    fn negotiate(&mut self, connection_id: ConnectionId) -> crate::Result<()>;

    /// Cancel opening connections.
    ///
    /// This is a no-op for connections that have already succeeded or been canceled.
    fn cancel(&mut self, connection_id: ConnectionId);
}