1use super::*;
2use crate::SwarmBuilder;
3#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
4use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
5#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
6use libp2p_core::Transport;
7#[cfg(any(
8 all(not(target_arch = "wasm32"), feature = "websocket"),
9 feature = "relay"
10))]
11use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
12#[cfg(any(
13 all(not(target_arch = "wasm32"), feature = "websocket"),
14 feature = "relay"
15))]
16use libp2p_identity::PeerId;
17use std::marker::PhantomData;
18
19pub struct WebsocketPhase<T> {
20 pub(crate) transport: T,
21}
22
23macro_rules! impl_websocket_builder {
24 ($providerKebabCase:literal, $providerPascalCase:ty, $dnsTcp:expr, $websocketStream:ty) => {
25 #[cfg(all(not(target_arch = "wasm32"), feature = $providerKebabCase, feature = "websocket"))]
47 impl<T> SwarmBuilder<$providerPascalCase, WebsocketPhase<T>> {
48 pub async fn with_websocket<
49 SecUpgrade,
50 SecStream,
51 SecError,
52 MuxUpgrade,
53 MuxStream,
54 MuxError,
55 >(
56 self,
57 security_upgrade: SecUpgrade,
58 multiplexer_upgrade: MuxUpgrade,
59 ) -> Result<
60 SwarmBuilder<
61 $providerPascalCase,
62 RelayPhase<impl AuthenticatedMultiplexedTransport>,
63 >,
64 WebsocketError<SecUpgrade::Error>,
65 >
66
67 where
68 T: AuthenticatedMultiplexedTransport,
69
70 SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
71 SecError: std::error::Error + Send + Sync + 'static,
72 SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
73 SecUpgrade::Upgrade: InboundUpgrade<Negotiated<$websocketStream>, Output = (PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<$websocketStream>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
74 <SecUpgrade::Upgrade as InboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
75 <SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
76 <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
77 <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,
78
79 MuxStream: StreamMuxer + Send + 'static,
80 MuxStream::Substream: Send + 'static,
81 MuxStream::Error: Send + Sync + 'static,
82 MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
83 MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
84 <MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
85 <MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
86 MuxError: std::error::Error + Send + Sync + 'static,
87 <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
88 <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
89
90 {
91 let security_upgrade = security_upgrade.into_security_upgrade(&self.keypair)
92 .map_err(WebsocketErrorInner::SecurityUpgrade)?;
93 let websocket_transport = libp2p_websocket::WsConfig::new(
94 $dnsTcp.await.map_err(WebsocketErrorInner::Dns)?,
95 )
96 .upgrade(libp2p_core::upgrade::Version::V1Lazy)
97 .authenticate(security_upgrade)
98 .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
99 .map(|(p, c), _| (p, StreamMuxerBox::new(c)));
100
101 Ok(SwarmBuilder {
102 keypair: self.keypair,
103 phantom: PhantomData,
104 phase: RelayPhase {
105 transport: websocket_transport
106 .or_transport(self.phase.transport)
107 .map(|either, _| either.into_inner()),
108 },
109 })
110 }
111 }
112 };
113}
114
115impl_websocket_builder!(
116 "async-std",
117 super::provider::AsyncStd,
118 libp2p_dns::async_std::Transport::system(libp2p_tcp::async_io::Transport::new(
119 libp2p_tcp::Config::default(),
120 )),
121 rw_stream_sink::RwStreamSink<
122 libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
123 >
124);
125impl_websocket_builder!(
126 "tokio",
127 super::provider::Tokio,
128 futures::future::ready(libp2p_dns::tokio::Transport::system(
131 libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default())
132 )),
133 rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
134);
135
136impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
137 pub(crate) fn without_websocket(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
138 SwarmBuilder {
139 keypair: self.keypair,
140 phantom: PhantomData,
141 phase: RelayPhase {
142 transport: self.phase.transport,
143 },
144 }
145 }
146}
147
148#[cfg(feature = "relay")]
150impl<T: AuthenticatedMultiplexedTransport, Provider> SwarmBuilder<Provider, WebsocketPhase<T>> {
151 pub fn with_relay_client<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
153 self,
154 security_upgrade: SecUpgrade,
155 multiplexer_upgrade: MuxUpgrade,
156 ) -> Result<
157 SwarmBuilder<
158 Provider,
159 BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
160 >,
161 SecUpgrade::Error,
162 > where
163
164 SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
165 SecError: std::error::Error + Send + Sync + 'static,
166 SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
167 SecUpgrade::Upgrade: InboundUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
168 <SecUpgrade::Upgrade as InboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
169 <SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
170 <<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
171 <<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,
172
173 MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
174 MuxStream::Substream: Send + 'static,
175 MuxStream::Error: Send + Sync + 'static,
176 MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
177 MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
178 <MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
179 <MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
180 MuxError: std::error::Error + Send + Sync + 'static,
181 <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
182 <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
183 {
184 self.without_websocket()
185 .with_relay_client(security_upgrade, multiplexer_upgrade)
186 }
187}
188impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
189 pub fn with_behaviour<B, R: TryIntoBehaviour<B>>(
190 self,
191 constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
192 ) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
193 self.without_websocket()
194 .without_relay()
195 .without_bandwidth_logging()
196 .with_behaviour(constructor)
197 }
198}
199
200#[derive(Debug, thiserror::Error)]
201#[error(transparent)]
202#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
203pub struct WebsocketError<Sec>(#[from] WebsocketErrorInner<Sec>);
204
205#[derive(Debug, thiserror::Error)]
206#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
207enum WebsocketErrorInner<Sec> {
208 #[error("SecurityUpgrade")]
209 SecurityUpgrade(Sec),
210 #[cfg(feature = "dns")]
211 #[error("Dns")]
212 Dns(#[from] std::io::Error),
213}