1use crate::behaviour::FromSwarm;
22use crate::connection::ConnectionId;
23use crate::handler::{
24 AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
25 FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError,
26 SubstreamProtocol,
27};
28use crate::upgrade::SendWrapper;
29use crate::{
30 ConnectionDenied, NetworkBehaviour, PollParameters, THandler, THandlerInEvent,
31 THandlerOutEvent, ToSwarm,
32};
33use either::Either;
34use futures::future;
35use libp2p_core::{upgrade::DeniedUpgrade, Endpoint, Multiaddr};
36use libp2p_identity::PeerId;
37use std::{task::Context, task::Poll};
38
39pub struct Toggle<TBehaviour> {
43 inner: Option<TBehaviour>,
44}
45
46impl<TBehaviour> Toggle<TBehaviour> {
47 pub fn is_enabled(&self) -> bool {
49 self.inner.is_some()
50 }
51
52 pub fn as_ref(&self) -> Option<&TBehaviour> {
54 self.inner.as_ref()
55 }
56
57 pub fn as_mut(&mut self) -> Option<&mut TBehaviour> {
59 self.inner.as_mut()
60 }
61}
62
63impl<TBehaviour> From<Option<TBehaviour>> for Toggle<TBehaviour> {
64 fn from(inner: Option<TBehaviour>) -> Self {
65 Toggle { inner }
66 }
67}
68
69impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
70where
71 TBehaviour: NetworkBehaviour,
72{
73 type ConnectionHandler = ToggleConnectionHandler<THandler<TBehaviour>>;
74 type ToSwarm = TBehaviour::ToSwarm;
75
76 fn handle_pending_inbound_connection(
77 &mut self,
78 connection_id: ConnectionId,
79 local_addr: &Multiaddr,
80 remote_addr: &Multiaddr,
81 ) -> Result<(), ConnectionDenied> {
82 let inner = match self.inner.as_mut() {
83 None => return Ok(()),
84 Some(inner) => inner,
85 };
86
87 inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
88
89 Ok(())
90 }
91
92 fn handle_established_inbound_connection(
93 &mut self,
94 connection_id: ConnectionId,
95 peer: PeerId,
96 local_addr: &Multiaddr,
97 remote_addr: &Multiaddr,
98 ) -> Result<THandler<Self>, ConnectionDenied> {
99 let inner = match self.inner.as_mut() {
100 None => return Ok(ToggleConnectionHandler { inner: None }),
101 Some(inner) => inner,
102 };
103
104 let handler = inner.handle_established_inbound_connection(
105 connection_id,
106 peer,
107 local_addr,
108 remote_addr,
109 )?;
110
111 Ok(ToggleConnectionHandler {
112 inner: Some(handler),
113 })
114 }
115
116 fn handle_pending_outbound_connection(
117 &mut self,
118 connection_id: ConnectionId,
119 maybe_peer: Option<PeerId>,
120 addresses: &[Multiaddr],
121 effective_role: Endpoint,
122 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
123 let inner = match self.inner.as_mut() {
124 None => return Ok(vec![]),
125 Some(inner) => inner,
126 };
127
128 let addresses = inner.handle_pending_outbound_connection(
129 connection_id,
130 maybe_peer,
131 addresses,
132 effective_role,
133 )?;
134
135 Ok(addresses)
136 }
137
138 fn handle_established_outbound_connection(
139 &mut self,
140 connection_id: ConnectionId,
141 peer: PeerId,
142 addr: &Multiaddr,
143 role_override: Endpoint,
144 ) -> Result<THandler<Self>, ConnectionDenied> {
145 let inner = match self.inner.as_mut() {
146 None => return Ok(ToggleConnectionHandler { inner: None }),
147 Some(inner) => inner,
148 };
149
150 let handler = inner.handle_established_outbound_connection(
151 connection_id,
152 peer,
153 addr,
154 role_override,
155 )?;
156
157 Ok(ToggleConnectionHandler {
158 inner: Some(handler),
159 })
160 }
161
162 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
163 if let Some(behaviour) = &mut self.inner {
164 if let Some(event) = event.maybe_map_handler(|h| h.inner) {
165 behaviour.on_swarm_event(event);
166 }
167 }
168 }
169
170 fn on_connection_handler_event(
171 &mut self,
172 peer_id: PeerId,
173 connection_id: ConnectionId,
174 event: THandlerOutEvent<Self>,
175 ) {
176 if let Some(behaviour) = &mut self.inner {
177 behaviour.on_connection_handler_event(peer_id, connection_id, event)
178 }
179 }
180
181 fn poll(
182 &mut self,
183 cx: &mut Context<'_>,
184 params: &mut impl PollParameters,
185 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
186 if let Some(inner) = self.inner.as_mut() {
187 inner.poll(cx, params)
188 } else {
189 Poll::Pending
190 }
191 }
192}
193
194pub struct ToggleConnectionHandler<TInner> {
196 inner: Option<TInner>,
197}
198
199impl<TInner> ToggleConnectionHandler<TInner>
200where
201 TInner: ConnectionHandler,
202{
203 fn on_fully_negotiated_inbound(
204 &mut self,
205 FullyNegotiatedInbound {
206 protocol: out,
207 info,
208 }: FullyNegotiatedInbound<
209 <Self as ConnectionHandler>::InboundProtocol,
210 <Self as ConnectionHandler>::InboundOpenInfo,
211 >,
212 ) {
213 let out = match out {
214 future::Either::Left(out) => out,
215 future::Either::Right(v) => void::unreachable(v),
216 };
217
218 if let Either::Left(info) = info {
219 self.inner
220 .as_mut()
221 .expect("Can't receive an inbound substream if disabled; QED")
222 .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
223 FullyNegotiatedInbound {
224 protocol: out,
225 info,
226 },
227 ));
228 } else {
229 panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
230 }
231 }
232
233 fn on_listen_upgrade_error(
234 &mut self,
235 ListenUpgradeError { info, error: err }: ListenUpgradeError<
236 <Self as ConnectionHandler>::InboundOpenInfo,
237 <Self as ConnectionHandler>::InboundProtocol,
238 >,
239 ) {
240 let (inner, info) = match (self.inner.as_mut(), info) {
241 (Some(inner), Either::Left(info)) => (inner, info),
242 (None, Either::Right(())) => return,
244 (Some(_), Either::Right(())) => panic!(
245 "Unexpected `Either::Right` inbound info through \
246 `on_listen_upgrade_error` in enabled state.",
247 ),
248 (None, Either::Left(_)) => panic!(
249 "Unexpected `Either::Left` inbound info through \
250 `on_listen_upgrade_error` in disabled state.",
251 ),
252 };
253
254 let err = match err {
255 Either::Left(e) => e,
256 Either::Right(v) => void::unreachable(v),
257 };
258
259 inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
260 info,
261 error: err,
262 }));
263 }
264}
265
266impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
267where
268 TInner: ConnectionHandler,
269{
270 type FromBehaviour = TInner::FromBehaviour;
271 type ToBehaviour = TInner::ToBehaviour;
272 #[allow(deprecated)]
273 type Error = TInner::Error;
274 type InboundProtocol = Either<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
275 type OutboundProtocol = TInner::OutboundProtocol;
276 type OutboundOpenInfo = TInner::OutboundOpenInfo;
277 type InboundOpenInfo = Either<TInner::InboundOpenInfo, ()>;
278
279 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
280 if let Some(inner) = self.inner.as_ref() {
281 inner
282 .listen_protocol()
283 .map_upgrade(|u| Either::Left(SendWrapper(u)))
284 .map_info(Either::Left)
285 } else {
286 SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(()))
287 }
288 }
289
290 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
291 self.inner
292 .as_mut()
293 .expect("Can't receive events if disabled; QED")
294 .on_behaviour_event(event)
295 }
296
297 fn connection_keep_alive(&self) -> KeepAlive {
298 self.inner
299 .as_ref()
300 .map(|h| h.connection_keep_alive())
301 .unwrap_or(KeepAlive::No)
302 }
303
304 #[allow(deprecated)]
305 fn poll(
306 &mut self,
307 cx: &mut Context<'_>,
308 ) -> Poll<
309 ConnectionHandlerEvent<
310 Self::OutboundProtocol,
311 Self::OutboundOpenInfo,
312 Self::ToBehaviour,
313 Self::Error,
314 >,
315 > {
316 if let Some(inner) = self.inner.as_mut() {
317 inner.poll(cx)
318 } else {
319 Poll::Pending
320 }
321 }
322
323 fn on_connection_event(
324 &mut self,
325 event: ConnectionEvent<
326 Self::InboundProtocol,
327 Self::OutboundProtocol,
328 Self::InboundOpenInfo,
329 Self::OutboundOpenInfo,
330 >,
331 ) {
332 match event {
333 ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
334 self.on_fully_negotiated_inbound(fully_negotiated_inbound)
335 }
336 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
337 protocol: out,
338 info,
339 }) => self
340 .inner
341 .as_mut()
342 .expect("Can't receive an outbound substream if disabled; QED")
343 .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
344 FullyNegotiatedOutbound {
345 protocol: out,
346 info,
347 },
348 )),
349 ConnectionEvent::AddressChange(address_change) => {
350 if let Some(inner) = self.inner.as_mut() {
351 inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
352 new_address: address_change.new_address,
353 }));
354 }
355 }
356 ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
357 .inner
358 .as_mut()
359 .expect("Can't receive an outbound substream if disabled; QED")
360 .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
361 info,
362 error: err,
363 })),
364 ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
365 self.on_listen_upgrade_error(listen_upgrade_error)
366 }
367 ConnectionEvent::LocalProtocolsChange(change) => {
368 if let Some(inner) = self.inner.as_mut() {
369 inner.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
370 }
371 }
372 ConnectionEvent::RemoteProtocolsChange(change) => {
373 if let Some(inner) = self.inner.as_mut() {
374 inner.on_connection_event(ConnectionEvent::RemoteProtocolsChange(change));
375 }
376 }
377 }
378 }
379}