1use super::*;
2use crate::SwarmBuilder;
3#[cfg(all(
4 not(target_arch = "wasm32"),
5 any(feature = "tcp", feature = "websocket")
6))]
7use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
8#[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
9use libp2p_core::Transport;
10#[cfg(all(
11 not(target_arch = "wasm32"),
12 any(feature = "tcp", feature = "websocket")
13))]
14use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
15use std::marker::PhantomData;
16
/// Marker type for the TCP step of the [`SwarmBuilder`] phase sequence.
///
/// The methods implemented on `SwarmBuilder<_, TcpPhase>` in this file either
/// add a TCP transport (`with_tcp`) or skip ahead to a later phase.
pub struct TcpPhase {}
18
// Generates `SwarmBuilder::<$providerPascalCase, TcpPhase>::with_tcp` for one runtime
// provider.
//
// * `$providerKebabCase` - cargo feature that gates the generated impl.
// * `$providerPascalCase` - provider marker type used as the builder's first parameter.
// * `$path` - `libp2p_tcp` submodule supplying the provider-specific `Transport` and
//   `TcpStream`.
macro_rules! impl_tcp_builder {
    ($providerKebabCase:literal, $providerPascalCase:ty, $path:ident) => {
        #[cfg(all(
            not(target_arch = "wasm32"),
            feature = "tcp",
            feature = $providerKebabCase,
        ))]
        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
            /// Adds a TCP transport and advances the builder to the QUIC phase.
            ///
            /// The transport is created from `tcp_config`, upgraded with
            /// `Version::V1Lazy`, authenticated with the upgrade obtained from
            /// `security_upgrade` (for the builder's keypair) and multiplexed with the
            /// upgrade obtained from `multiplexer_upgrade`. The negotiated muxer is
            /// wrapped in a `StreamMuxerBox`.
            ///
            /// # Errors
            ///
            /// Returns `SecUpgrade::Error` when
            /// `security_upgrade.into_security_upgrade(..)` fails.
            pub fn with_tcp<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
                self,
                tcp_config: libp2p_tcp::Config,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<$providerPascalCase, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
                SecUpgrade::Error,
            >
            where
                // Bounds on the security layer.
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>,
                SecUpgrade::Upgrade: InboundUpgrade<
                        Negotiated<libp2p_tcp::$path::TcpStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + OutboundUpgrade<
                        Negotiated<libp2p_tcp::$path::TcpStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
                <SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
                <<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::Info: Send,

                // Bounds on the multiplexing layer, which runs on top of `SecStream`.
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
                <MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
                MuxError: std::error::Error + Send + Sync + 'static,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
            {
                // Start the upgrade pipeline on the raw TCP transport.
                let upgrade_builder = libp2p_tcp::$path::Transport::new(tcp_config)
                    .upgrade(libp2p_core::upgrade::Version::V1Lazy);

                // The security upgrade is derived from the keypair and is the only
                // fallible step; its error is returned to the caller unchanged.
                let security = security_upgrade.into_security_upgrade(&self.keypair)?;
                let authenticated = upgrade_builder.authenticate(security);

                let multiplexer = multiplexer_upgrade.into_multiplexer_upgrade();
                let transport = authenticated
                    .multiplex(multiplexer)
                    // Erase the concrete muxer type.
                    .map(|(peer_id, muxer), _endpoint| (peer_id, StreamMuxerBox::new(muxer)));

                Ok(SwarmBuilder {
                    keypair: self.keypair,
                    phantom: PhantomData,
                    phase: QuicPhase { transport },
                })
            }
        }
    };
}

impl_tcp_builder!("async-std", super::provider::AsyncStd, async_io);
impl_tcp_builder!("tokio", super::provider::Tokio, tokio);
98
99impl<Provider> SwarmBuilder<Provider, TcpPhase> {
100 pub(crate) fn without_tcp(
101 self,
102 ) -> SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>> {
103 SwarmBuilder {
104 keypair: self.keypair,
105 phantom: PhantomData,
106 phase: QuicPhase {
107 transport: libp2p_core::transport::dummy::DummyTransport::new(),
108 },
109 }
110 }
111}
112
#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "async-std"))]
impl SwarmBuilder<super::provider::AsyncStd, TcpPhase> {
    /// Shortcut: skips the TCP step via `without_tcp` and then calls `with_quic`
    /// on the resulting QUIC-phase builder.
    pub fn with_quic(
        self,
    ) -> SwarmBuilder<
        super::provider::AsyncStd,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let quic_phase = self.without_tcp();
        quic_phase.with_quic()
    }
}
#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
    /// Shortcut: skips the TCP step via `without_tcp` and then calls `with_quic`
    /// on the resulting QUIC-phase builder.
    pub fn with_quic(
        self,
    ) -> SwarmBuilder<
        super::provider::Tokio,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let quic_phase = self.without_tcp();
        quic_phase.with_quic()
    }
}
136impl<Provider> SwarmBuilder<Provider, TcpPhase> {
137 pub fn with_other_transport<
138 Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static,
139 OtherTransport: Transport<Output = (libp2p_identity::PeerId, Muxer)> + Send + Unpin + 'static,
140 R: TryIntoTransport<OtherTransport>,
141 >(
142 self,
143 constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
144 ) -> Result<
145 SwarmBuilder<Provider, OtherTransportPhase<impl AuthenticatedMultiplexedTransport>>,
146 R::Error,
147 >
148 where
149 <OtherTransport as Transport>::Error: Send + Sync + 'static,
150 <OtherTransport as Transport>::Dial: Send,
151 <OtherTransport as Transport>::ListenerUpgrade: Send,
152 <Muxer as libp2p_core::muxing::StreamMuxer>::Substream: Send,
153 <Muxer as libp2p_core::muxing::StreamMuxer>::Error: Send + Sync,
154 {
155 self.without_tcp()
156 .without_quic()
157 .with_other_transport(constructor)
158 }
159}
// Generates the `with_websocket` shortcut on `SwarmBuilder<$providerPascalCase, TcpPhase>`.
//
// * `$providerKebabCase` - cargo feature that gates the generated impl.
// * `$providerPascalCase` - provider marker type used as the builder's first parameter.
// * `$websocketStream` - stream type over which the security upgrade is negotiated.
macro_rules! impl_tcp_phase_with_websocket {
    ($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
        #[cfg(all(
            feature = $providerKebabCase,
            not(target_arch = "wasm32"),
            feature = "websocket"
        ))]
        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
            /// Shortcut: skips the TCP, QUIC, other-transport and DNS steps, then
            /// awaits `with_websocket` on the resulting builder with the given
            /// security and multiplexer upgrades.
            ///
            /// # Errors
            ///
            /// Returns the `WebsocketError<SecUpgrade::Error>` produced by the
            /// forwarded `with_websocket` call.
            pub async fn with_websocket<
                SecUpgrade,
                SecStream,
                SecError,
                MuxUpgrade,
                MuxStream,
                MuxError,
            >(
                self,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<
                    $providerPascalCase,
                    RelayPhase<impl AuthenticatedMultiplexedTransport>,
                >,
                WebsocketError<SecUpgrade::Error>,
            >
            where
                // Bounds on the security layer.
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
                SecUpgrade::Upgrade: InboundUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + OutboundUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
                <SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
                <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,

                // Bounds on the multiplexing layer, which runs on top of `SecStream`.
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
                <MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
                MuxError: std::error::Error + Send + Sync + 'static,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
            {
                // Walk past every phase that precedes the websocket step.
                let websocket_ready = self
                    .without_tcp()
                    .without_quic()
                    .without_any_other_transports()
                    .without_dns();

                websocket_ready
                    .with_websocket(security_upgrade, multiplexer_upgrade)
                    .await
            }
        }
    }
}
impl_tcp_phase_with_websocket!(
    "async-std",
    super::provider::AsyncStd,
    rw_stream_sink::RwStreamSink<
        libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
    >
);
impl_tcp_phase_with_websocket!(
    "tokio",
    super::provider::Tokio,
    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
);