// libp2p/builder/phase/websocket.rs

1use super::*;
2use crate::SwarmBuilder;
3#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
4use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
5#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
6use libp2p_core::Transport;
7#[cfg(any(
8    all(not(target_arch = "wasm32"), feature = "websocket"),
9    feature = "relay"
10))]
11use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
12#[cfg(any(
13    all(not(target_arch = "wasm32"), feature = "websocket"),
14    feature = "relay"
15))]
16use libp2p_identity::PeerId;
17use std::marker::PhantomData;
18
/// Builder phase in which a websocket transport may be added.
///
/// Holds the transport assembled by the preceding phases so that it can either
/// be combined with a websocket transport or handed on unchanged.
pub struct WebsocketPhase<T> {
    /// Transport accumulated so far.
    pub(crate) transport: T,
}
22
// Generates `with_websocket` for one runtime provider.
//
// Arguments, in order:
//   1. the cargo feature name of the provider (string literal, used in `cfg`),
//   2. the provider marker type,
//   3. an expression yielding a future that resolves to the DNS-over-TCP transport,
//   4. the stream type the security upgrade is applied to.
macro_rules! impl_websocket_builder {
    ($provider_feature:literal, $provider:ty, $dns_tcp:expr, $ws_stream:ty) => {
        /// Adds a websocket client transport.
        ///
        /// The websocket transport is layered on top of a DNS-over-TCP transport,
        /// authenticated with `security_upgrade`, multiplexed with
        /// `multiplexer_upgrade`, and then combined with the transport configured
        /// in the previous phases.
        ///
        /// Both `security_upgrade` and `multiplexer_upgrade` are function pointers:
        /// pass the function itself (no trailing `()`), not the value it returns.
        /// See the example below.
        ///
        /// # Errors
        ///
        /// Returns a [`WebsocketError`] if the security upgrade cannot be built
        /// from the keypair or if constructing the DNS transport fails.
        ///
        /// # Example
        ///
        /// ```rust
        /// # use libp2p::SwarmBuilder;
        /// # use std::error::Error;
        /// # async fn build_swarm() -> Result<(), Box<dyn Error>> {
        /// let swarm = SwarmBuilder::with_new_identity()
        ///     .with_tokio()
        ///     .with_websocket(
        ///         (libp2p_tls::Config::new, libp2p_noise::Config::new),
        ///         libp2p_yamux::Config::default,
        ///     )
        ///     .await?
        /// # ;
        /// # Ok(())
        /// # }
        /// ```
        #[cfg(all(not(target_arch = "wasm32"), feature = $provider_feature, feature = "websocket"))]
        impl<T> SwarmBuilder<$provider, WebsocketPhase<T>> {
            pub async fn with_websocket<
                SecUpgrade,
                SecStream,
                SecError,
                MuxUpgrade,
                MuxStream,
                MuxError,
            >(
                self,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<$provider, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
                WebsocketError<SecUpgrade::Error>,
            >
            where
                T: AuthenticatedMultiplexedTransport,

                // Security (authentication) upgrade applied to the raw websocket stream.
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<$ws_stream>,
                SecUpgrade::Upgrade: InboundUpgrade<
                        Negotiated<$ws_stream>,
                        Output = (PeerId, SecStream),
                        Error = SecError,
                    > + OutboundUpgrade<
                        Negotiated<$ws_stream>,
                        Output = (PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundUpgrade<Negotiated<$ws_stream>>>::Future: Send,
                <SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<$ws_stream>>>::Future: Send,
                <<<SecUpgrade as IntoSecurityUpgrade<$ws_stream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<SecUpgrade as IntoSecurityUpgrade<$ws_stream>>::Upgrade as UpgradeInfo>::Info: Send,

                // Multiplexer upgrade applied to the authenticated stream.
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
                <MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
                MuxError: std::error::Error + Send + Sync + 'static,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
            {
                // Build the security upgrade first so a keypair-related failure is
                // reported before the DNS transport is constructed.
                let authenticator = security_upgrade
                    .into_security_upgrade(&self.keypair)
                    .map_err(WebsocketErrorInner::SecurityUpgrade)?;

                let dns_tcp_transport = $dns_tcp.await.map_err(WebsocketErrorInner::Dns)?;

                let websocket_transport = libp2p_websocket::WsConfig::new(dns_tcp_transport)
                    .upgrade(libp2p_core::upgrade::Version::V1Lazy)
                    .authenticate(authenticator)
                    .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
                    // Erase the concrete muxer type.
                    .map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)));

                // Try the websocket transport first, then the previously configured one.
                let transport = websocket_transport
                    .or_transport(self.phase.transport)
                    .map(|either, _| either.into_inner());

                Ok(SwarmBuilder {
                    keypair: self.keypair,
                    phantom: PhantomData,
                    phase: RelayPhase { transport },
                })
            }
        }
    };
}
114
// `with_websocket` for the async-std provider.
impl_websocket_builder!(
    // Cargo feature that gates this impl.
    "async-std",
    // Provider marker type.
    super::provider::AsyncStd,
    // Expression awaited by the macro to obtain the DNS-over-TCP transport; a failure is
    // reported as the `Dns` error variant.
    libp2p_dns::async_std::Transport::system(libp2p_tcp::async_io::Transport::new(
        libp2p_tcp::Config::default(),
    )),
    // Stream type the security upgrade is negotiated on.
    rw_stream_sink::RwStreamSink<
        libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
    >
);
// `with_websocket` for the tokio provider.
impl_websocket_builder!(
    // Cargo feature that gates this impl.
    "tokio",
    // Provider marker type.
    super::provider::Tokio,
    // The macro always `.await`s this expression. The tokio DNS transport does not need an
    // await, so it is wrapped in an already-completed future purely to stay consistent with
    // the async-std invocation above.
    futures::future::ready(libp2p_dns::tokio::Transport::system(
        libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default())
    )),
    // Stream type the security upgrade is negotiated on.
    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
);
135
136impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
137    pub(crate) fn without_websocket(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
138        SwarmBuilder {
139            keypair: self.keypair,
140            phantom: PhantomData,
141            phase: RelayPhase {
142                transport: self.phase.transport,
143            },
144        }
145    }
146}
147
148// Shortcuts
#[cfg(feature = "relay")]
impl<T: AuthenticatedMultiplexedTransport, Provider> SwarmBuilder<Provider, WebsocketPhase<T>> {
    /// Shortcut that skips the websocket phase and configures a relay client.
    ///
    /// Both arguments are forwarded untouched to the relay phase; see
    /// [`SwarmBuilder::with_relay_client`] for their meaning and for the errors
    /// that can be returned.
    pub fn with_relay_client<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
        self,
        security_upgrade: SecUpgrade,
        multiplexer_upgrade: MuxUpgrade,
    ) -> Result<
        SwarmBuilder<
            Provider,
            BandwidthLoggingPhase<
                impl AuthenticatedMultiplexedTransport,
                libp2p_relay::client::Behaviour,
            >,
        >,
        SecUpgrade::Error,
    >
    where
        // Security (authentication) upgrade applied to the relayed connection.
        SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
        SecError: std::error::Error + Send + Sync + 'static,
        SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
        SecUpgrade::Upgrade: InboundUpgrade<
                Negotiated<libp2p_relay::client::Connection>,
                Output = (PeerId, SecStream),
                Error = SecError,
            > + OutboundUpgrade<
                Negotiated<libp2p_relay::client::Connection>,
                Output = (PeerId, SecStream),
                Error = SecError,
            > + Clone
            + Send
            + 'static,
        <SecUpgrade::Upgrade as InboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
        <SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
        <<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
        <<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,

        // Multiplexer upgrade applied to the authenticated stream.
        MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
        MuxStream::Substream: Send + 'static,
        MuxStream::Error: Send + Sync + 'static,
        MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
        MuxUpgrade::Upgrade: InboundUpgrade<
                Negotiated<SecStream>,
                Output = MuxStream,
                Error = MuxError,
            > + OutboundUpgrade<
                Negotiated<SecStream>,
                Output = MuxStream,
                Error = MuxError,
            > + Clone
            + Send
            + 'static,
        <MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
        <MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
        MuxError: std::error::Error + Send + Sync + 'static,
        <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
        <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
    {
        let relay_phase = self.without_websocket();
        relay_phase.with_relay_client(security_upgrade, multiplexer_upgrade)
    }
}
188impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
189    pub fn with_behaviour<B, R: TryIntoBehaviour<B>>(
190        self,
191        constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
192    ) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
193        self.without_websocket()
194            .without_relay()
195            .without_bandwidth_logging()
196            .with_behaviour(constructor)
197    }
198}
199
/// Error returned by `with_websocket`.
///
/// Opaque wrapper around the private [`WebsocketErrorInner`]; its `Display`
/// and `source` are forwarded to the wrapped value (`#[error(transparent)]`).
/// `Sec` is the error type of the security upgrade.
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct WebsocketError<Sec>(#[from] WebsocketErrorInner<Sec>);
204
/// Failure cases behind [`WebsocketError`], kept private to this module.
///
/// `Sec` is the error type of the security upgrade.
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
#[derive(Debug, thiserror::Error)]
enum WebsocketErrorInner<Sec> {
    /// Building the security upgrade failed.
    #[error("SecurityUpgrade")]
    SecurityUpgrade(Sec),

    /// An I/O error, used for failures while setting up the DNS transport.
    #[cfg(feature = "dns")]
    #[error("Dns")]
    Dns(#[from] std::io::Error),
}