libp2p_swarm/behaviour/
toggle.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::behaviour::FromSwarm;
22use crate::connection::ConnectionId;
23use crate::handler::{
24    AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
25    FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError,
26    SubstreamProtocol,
27};
28use crate::upgrade::SendWrapper;
29use crate::{
30    ConnectionDenied, NetworkBehaviour, PollParameters, THandler, THandlerInEvent,
31    THandlerOutEvent, ToSwarm,
32};
33use either::Either;
34use futures::future;
35use libp2p_core::{upgrade::DeniedUpgrade, Endpoint, Multiaddr};
36use libp2p_identity::PeerId;
37use std::{task::Context, task::Poll};
38
39/// Implementation of `NetworkBehaviour` that can be either in the disabled or enabled state.
40///
41/// The state can only be chosen at initialization.
42pub struct Toggle<TBehaviour> {
43    inner: Option<TBehaviour>,
44}
45
46impl<TBehaviour> Toggle<TBehaviour> {
47    /// Returns `true` if `Toggle` is enabled and `false` if it's disabled.
48    pub fn is_enabled(&self) -> bool {
49        self.inner.is_some()
50    }
51
52    /// Returns a reference to the inner `NetworkBehaviour`.
53    pub fn as_ref(&self) -> Option<&TBehaviour> {
54        self.inner.as_ref()
55    }
56
57    /// Returns a mutable reference to the inner `NetworkBehaviour`.
58    pub fn as_mut(&mut self) -> Option<&mut TBehaviour> {
59        self.inner.as_mut()
60    }
61}
62
63impl<TBehaviour> From<Option<TBehaviour>> for Toggle<TBehaviour> {
64    fn from(inner: Option<TBehaviour>) -> Self {
65        Toggle { inner }
66    }
67}
68
69impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
70where
71    TBehaviour: NetworkBehaviour,
72{
73    type ConnectionHandler = ToggleConnectionHandler<THandler<TBehaviour>>;
74    type ToSwarm = TBehaviour::ToSwarm;
75
76    fn handle_pending_inbound_connection(
77        &mut self,
78        connection_id: ConnectionId,
79        local_addr: &Multiaddr,
80        remote_addr: &Multiaddr,
81    ) -> Result<(), ConnectionDenied> {
82        let inner = match self.inner.as_mut() {
83            None => return Ok(()),
84            Some(inner) => inner,
85        };
86
87        inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
88
89        Ok(())
90    }
91
92    fn handle_established_inbound_connection(
93        &mut self,
94        connection_id: ConnectionId,
95        peer: PeerId,
96        local_addr: &Multiaddr,
97        remote_addr: &Multiaddr,
98    ) -> Result<THandler<Self>, ConnectionDenied> {
99        let inner = match self.inner.as_mut() {
100            None => return Ok(ToggleConnectionHandler { inner: None }),
101            Some(inner) => inner,
102        };
103
104        let handler = inner.handle_established_inbound_connection(
105            connection_id,
106            peer,
107            local_addr,
108            remote_addr,
109        )?;
110
111        Ok(ToggleConnectionHandler {
112            inner: Some(handler),
113        })
114    }
115
116    fn handle_pending_outbound_connection(
117        &mut self,
118        connection_id: ConnectionId,
119        maybe_peer: Option<PeerId>,
120        addresses: &[Multiaddr],
121        effective_role: Endpoint,
122    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
123        let inner = match self.inner.as_mut() {
124            None => return Ok(vec![]),
125            Some(inner) => inner,
126        };
127
128        let addresses = inner.handle_pending_outbound_connection(
129            connection_id,
130            maybe_peer,
131            addresses,
132            effective_role,
133        )?;
134
135        Ok(addresses)
136    }
137
138    fn handle_established_outbound_connection(
139        &mut self,
140        connection_id: ConnectionId,
141        peer: PeerId,
142        addr: &Multiaddr,
143        role_override: Endpoint,
144    ) -> Result<THandler<Self>, ConnectionDenied> {
145        let inner = match self.inner.as_mut() {
146            None => return Ok(ToggleConnectionHandler { inner: None }),
147            Some(inner) => inner,
148        };
149
150        let handler = inner.handle_established_outbound_connection(
151            connection_id,
152            peer,
153            addr,
154            role_override,
155        )?;
156
157        Ok(ToggleConnectionHandler {
158            inner: Some(handler),
159        })
160    }
161
162    fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
163        if let Some(behaviour) = &mut self.inner {
164            if let Some(event) = event.maybe_map_handler(|h| h.inner) {
165                behaviour.on_swarm_event(event);
166            }
167        }
168    }
169
170    fn on_connection_handler_event(
171        &mut self,
172        peer_id: PeerId,
173        connection_id: ConnectionId,
174        event: THandlerOutEvent<Self>,
175    ) {
176        if let Some(behaviour) = &mut self.inner {
177            behaviour.on_connection_handler_event(peer_id, connection_id, event)
178        }
179    }
180
181    fn poll(
182        &mut self,
183        cx: &mut Context<'_>,
184        params: &mut impl PollParameters,
185    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
186        if let Some(inner) = self.inner.as_mut() {
187            inner.poll(cx, params)
188        } else {
189            Poll::Pending
190        }
191    }
192}
193
194/// Implementation of [`ConnectionHandler`] that can be in the disabled state.
195pub struct ToggleConnectionHandler<TInner> {
196    inner: Option<TInner>,
197}
198
199impl<TInner> ToggleConnectionHandler<TInner>
200where
201    TInner: ConnectionHandler,
202{
203    fn on_fully_negotiated_inbound(
204        &mut self,
205        FullyNegotiatedInbound {
206            protocol: out,
207            info,
208        }: FullyNegotiatedInbound<
209            <Self as ConnectionHandler>::InboundProtocol,
210            <Self as ConnectionHandler>::InboundOpenInfo,
211        >,
212    ) {
213        let out = match out {
214            future::Either::Left(out) => out,
215            future::Either::Right(v) => void::unreachable(v),
216        };
217
218        if let Either::Left(info) = info {
219            self.inner
220                .as_mut()
221                .expect("Can't receive an inbound substream if disabled; QED")
222                .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
223                    FullyNegotiatedInbound {
224                        protocol: out,
225                        info,
226                    },
227                ));
228        } else {
229            panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
230        }
231    }
232
233    fn on_listen_upgrade_error(
234        &mut self,
235        ListenUpgradeError { info, error: err }: ListenUpgradeError<
236            <Self as ConnectionHandler>::InboundOpenInfo,
237            <Self as ConnectionHandler>::InboundProtocol,
238        >,
239    ) {
240        let (inner, info) = match (self.inner.as_mut(), info) {
241            (Some(inner), Either::Left(info)) => (inner, info),
242            // Ignore listen upgrade errors in disabled state.
243            (None, Either::Right(())) => return,
244            (Some(_), Either::Right(())) => panic!(
245                "Unexpected `Either::Right` inbound info through \
246                 `on_listen_upgrade_error` in enabled state.",
247            ),
248            (None, Either::Left(_)) => panic!(
249                "Unexpected `Either::Left` inbound info through \
250                 `on_listen_upgrade_error` in disabled state.",
251            ),
252        };
253
254        let err = match err {
255            Either::Left(e) => e,
256            Either::Right(v) => void::unreachable(v),
257        };
258
259        inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
260            info,
261            error: err,
262        }));
263    }
264}
265
266impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
267where
268    TInner: ConnectionHandler,
269{
270    type FromBehaviour = TInner::FromBehaviour;
271    type ToBehaviour = TInner::ToBehaviour;
272    #[allow(deprecated)]
273    type Error = TInner::Error;
274    type InboundProtocol = Either<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
275    type OutboundProtocol = TInner::OutboundProtocol;
276    type OutboundOpenInfo = TInner::OutboundOpenInfo;
277    type InboundOpenInfo = Either<TInner::InboundOpenInfo, ()>;
278
279    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
280        if let Some(inner) = self.inner.as_ref() {
281            inner
282                .listen_protocol()
283                .map_upgrade(|u| Either::Left(SendWrapper(u)))
284                .map_info(Either::Left)
285        } else {
286            SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(()))
287        }
288    }
289
290    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
291        self.inner
292            .as_mut()
293            .expect("Can't receive events if disabled; QED")
294            .on_behaviour_event(event)
295    }
296
297    fn connection_keep_alive(&self) -> KeepAlive {
298        self.inner
299            .as_ref()
300            .map(|h| h.connection_keep_alive())
301            .unwrap_or(KeepAlive::No)
302    }
303
304    #[allow(deprecated)]
305    fn poll(
306        &mut self,
307        cx: &mut Context<'_>,
308    ) -> Poll<
309        ConnectionHandlerEvent<
310            Self::OutboundProtocol,
311            Self::OutboundOpenInfo,
312            Self::ToBehaviour,
313            Self::Error,
314        >,
315    > {
316        if let Some(inner) = self.inner.as_mut() {
317            inner.poll(cx)
318        } else {
319            Poll::Pending
320        }
321    }
322
323    fn on_connection_event(
324        &mut self,
325        event: ConnectionEvent<
326            Self::InboundProtocol,
327            Self::OutboundProtocol,
328            Self::InboundOpenInfo,
329            Self::OutboundOpenInfo,
330        >,
331    ) {
332        match event {
333            ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
334                self.on_fully_negotiated_inbound(fully_negotiated_inbound)
335            }
336            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
337                protocol: out,
338                info,
339            }) => self
340                .inner
341                .as_mut()
342                .expect("Can't receive an outbound substream if disabled; QED")
343                .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
344                    FullyNegotiatedOutbound {
345                        protocol: out,
346                        info,
347                    },
348                )),
349            ConnectionEvent::AddressChange(address_change) => {
350                if let Some(inner) = self.inner.as_mut() {
351                    inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
352                        new_address: address_change.new_address,
353                    }));
354                }
355            }
356            ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
357                .inner
358                .as_mut()
359                .expect("Can't receive an outbound substream if disabled; QED")
360                .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
361                    info,
362                    error: err,
363                })),
364            ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
365                self.on_listen_upgrade_error(listen_upgrade_error)
366            }
367            ConnectionEvent::LocalProtocolsChange(change) => {
368                if let Some(inner) = self.inner.as_mut() {
369                    inner.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
370                }
371            }
372            ConnectionEvent::RemoteProtocolsChange(change) => {
373                if let Some(inner) = self.inner.as_mut() {
374                    inner.on_connection_event(ConnectionEvent::RemoteProtocolsChange(change));
375                }
376            }
377        }
378    }
379}