1use crate::behaviour::FromSwarm;
22use crate::connection::ConnectionId;
23use crate::handler::{
24 AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
25 FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol,
26};
27use crate::upgrade::SendWrapper;
28use crate::{
29 ConnectionDenied, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
30};
31use either::Either;
32use futures::future;
33use libp2p_core::transport::PortUse;
34use libp2p_core::{upgrade::DeniedUpgrade, Endpoint, Multiaddr};
35use libp2p_identity::PeerId;
36use std::{task::Context, task::Poll};
37
/// Wrapper around an optional behaviour.
///
/// The toggle counts as "enabled" while it holds a behaviour and as "disabled" while it holds
/// none. It is built from an `Option<TBehaviour>` through its `From` implementation.
pub struct Toggle<TBehaviour> {
    /// The wrapped behaviour; `None` means the toggle is disabled.
    inner: Option<TBehaviour>,
}
44
45impl<TBehaviour> Toggle<TBehaviour> {
46 pub fn is_enabled(&self) -> bool {
48 self.inner.is_some()
49 }
50
51 pub fn as_ref(&self) -> Option<&TBehaviour> {
53 self.inner.as_ref()
54 }
55
56 pub fn as_mut(&mut self) -> Option<&mut TBehaviour> {
58 self.inner.as_mut()
59 }
60}
61
62impl<TBehaviour> From<Option<TBehaviour>> for Toggle<TBehaviour> {
63 fn from(inner: Option<TBehaviour>) -> Self {
64 Toggle { inner }
65 }
66}
67
68impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
69where
70 TBehaviour: NetworkBehaviour,
71{
72 type ConnectionHandler = ToggleConnectionHandler<THandler<TBehaviour>>;
73 type ToSwarm = TBehaviour::ToSwarm;
74
75 fn handle_pending_inbound_connection(
76 &mut self,
77 connection_id: ConnectionId,
78 local_addr: &Multiaddr,
79 remote_addr: &Multiaddr,
80 ) -> Result<(), ConnectionDenied> {
81 let inner = match self.inner.as_mut() {
82 None => return Ok(()),
83 Some(inner) => inner,
84 };
85
86 inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
87
88 Ok(())
89 }
90
91 fn handle_established_inbound_connection(
92 &mut self,
93 connection_id: ConnectionId,
94 peer: PeerId,
95 local_addr: &Multiaddr,
96 remote_addr: &Multiaddr,
97 ) -> Result<THandler<Self>, ConnectionDenied> {
98 let inner = match self.inner.as_mut() {
99 None => return Ok(ToggleConnectionHandler { inner: None }),
100 Some(inner) => inner,
101 };
102
103 let handler = inner.handle_established_inbound_connection(
104 connection_id,
105 peer,
106 local_addr,
107 remote_addr,
108 )?;
109
110 Ok(ToggleConnectionHandler {
111 inner: Some(handler),
112 })
113 }
114
115 fn handle_pending_outbound_connection(
116 &mut self,
117 connection_id: ConnectionId,
118 maybe_peer: Option<PeerId>,
119 addresses: &[Multiaddr],
120 effective_role: Endpoint,
121 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
122 let inner = match self.inner.as_mut() {
123 None => return Ok(vec![]),
124 Some(inner) => inner,
125 };
126
127 let addresses = inner.handle_pending_outbound_connection(
128 connection_id,
129 maybe_peer,
130 addresses,
131 effective_role,
132 )?;
133
134 Ok(addresses)
135 }
136
137 fn handle_established_outbound_connection(
138 &mut self,
139 connection_id: ConnectionId,
140 peer: PeerId,
141 addr: &Multiaddr,
142 role_override: Endpoint,
143 port_use: PortUse,
144 ) -> Result<THandler<Self>, ConnectionDenied> {
145 let inner = match self.inner.as_mut() {
146 None => return Ok(ToggleConnectionHandler { inner: None }),
147 Some(inner) => inner,
148 };
149
150 let handler = inner.handle_established_outbound_connection(
151 connection_id,
152 peer,
153 addr,
154 role_override,
155 port_use,
156 )?;
157
158 Ok(ToggleConnectionHandler {
159 inner: Some(handler),
160 })
161 }
162
163 fn on_swarm_event(&mut self, event: FromSwarm) {
164 if let Some(behaviour) = &mut self.inner {
165 behaviour.on_swarm_event(event);
166 }
167 }
168
169 fn on_connection_handler_event(
170 &mut self,
171 peer_id: PeerId,
172 connection_id: ConnectionId,
173 event: THandlerOutEvent<Self>,
174 ) {
175 if let Some(behaviour) = &mut self.inner {
176 behaviour.on_connection_handler_event(peer_id, connection_id, event)
177 }
178 }
179
180 fn poll(
181 &mut self,
182 cx: &mut Context<'_>,
183 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
184 if let Some(inner) = self.inner.as_mut() {
185 inner.poll(cx)
186 } else {
187 Poll::Pending
188 }
189 }
190}
191
/// Connection handler wrapper that may or may not contain an inner handler.
///
/// It holds `Some(handler)` when it wraps a real handler and `None` when it is disabled.
pub struct ToggleConnectionHandler<TInner> {
    /// The wrapped handler; `None` means this handler is disabled.
    inner: Option<TInner>,
}
196
197impl<TInner> ToggleConnectionHandler<TInner>
198where
199 TInner: ConnectionHandler,
200{
201 fn on_fully_negotiated_inbound(
202 &mut self,
203 FullyNegotiatedInbound {
204 protocol: out,
205 info,
206 }: FullyNegotiatedInbound<
207 <Self as ConnectionHandler>::InboundProtocol,
208 <Self as ConnectionHandler>::InboundOpenInfo,
209 >,
210 ) {
211 let out = match out {
212 future::Either::Left(out) => out,
213 future::Either::Right(v) => void::unreachable(v),
214 };
215
216 if let Either::Left(info) = info {
217 self.inner
218 .as_mut()
219 .expect("Can't receive an inbound substream if disabled; QED")
220 .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
221 FullyNegotiatedInbound {
222 protocol: out,
223 info,
224 },
225 ));
226 } else {
227 panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
228 }
229 }
230
231 fn on_listen_upgrade_error(
232 &mut self,
233 ListenUpgradeError { info, error: err }: ListenUpgradeError<
234 <Self as ConnectionHandler>::InboundOpenInfo,
235 <Self as ConnectionHandler>::InboundProtocol,
236 >,
237 ) {
238 let (inner, info) = match (self.inner.as_mut(), info) {
239 (Some(inner), Either::Left(info)) => (inner, info),
240 (None, Either::Right(())) => return,
242 (Some(_), Either::Right(())) => panic!(
243 "Unexpected `Either::Right` inbound info through \
244 `on_listen_upgrade_error` in enabled state.",
245 ),
246 (None, Either::Left(_)) => panic!(
247 "Unexpected `Either::Left` inbound info through \
248 `on_listen_upgrade_error` in disabled state.",
249 ),
250 };
251
252 let err = match err {
253 Either::Left(e) => e,
254 Either::Right(v) => void::unreachable(v),
255 };
256
257 inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
258 info,
259 error: err,
260 }));
261 }
262}
263
264impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
265where
266 TInner: ConnectionHandler,
267{
268 type FromBehaviour = TInner::FromBehaviour;
269 type ToBehaviour = TInner::ToBehaviour;
270 type InboundProtocol = Either<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
271 type OutboundProtocol = TInner::OutboundProtocol;
272 type OutboundOpenInfo = TInner::OutboundOpenInfo;
273 type InboundOpenInfo = Either<TInner::InboundOpenInfo, ()>;
274
275 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
276 if let Some(inner) = self.inner.as_ref() {
277 inner
278 .listen_protocol()
279 .map_upgrade(|u| Either::Left(SendWrapper(u)))
280 .map_info(Either::Left)
281 } else {
282 SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(()))
283 }
284 }
285
286 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
287 self.inner
288 .as_mut()
289 .expect("Can't receive events if disabled; QED")
290 .on_behaviour_event(event)
291 }
292
293 fn connection_keep_alive(&self) -> bool {
294 self.inner
295 .as_ref()
296 .map(|h| h.connection_keep_alive())
297 .unwrap_or(false)
298 }
299
300 fn poll(
301 &mut self,
302 cx: &mut Context<'_>,
303 ) -> Poll<
304 ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
305 > {
306 if let Some(inner) = self.inner.as_mut() {
307 inner.poll(cx)
308 } else {
309 Poll::Pending
310 }
311 }
312
313 fn on_connection_event(
314 &mut self,
315 event: ConnectionEvent<
316 Self::InboundProtocol,
317 Self::OutboundProtocol,
318 Self::InboundOpenInfo,
319 Self::OutboundOpenInfo,
320 >,
321 ) {
322 match event {
323 ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
324 self.on_fully_negotiated_inbound(fully_negotiated_inbound)
325 }
326 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
327 protocol: out,
328 info,
329 }) => self
330 .inner
331 .as_mut()
332 .expect("Can't receive an outbound substream if disabled; QED")
333 .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
334 FullyNegotiatedOutbound {
335 protocol: out,
336 info,
337 },
338 )),
339 ConnectionEvent::AddressChange(address_change) => {
340 if let Some(inner) = self.inner.as_mut() {
341 inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
342 new_address: address_change.new_address,
343 }));
344 }
345 }
346 ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
347 .inner
348 .as_mut()
349 .expect("Can't receive an outbound substream if disabled; QED")
350 .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
351 info,
352 error: err,
353 })),
354 ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
355 self.on_listen_upgrade_error(listen_upgrade_error)
356 }
357 ConnectionEvent::LocalProtocolsChange(change) => {
358 if let Some(inner) = self.inner.as_mut() {
359 inner.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
360 }
361 }
362 ConnectionEvent::RemoteProtocolsChange(change) => {
363 if let Some(inner) = self.inner.as_mut() {
364 inner.on_connection_event(ConnectionEvent::RemoteProtocolsChange(change));
365 }
366 }
367 }
368 }
369
370 fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
371 let Some(inner) = self.inner.as_mut() else {
372 return Poll::Ready(None);
373 };
374
375 inner.poll_close(cx)
376 }
377}