libp2p/builder/phase/
tcp.rs

1use super::*;
2use crate::SwarmBuilder;
3#[cfg(all(
4    not(target_arch = "wasm32"),
5    any(feature = "tcp", feature = "websocket")
6))]
7use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
8#[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
9use libp2p_core::Transport;
10#[cfg(all(
11    not(target_arch = "wasm32"),
12    any(feature = "tcp", feature = "websocket")
13))]
14use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
15use std::marker::PhantomData;
16
17pub struct TcpPhase {}
18
19macro_rules! impl_tcp_builder {
20    ($providerKebabCase:literal, $providerPascalCase:ty, $path:ident) => {
21        #[cfg(all(
22            not(target_arch = "wasm32"),
23            feature = "tcp",
24            feature = $providerKebabCase,
25        ))]
26        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
27            /// Adds a TCP based transport.
28            ///
29            /// Note that both `security_upgrade` and `multiplexer_upgrade` take function pointers,
30            /// i.e. they take the function themselves (without the invocation via `()`), not the
31            /// result of the function invocation. See example below.
32            ///
33            /// ``` rust
34            /// # use libp2p::SwarmBuilder;
35            /// # use std::error::Error;
36            /// # async fn build_swarm() -> Result<(), Box<dyn Error>> {
37            /// let swarm = SwarmBuilder::with_new_identity()
38            ///     .with_tokio()
39            ///     .with_tcp(
40            ///         Default::default(),
41            ///         (libp2p_tls::Config::new, libp2p_noise::Config::new),
42            ///         libp2p_yamux::Config::default,
43            ///     )?
44            /// # ;
45            /// # Ok(())
46            /// # }
47            /// ```
48            pub fn with_tcp<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
49                self,
50                tcp_config: libp2p_tcp::Config,
51                security_upgrade: SecUpgrade,
52                multiplexer_upgrade: MuxUpgrade,
53            ) -> Result<
54                SwarmBuilder<$providerPascalCase, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
55            SecUpgrade::Error,
56            >
57            where
58                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
59                SecError: std::error::Error + Send + Sync + 'static,
60                SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>,
61                SecUpgrade::Upgrade: InboundUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
62                <SecUpgrade::Upgrade as InboundUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
63                <SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
64                <<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
65                <<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::Info: Send,
66
67                MuxStream: StreamMuxer + Send + 'static,
68                MuxStream::Substream: Send + 'static,
69                MuxStream::Error: Send + Sync + 'static,
70                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
71                MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
72                <MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
73                <MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
74                MuxError: std::error::Error + Send + Sync + 'static,
75                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
76                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
77            {
78                Ok(SwarmBuilder {
79                    phase: QuicPhase {
80                        transport: libp2p_tcp::$path::Transport::new(tcp_config)
81                            .upgrade(libp2p_core::upgrade::Version::V1Lazy)
82                            .authenticate(
83                                security_upgrade.into_security_upgrade(&self.keypair)?,
84                            )
85                            .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
86                            .map(|(p, c), _| (p, StreamMuxerBox::new(c))),
87                    },
88                    keypair: self.keypair,
89                    phantom: PhantomData,
90                })
91            }
92        }
93    };
94}
95
96impl_tcp_builder!("async-std", super::provider::AsyncStd, async_io);
97impl_tcp_builder!("tokio", super::provider::Tokio, tokio);
98
99impl<Provider> SwarmBuilder<Provider, TcpPhase> {
100    pub(crate) fn without_tcp(
101        self,
102    ) -> SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>> {
103        SwarmBuilder {
104            keypair: self.keypair,
105            phantom: PhantomData,
106            phase: QuicPhase {
107                transport: libp2p_core::transport::dummy::DummyTransport::new(),
108            },
109        }
110    }
111}
112
113// Shortcuts
114#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "async-std"))]
115impl SwarmBuilder<super::provider::AsyncStd, TcpPhase> {
116    pub fn with_quic(
117        self,
118    ) -> SwarmBuilder<
119        super::provider::AsyncStd,
120        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
121    > {
122        self.without_tcp().with_quic()
123    }
124}
125#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
126impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
127    pub fn with_quic(
128        self,
129    ) -> SwarmBuilder<
130        super::provider::Tokio,
131        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
132    > {
133        self.without_tcp().with_quic()
134    }
135}
136impl<Provider> SwarmBuilder<Provider, TcpPhase> {
137    pub fn with_other_transport<
138        Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static,
139        OtherTransport: Transport<Output = (libp2p_identity::PeerId, Muxer)> + Send + Unpin + 'static,
140        R: TryIntoTransport<OtherTransport>,
141    >(
142        self,
143        constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
144    ) -> Result<
145        SwarmBuilder<Provider, OtherTransportPhase<impl AuthenticatedMultiplexedTransport>>,
146        R::Error,
147    >
148    where
149        <OtherTransport as Transport>::Error: Send + Sync + 'static,
150        <OtherTransport as Transport>::Dial: Send,
151        <OtherTransport as Transport>::ListenerUpgrade: Send,
152        <Muxer as libp2p_core::muxing::StreamMuxer>::Substream: Send,
153        <Muxer as libp2p_core::muxing::StreamMuxer>::Error: Send + Sync,
154    {
155        self.without_tcp()
156            .without_quic()
157            .with_other_transport(constructor)
158    }
159}
160macro_rules! impl_tcp_phase_with_websocket {
161    ($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
162        #[cfg(all(feature = $providerKebabCase, not(target_arch = "wasm32"), feature = "websocket"))]
163        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
164            /// See [`SwarmBuilder::with_websocket`].
165            pub async fn with_websocket <
166                SecUpgrade,
167                SecStream,
168                SecError,
169                MuxUpgrade,
170                MuxStream,
171                MuxError,
172            > (
173                self,
174                security_upgrade: SecUpgrade,
175                multiplexer_upgrade: MuxUpgrade,
176            ) -> Result<
177                    SwarmBuilder<
178                        $providerPascalCase,
179                        RelayPhase<impl AuthenticatedMultiplexedTransport>,
180                    >,
181                    WebsocketError<SecUpgrade::Error>,
182                >
183            where
184                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
185                SecError: std::error::Error + Send + Sync + 'static,
186                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
187                SecUpgrade::Upgrade: InboundUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
188            <SecUpgrade::Upgrade as InboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
189            <SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
190            <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
191            <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,
192
193                MuxStream: StreamMuxer + Send + 'static,
194                MuxStream::Substream: Send + 'static,
195                MuxStream::Error: Send + Sync + 'static,
196                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
197                MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
198                <MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
199                <MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
200                    MuxError: std::error::Error + Send + Sync + 'static,
201                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
202                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
203            {
204                self.without_tcp()
205                    .without_quic()
206                    .without_any_other_transports()
207                    .without_dns()
208                    .with_websocket(security_upgrade, multiplexer_upgrade)
209                    .await
210            }
211        }
212    }
213}
214impl_tcp_phase_with_websocket!(
215    "async-std",
216    super::provider::AsyncStd,
217    rw_stream_sink::RwStreamSink<
218        libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
219    >
220);
221impl_tcp_phase_with_websocket!(
222    "tokio",
223    super::provider::Tokio,
224    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
225);