libp2p_swarm/behaviour/
toggle.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::behaviour::FromSwarm;
22use crate::connection::ConnectionId;
23use crate::handler::{
24    AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
25    FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol,
26};
27use crate::upgrade::SendWrapper;
28use crate::{
29    ConnectionDenied, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
30};
31use either::Either;
32use futures::future;
33use libp2p_core::transport::PortUse;
34use libp2p_core::{upgrade::DeniedUpgrade, Endpoint, Multiaddr};
35use libp2p_identity::PeerId;
36use std::{task::Context, task::Poll};
37
/// Implementation of `NetworkBehaviour` that can be either in the disabled or enabled state.
///
/// The wrapper holds an `Option<TBehaviour>`: `Some` means enabled, `None` means disabled.
/// A `Toggle` is created from an `Option<TBehaviour>` via its `From` implementation.
///
/// The state can only be chosen at initialization.
#[derive(Debug)]
pub struct Toggle<TBehaviour> {
    /// The wrapped behaviour, or `None` when the `Toggle` is disabled.
    inner: Option<TBehaviour>,
}
44
45impl<TBehaviour> Toggle<TBehaviour> {
46    /// Returns `true` if `Toggle` is enabled and `false` if it's disabled.
47    pub fn is_enabled(&self) -> bool {
48        self.inner.is_some()
49    }
50
51    /// Returns a reference to the inner `NetworkBehaviour`.
52    pub fn as_ref(&self) -> Option<&TBehaviour> {
53        self.inner.as_ref()
54    }
55
56    /// Returns a mutable reference to the inner `NetworkBehaviour`.
57    pub fn as_mut(&mut self) -> Option<&mut TBehaviour> {
58        self.inner.as_mut()
59    }
60}
61
62impl<TBehaviour> From<Option<TBehaviour>> for Toggle<TBehaviour> {
63    fn from(inner: Option<TBehaviour>) -> Self {
64        Toggle { inner }
65    }
66}
67
68impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
69where
70    TBehaviour: NetworkBehaviour,
71{
72    type ConnectionHandler = ToggleConnectionHandler<THandler<TBehaviour>>;
73    type ToSwarm = TBehaviour::ToSwarm;
74
75    fn handle_pending_inbound_connection(
76        &mut self,
77        connection_id: ConnectionId,
78        local_addr: &Multiaddr,
79        remote_addr: &Multiaddr,
80    ) -> Result<(), ConnectionDenied> {
81        let inner = match self.inner.as_mut() {
82            None => return Ok(()),
83            Some(inner) => inner,
84        };
85
86        inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
87
88        Ok(())
89    }
90
91    fn handle_established_inbound_connection(
92        &mut self,
93        connection_id: ConnectionId,
94        peer: PeerId,
95        local_addr: &Multiaddr,
96        remote_addr: &Multiaddr,
97    ) -> Result<THandler<Self>, ConnectionDenied> {
98        let inner = match self.inner.as_mut() {
99            None => return Ok(ToggleConnectionHandler { inner: None }),
100            Some(inner) => inner,
101        };
102
103        let handler = inner.handle_established_inbound_connection(
104            connection_id,
105            peer,
106            local_addr,
107            remote_addr,
108        )?;
109
110        Ok(ToggleConnectionHandler {
111            inner: Some(handler),
112        })
113    }
114
115    fn handle_pending_outbound_connection(
116        &mut self,
117        connection_id: ConnectionId,
118        maybe_peer: Option<PeerId>,
119        addresses: &[Multiaddr],
120        effective_role: Endpoint,
121    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
122        let inner = match self.inner.as_mut() {
123            None => return Ok(vec![]),
124            Some(inner) => inner,
125        };
126
127        let addresses = inner.handle_pending_outbound_connection(
128            connection_id,
129            maybe_peer,
130            addresses,
131            effective_role,
132        )?;
133
134        Ok(addresses)
135    }
136
137    fn handle_established_outbound_connection(
138        &mut self,
139        connection_id: ConnectionId,
140        peer: PeerId,
141        addr: &Multiaddr,
142        role_override: Endpoint,
143        port_use: PortUse,
144    ) -> Result<THandler<Self>, ConnectionDenied> {
145        let inner = match self.inner.as_mut() {
146            None => return Ok(ToggleConnectionHandler { inner: None }),
147            Some(inner) => inner,
148        };
149
150        let handler = inner.handle_established_outbound_connection(
151            connection_id,
152            peer,
153            addr,
154            role_override,
155            port_use,
156        )?;
157
158        Ok(ToggleConnectionHandler {
159            inner: Some(handler),
160        })
161    }
162
163    fn on_swarm_event(&mut self, event: FromSwarm) {
164        if let Some(behaviour) = &mut self.inner {
165            behaviour.on_swarm_event(event);
166        }
167    }
168
169    fn on_connection_handler_event(
170        &mut self,
171        peer_id: PeerId,
172        connection_id: ConnectionId,
173        event: THandlerOutEvent<Self>,
174    ) {
175        if let Some(behaviour) = &mut self.inner {
176            behaviour.on_connection_handler_event(peer_id, connection_id, event)
177        }
178    }
179
180    fn poll(
181        &mut self,
182        cx: &mut Context<'_>,
183    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
184        if let Some(inner) = self.inner.as_mut() {
185            inner.poll(cx)
186        } else {
187            Poll::Pending
188        }
189    }
190}
191
/// Implementation of [`ConnectionHandler`] that can be in the disabled state.
///
/// The wrapper holds an `Option<TInner>`: `Some` means the handler is enabled and calls are
/// forwarded to it, `None` means the handler is disabled.
#[derive(Debug)]
pub struct ToggleConnectionHandler<TInner> {
    /// The wrapped handler, or `None` when disabled.
    inner: Option<TInner>,
}
196
197impl<TInner> ToggleConnectionHandler<TInner>
198where
199    TInner: ConnectionHandler,
200{
201    fn on_fully_negotiated_inbound(
202        &mut self,
203        FullyNegotiatedInbound {
204            protocol: out,
205            info,
206        }: FullyNegotiatedInbound<
207            <Self as ConnectionHandler>::InboundProtocol,
208            <Self as ConnectionHandler>::InboundOpenInfo,
209        >,
210    ) {
211        let out = match out {
212            future::Either::Left(out) => out,
213            future::Either::Right(v) => void::unreachable(v),
214        };
215
216        if let Either::Left(info) = info {
217            self.inner
218                .as_mut()
219                .expect("Can't receive an inbound substream if disabled; QED")
220                .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
221                    FullyNegotiatedInbound {
222                        protocol: out,
223                        info,
224                    },
225                ));
226        } else {
227            panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
228        }
229    }
230
231    fn on_listen_upgrade_error(
232        &mut self,
233        ListenUpgradeError { info, error: err }: ListenUpgradeError<
234            <Self as ConnectionHandler>::InboundOpenInfo,
235            <Self as ConnectionHandler>::InboundProtocol,
236        >,
237    ) {
238        let (inner, info) = match (self.inner.as_mut(), info) {
239            (Some(inner), Either::Left(info)) => (inner, info),
240            // Ignore listen upgrade errors in disabled state.
241            (None, Either::Right(())) => return,
242            (Some(_), Either::Right(())) => panic!(
243                "Unexpected `Either::Right` inbound info through \
244                 `on_listen_upgrade_error` in enabled state.",
245            ),
246            (None, Either::Left(_)) => panic!(
247                "Unexpected `Either::Left` inbound info through \
248                 `on_listen_upgrade_error` in disabled state.",
249            ),
250        };
251
252        let err = match err {
253            Either::Left(e) => e,
254            Either::Right(v) => void::unreachable(v),
255        };
256
257        inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
258            info,
259            error: err,
260        }));
261    }
262}
263
264impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
265where
266    TInner: ConnectionHandler,
267{
268    type FromBehaviour = TInner::FromBehaviour;
269    type ToBehaviour = TInner::ToBehaviour;
270    type InboundProtocol = Either<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
271    type OutboundProtocol = TInner::OutboundProtocol;
272    type OutboundOpenInfo = TInner::OutboundOpenInfo;
273    type InboundOpenInfo = Either<TInner::InboundOpenInfo, ()>;
274
275    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
276        if let Some(inner) = self.inner.as_ref() {
277            inner
278                .listen_protocol()
279                .map_upgrade(|u| Either::Left(SendWrapper(u)))
280                .map_info(Either::Left)
281        } else {
282            SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(()))
283        }
284    }
285
286    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
287        self.inner
288            .as_mut()
289            .expect("Can't receive events if disabled; QED")
290            .on_behaviour_event(event)
291    }
292
293    fn connection_keep_alive(&self) -> bool {
294        self.inner
295            .as_ref()
296            .map(|h| h.connection_keep_alive())
297            .unwrap_or(false)
298    }
299
300    fn poll(
301        &mut self,
302        cx: &mut Context<'_>,
303    ) -> Poll<
304        ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
305    > {
306        if let Some(inner) = self.inner.as_mut() {
307            inner.poll(cx)
308        } else {
309            Poll::Pending
310        }
311    }
312
313    fn on_connection_event(
314        &mut self,
315        event: ConnectionEvent<
316            Self::InboundProtocol,
317            Self::OutboundProtocol,
318            Self::InboundOpenInfo,
319            Self::OutboundOpenInfo,
320        >,
321    ) {
322        match event {
323            ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
324                self.on_fully_negotiated_inbound(fully_negotiated_inbound)
325            }
326            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
327                protocol: out,
328                info,
329            }) => self
330                .inner
331                .as_mut()
332                .expect("Can't receive an outbound substream if disabled; QED")
333                .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
334                    FullyNegotiatedOutbound {
335                        protocol: out,
336                        info,
337                    },
338                )),
339            ConnectionEvent::AddressChange(address_change) => {
340                if let Some(inner) = self.inner.as_mut() {
341                    inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
342                        new_address: address_change.new_address,
343                    }));
344                }
345            }
346            ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
347                .inner
348                .as_mut()
349                .expect("Can't receive an outbound substream if disabled; QED")
350                .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
351                    info,
352                    error: err,
353                })),
354            ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
355                self.on_listen_upgrade_error(listen_upgrade_error)
356            }
357            ConnectionEvent::LocalProtocolsChange(change) => {
358                if let Some(inner) = self.inner.as_mut() {
359                    inner.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
360                }
361            }
362            ConnectionEvent::RemoteProtocolsChange(change) => {
363                if let Some(inner) = self.inner.as_mut() {
364                    inner.on_connection_event(ConnectionEvent::RemoteProtocolsChange(change));
365                }
366            }
367        }
368    }
369
370    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
371        let Some(inner) = self.inner.as_mut() else {
372            return Poll::Ready(None);
373        };
374
375        inner.poll_close(cx)
376    }
377}