1use crate::{
20 protocol::notifications::{
21 handler::{
22 self, CloseReason, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut,
23 },
24 service::{NotificationCommand, ProtocolHandle, ValidationCallResult},
25 },
26 protocol_controller::{self, IncomingIndex, Message, SetId},
27 service::{
28 metrics::NotificationMetrics,
29 traits::{Direction, ValidationResult},
30 },
31 types::ProtocolName,
32};
33
34use bytes::BytesMut;
35use fnv::FnvHashMap;
36use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
37use libp2p::{
38 core::{transport::PortUse, Endpoint, Multiaddr},
39 swarm::{
40 behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
41 ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, THandler,
42 THandlerInEvent, THandlerOutEvent, ToSwarm,
43 },
44 PeerId,
45};
46use log::{debug, error, trace, warn};
47use parking_lot::RwLock;
48use rand::distributions::{Distribution as _, Uniform};
49use sc_utils::mpsc::TracingUnboundedReceiver;
50use smallvec::SmallVec;
51use tokio::sync::oneshot::error::RecvError;
52use tokio_stream::StreamMap;
53
54use libp2p::swarm::CloseConnection;
55use std::{
56 cmp,
57 collections::{hash_map::Entry, VecDeque},
58 mem,
59 pin::Pin,
60 sync::Arc,
61 task::{Context, Poll},
62 time::{Duration, Instant},
63};
64
/// Logging target passed as `target:` to the log statements in this file.
const LOG_TARGET: &str = "sub-libp2p::notification::behaviour";
67
/// Future created when an inbound substream request is handed to a protocol for validation.
///
/// It resolves to the value received over the validation channel (or the receive error)
/// paired with the [`IncomingIndex`] of the request it belongs to.
type PendingInboundValidation =
    BoxFuture<'static, (Result<ValidationResult, RecvError>, IncomingIndex)>;
71
/// Network behaviour that tracks, for every `(peer, set)` pair, the state of the notification
/// substreams and translates protocol-controller requests into handler commands.
pub struct Notifications {
    /// Handler-side configuration of each notification protocol. The position in the vector
    /// is the set index (see `set_notif_protocol_handshake`); the whole vector is cloned into
    /// every new connection handler.
    notif_protocols: Vec<handler::ProtocolConfig>,

    /// One handle per notification protocol, in the same order as `notif_protocols`. Used to
    /// submit inbound substream requests to the protocol for validation.
    protocol_handles: Vec<ProtocolHandle>,

    /// Command streams of the protocols, keyed by the protocol's index.
    command_streams: StreamMap<usize, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>>,

    /// Handles to the protocol controllers, indexed by set; used here to report dropped peers.
    protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,

    /// Receiver of [`Message`]s — presumably sent by the protocol controllers; the consuming
    /// code is outside the part of the file reviewed here (TODO confirm).
    from_protocol_controllers: TracingUnboundedReceiver<Message>,

    /// State of each `(peer, set)` pair known to the behaviour.
    peers: FnvHashMap<(PeerId, SetId), PeerState>,

    /// Pending timers. Each future waits for its delay and then yields the [`DelayId`] together
    /// with the peer and set it was created for.
    delays:
        stream::FuturesUnordered<Pin<Box<dyn Future<Output = (DelayId, PeerId, SetId)> + Send>>>,

    /// Identifier given to the next timer pushed onto `delays`; incremented on every allocation.
    next_delay_id: DelayId,

    /// Inbound substream requests, looked up by `incoming_id` when an accept or reject arrives.
    incoming: SmallVec<[IncomingPeer; 6]>,

    /// Index for the next entry of `incoming` — presumably incremented on each new request;
    /// the allocation site is outside the part of the file reviewed here (TODO confirm).
    next_incoming_index: IncomingIndex,

    /// Events queued for the swarm (dials, handler notifications, generated events).
    events: VecDeque<ToSwarm<NotificationsOut, NotifsHandlerIn>>,

    /// Validations of inbound substreams that a protocol has not answered yet.
    pending_inbound_validations: FuturesUnordered<PendingInboundValidation>,

    /// Metrics handed to every protocol handle and to every new connection handler.
    metrics: NotificationMetrics,
}
178
/// Configuration of one notification protocol as supplied to [`Notifications::new`], where it
/// is converted into a `handler::ProtocolConfig`.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Name of the protocol.
    pub name: ProtocolName,
    /// Alternative names of the protocol — presumably tried when `name` cannot be negotiated
    /// (TODO confirm against the handler).
    pub fallback_names: Vec<ProtocolName>,
    /// Initial handshake bytes. `Notifications::new` wraps them in a shared lock so that they
    /// can later be replaced through `set_notif_protocol_handshake`.
    pub handshake: Vec<u8>,
    /// Maximum notification size, forwarded unchanged to the handler configuration
    /// (presumably in bytes — TODO confirm).
    pub max_notification_size: u64,
}
191
/// Identifier of a timer pushed onto `Notifications::delays`, allocated from
/// `Notifications::next_delay_id`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DelayId(u64);
195
/// State of a `(peer, set)` pair as tracked by [`Notifications`].
#[derive(Debug)]
enum PeerState {
    /// Placeholder written with `mem::replace` while a transition is being computed (and used
    /// as the default for a brand-new entry when a connection is established). Finding it at
    /// the start of a transition is reported as an error.
    Poisoned,

    /// No connection is tracked and the peer is backed off until `timer_deadline`.
    Backoff {
        /// Identifier of the timer that was scheduled for this back-off.
        timer: DelayId,
        /// Instant at which the back-off ends.
        timer_deadline: Instant,
    },

    /// A connect request arrived while the peer was in `Backoff`; the same timer is kept.
    /// What happens when the timer fires is handled outside the part of the file reviewed here.
    PendingRequest {
        /// Identifier of the timer inherited from the `Backoff` state.
        timer: DelayId,
        /// Instant at which the timer elapses.
        timer_deadline: Instant,
    },

    /// A dial has been requested from the swarm and no connection is established yet.
    Requested,

    /// At least one connection exists but the set does not want substreams on it; no
    /// connection is expected to be `Open` in this state.
    Disabled {
        /// If set, a connect request arriving before this instant is delayed until then.
        backoff_until: Option<Instant>,

        /// Connections to the peer together with their per-connection substream state.
        connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
    },

    /// Connections exist and a connect request has been received, but enabling is postponed
    /// until the timer elapses (back-off still running, or no connection in a usable state).
    DisabledPendingEnable {
        /// Identifier of the timer that will trigger the enabling.
        timer: DelayId,
        /// Instant at which the timer elapses.
        timer_deadline: Instant,

        /// Connections to the peer together with their per-connection substream state.
        connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
    },

    /// The set wants substreams with this peer; at least one connection is expected to be
    /// `Opening` or `Open`.
    Enabled {
        /// Connections to the peer together with their per-connection substream state.
        connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
    },

    /// The remote asked to open a substream and the request is awaiting accept/reject.
    Incoming {
        /// Back-off carried over into `Disabled` if the request ends up rejected.
        backoff_until: Option<Instant>,

        /// Index of the `IncomingPeer` entry this state is waiting on; answers carrying a
        /// different index are ignored.
        incoming_index: IncomingIndex,

        /// Set to `true` when a drop request arrives while the answer is pending; a later
        /// accept is then turned into a reject.
        peerset_rejected: bool,

        /// Connections to the peer together with their per-connection substream state.
        connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
    },
}
281
282impl PeerState {
283 fn is_open(&self) -> bool {
286 self.get_open().is_some()
287 }
288
289 fn get_open(&self) -> Option<&NotificationsSink> {
292 match self {
293 Self::Enabled { connections, .. } => connections.iter().find_map(|(_, s)| match s {
294 ConnectionState::Open(s) => Some(s),
295 _ => None,
296 }),
297 _ => None,
298 }
299 }
300}
301
/// Substream state of a single connection, for one set.
#[derive(Debug)]
enum ConnectionState {
    /// No substream; the connection can be asked to open one.
    Closed,

    /// A `Close` has been sent to the handler for a substream that was open or desired by the
    /// remote.
    Closing,

    /// An `Open` has been sent to the handler and no result has been recorded yet.
    Opening,

    /// A `Close` was sent to the handler while the connection was still `Opening`.
    OpeningThenClosing,

    /// The remote wants a substream on this connection; an accept turns this into `Opening`,
    /// a reject into `Closing`.
    OpenDesiredByRemote,

    /// The substream is open; the sink is what `PeerState::get_open` hands out.
    Open(NotificationsSink),
}
332
/// An inbound substream request that is awaiting an accept or reject decision.
#[derive(Debug)]
struct IncomingPeer {
    /// Peer that made the request.
    peer_id: PeerId,
    /// Set the request belongs to.
    set_id: SetId,
    /// Cleared when the peer leaves the `Incoming` state before an answer arrives; answers
    /// for a non-alive entry are treated as obsolete.
    alive: bool,
    /// Index under which accept/reject answers refer to this request.
    incoming_id: IncomingIndex,
    /// Handshake bytes associated with the request; forwarded to the protocol for validation.
    handshake: Vec<u8>,
}
348
/// Events the behaviour generates for the layer above it.
///
/// Only `CustomProtocolClosed` is emitted in the part of the file reviewed here; the
/// descriptions of the other variants follow their names and fields and should be confirmed
/// against the emitting code.
#[derive(Debug)]
pub enum NotificationsOut {
    /// A notifications substream with the peer has been opened (presumably — TODO confirm).
    CustomProtocolOpen {
        /// Peer the substream is with.
        peer_id: PeerId,
        /// Set the substream belongs to.
        set_id: SetId,
        /// Direction of the substream.
        direction: Direction,
        /// Fallback protocol name that was negotiated, if any.
        negotiated_fallback: Option<ProtocolName>,
        /// Handshake bytes received from the remote.
        received_handshake: Vec<u8>,
        /// Sink associated with the opened substream.
        notifications_sink: NotificationsSink,
    },

    /// The sink of an already-open peer has been replaced by another one
    /// (presumably after the connection carrying it changed — TODO confirm).
    CustomProtocolReplaced {
        /// Peer concerned.
        peer_id: PeerId,
        /// Set concerned.
        set_id: SetId,
        /// The replacement sink.
        notifications_sink: NotificationsSink,
    },

    /// Emitted when a peer that had at least one `Open` connection is disabled.
    CustomProtocolClosed {
        /// Peer concerned.
        peer_id: PeerId,
        /// Set concerned.
        set_id: SetId,
    },

    /// A notification received from a peer (presumably — TODO confirm).
    Notification {
        /// Sender of the notification.
        peer_id: PeerId,
        /// Set the notification belongs to.
        set_id: SetId,
        /// Raw payload.
        message: BytesMut,
    },

    /// The peer violated the protocol (presumably — TODO confirm what triggers it).
    ProtocolMisbehavior {
        /// Peer concerned.
        peer_id: PeerId,
        /// Set concerned.
        set_id: SetId,
    },
}
413
414impl Notifications {
415 pub(crate) fn new(
417 protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
418 from_protocol_controllers: TracingUnboundedReceiver<Message>,
419 metrics: NotificationMetrics,
420 notif_protocols: impl Iterator<
421 Item = (
422 ProtocolConfig,
423 ProtocolHandle,
424 Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>,
425 ),
426 >,
427 ) -> Self {
428 let (notif_protocols, protocol_handles): (Vec<_>, Vec<_>) = notif_protocols
429 .map(|(cfg, protocol_handle, command_stream)| {
430 (
431 handler::ProtocolConfig {
432 name: cfg.name,
433 fallback_names: cfg.fallback_names,
434 handshake: Arc::new(RwLock::new(cfg.handshake)),
435 max_notification_size: cfg.max_notification_size,
436 },
437 (protocol_handle, command_stream),
438 )
439 })
440 .unzip();
441 assert!(!notif_protocols.is_empty());
442
443 let (mut protocol_handles, command_streams): (Vec<_>, Vec<_>) = protocol_handles
444 .into_iter()
445 .enumerate()
446 .map(|(set_id, (mut protocol_handle, command_stream))| {
447 protocol_handle.set_metrics(metrics.clone());
448
449 (protocol_handle, (set_id, command_stream))
450 })
451 .unzip();
452
453 protocol_handles.iter_mut().skip(1).for_each(|handle| {
454 handle.delegate_to_peerset(true);
455 });
456
457 Self {
458 notif_protocols,
459 protocol_handles,
460 command_streams: StreamMap::from_iter(command_streams.into_iter()),
461 protocol_controller_handles,
462 from_protocol_controllers,
463 peers: FnvHashMap::default(),
464 delays: Default::default(),
465 next_delay_id: DelayId(0),
466 incoming: SmallVec::new(),
467 next_incoming_index: IncomingIndex(0),
468 events: VecDeque::new(),
469 pending_inbound_validations: FuturesUnordered::new(),
470 metrics,
471 }
472 }
473
474 pub fn set_notif_protocol_handshake(
476 &mut self,
477 set_id: SetId,
478 handshake_message: impl Into<Vec<u8>>,
479 ) {
480 if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) {
481 *p.handshake.write() = handshake_message.into();
482 } else {
483 log::error!(target: LOG_TARGET, "Unknown handshake change set: {:?}", set_id);
484 debug_assert!(false);
485 }
486 }
487
488 pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
490 self.peers.iter().filter(|(_, state)| state.is_open()).map(|((id, _), _)| id)
491 }
492
493 pub fn is_open(&self, peer_id: &PeerId, set_id: SetId) -> bool {
495 self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false)
496 }
497
/// Public entry point for disconnecting `peer_id` on `set_id`: logs the request and delegates
/// to `disconnect_peer_inner`.
pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: SetId) {
    trace!(target: LOG_TARGET, "External API => Disconnect({}, {:?})", peer_id, set_id);
    self.disconnect_peer_inner(peer_id, set_id);
}
503
/// Moves `(peer_id, set_id)` into the `Disabled` state, asking the handlers to close whatever
/// substreams are open, opening or desired by the remote.
///
/// Peers without an entry, and peers that have no substream activity (`Disabled`,
/// `Requested`, `PendingRequest`, `Backoff`), are left untouched.
fn disconnect_peer_inner(&mut self, peer_id: &PeerId, set_id: SetId) {
    let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
        entry
    } else {
        return
    };

    // The state is taken out and replaced by `Poisoned`; every arm is expected to write the
    // new state back.
    match mem::replace(entry.get_mut(), PeerState::Poisoned) {
        // Nothing to close in these states: put the state back unchanged.
        st @ PeerState::Disabled { .. } => *entry.into_mut() = st,
        st @ PeerState::Requested => *entry.into_mut() = st,
        st @ PeerState::PendingRequest { .. } => *entry.into_mut() = st,
        st @ PeerState::Backoff { .. } => *entry.into_mut() = st,

        // DisabledPendingEnable => Disabled: report the drop and keep the timer deadline as
        // back-off.
        PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
            trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
            self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);
            *entry.into_mut() =
                PeerState::Disabled { connections, backoff_until: Some(timer_deadline) }
        },

        // Enabled => Disabled: report the drop, announce the closure if anything was open,
        // then close every open or opening substream.
        PeerState::Enabled { mut connections } => {
            trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
            self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);

            if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
                trace!(target: LOG_TARGET, "External API <= Closed({}, {:?})", peer_id, set_id);
                let event =
                    NotificationsOut::CustomProtocolClosed { peer_id: *peer_id, set_id };
                self.events.push_back(ToSwarm::GenerateEvent(event));
            }

            // Open => Closing.
            for (connec_id, connec_state) in
                connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
            {
                trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
                self.events.push_back(ToSwarm::NotifyHandler {
                    peer_id: *peer_id,
                    handler: NotifyHandler::One(*connec_id),
                    event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
                });
                *connec_state = ConnectionState::Closing;
            }

            // Opening => OpeningThenClosing.
            for (connec_id, connec_state) in
                connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
            {
                trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
                self.events.push_back(ToSwarm::NotifyHandler {
                    peer_id: *peer_id,
                    handler: NotifyHandler::One(*connec_id),
                    event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
                });
                *connec_state = ConnectionState::OpeningThenClosing;
            }

            debug_assert!(!connections
                .iter()
                .any(|(_, s)| matches!(s, ConnectionState::Open(_))));
            debug_assert!(!connections
                .iter()
                .any(|(_, s)| matches!(s, ConnectionState::Opening)));

            *entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
        },

        // Incoming => Disabled: mark the pending request obsolete and refuse the substreams
        // the remote asked for.
        PeerState::Incoming { mut connections, backoff_until, .. } => {
            let inc = if let Some(inc) = self
                .incoming
                .iter_mut()
                .find(|i| i.peer_id == entry.key().0 && i.set_id == set_id && i.alive)
            {
                inc
            } else {
                // NOTE(review): this early return leaves the entry `Poisoned` and drops
                // `connections`; confirm whether that is intended for this mismatch.
                error!(
                    target: LOG_TARGET,
                    "State mismatch in libp2p: no entry in incoming for incoming peer"
                );
                return
            };

            // A later accept/reject for this request will be recognised as obsolete.
            inc.alive = false;

            // OpenDesiredByRemote => Closing.
            for (connec_id, connec_state) in connections
                .iter_mut()
                .filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
            {
                trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
                self.events.push_back(ToSwarm::NotifyHandler {
                    peer_id: *peer_id,
                    handler: NotifyHandler::One(*connec_id),
                    event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
                });
                *connec_state = ConnectionState::Closing;
            }

            debug_assert!(!connections
                .iter()
                .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
            *entry.into_mut() = PeerState::Disabled { connections, backoff_until }
        },

        // A previous transition failed to write a state back.
        PeerState::Poisoned => {
            error!(target: LOG_TARGET, "State of {:?} is poisoned", peer_id)
        },
    }
}
618
/// Handles a connect request for `(peer_id, set_id)` (logged as `PSM => Connect`).
///
/// Depending on the current state this dials the peer, asks an existing closed connection to
/// open a substream, or schedules a timer after which enabling is retried. Requests that
/// arrive while the peer is already enabled, pending, requested or awaiting accept/reject
/// are ignored.
fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: SetId) {
    let mut occ_entry = match self.peers.entry((peer_id, set_id)) {
        Entry::Occupied(entry) => entry,
        Entry::Vacant(entry) => {
            // No state at all: dial the peer and remember that the dial was requested.
            trace!(
                target: LOG_TARGET,
                "PSM => Connect({}, {:?}): Starting to connect",
                entry.key().0,
                set_id,
            );
            trace!(target: LOG_TARGET, "Libp2p <= Dial {}", entry.key().0);
            self.events.push_back(ToSwarm::Dial { opts: entry.key().0.into() });
            entry.insert(PeerState::Requested);
            return
        },
    };

    let now = Instant::now();

    // The state is taken out and replaced by `Poisoned`; every arm writes a state back.
    match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
        // Backoff (still running) => PendingRequest, keeping the existing timer.
        PeerState::Backoff { ref timer, ref timer_deadline } if *timer_deadline > now => {
            let peer_id = occ_entry.key().0;
            trace!(
                target: LOG_TARGET,
                "PSM => Connect({}, {:?}): Will start to connect at {:?}",
                peer_id,
                set_id,
                timer_deadline,
            );
            *occ_entry.into_mut() =
                PeerState::PendingRequest { timer: *timer, timer_deadline: *timer_deadline };
        },

        // Backoff (expired) => Requested: dial right away.
        PeerState::Backoff { .. } => {
            trace!(
                target: LOG_TARGET,
                "PSM => Connect({}, {:?}): Starting to connect",
                occ_entry.key().0,
                set_id,
            );
            trace!(target: LOG_TARGET, "Libp2p <= Dial {:?}", occ_entry.key());
            self.events.push_back(ToSwarm::Dial { opts: occ_entry.key().0.into() });
            *occ_entry.into_mut() = PeerState::Requested;
        },

        // Disabled (back-off still running) => DisabledPendingEnable, with a new timer that
        // elapses when the back-off does.
        PeerState::Disabled { connections, backoff_until: Some(ref backoff) }
            if *backoff > now =>
        {
            let peer_id = occ_entry.key().0;
            trace!(
                target: LOG_TARGET,
                "PSM => Connect({}, {:?}): But peer is backed-off until {:?}",
                peer_id,
                set_id,
                backoff,
            );

            let delay_id = self.next_delay_id;
            self.next_delay_id.0 += 1;
            let delay = futures_timer::Delay::new(*backoff - now);
            self.delays.push(
                async move {
                    delay.await;
                    (delay_id, peer_id, set_id)
                }
                .boxed(),
            );

            *occ_entry.into_mut() = PeerState::DisabledPendingEnable {
                connections,
                timer: delay_id,
                timer_deadline: *backoff,
            };
        },

        // Disabled (no back-off, or back-off expired).
        PeerState::Disabled { mut connections, backoff_until } => {
            debug_assert!(!connections
                .iter()
                .any(|(_, s)| { matches!(s, ConnectionState::Open(_)) }));

            // Prefer a connection whose substream is fully closed: it can be opened now.
            if let Some((connec_id, connec_state)) =
                connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
            {
                trace!(target: LOG_TARGET, "PSM => Connect({}, {:?}): Enabling connections.",
                    occ_entry.key().0, set_id);
                trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})", peer_id, *connec_id, set_id);
                self.events.push_back(ToSwarm::NotifyHandler {
                    peer_id,
                    handler: NotifyHandler::One(*connec_id),
                    event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
                });
                *connec_state = ConnectionState::Opening;
                *occ_entry.into_mut() = PeerState::Enabled { connections };
            } else {
                // Every connection is still in the middle of closing: retry after a delay.
                debug_assert!(connections.iter().any(|(_, s)| {
                    matches!(s, ConnectionState::OpeningThenClosing | ConnectionState::Closing)
                }));
                trace!(
                    target: LOG_TARGET,
                    "PSM => Connect({}, {:?}): No connection in proper state. Delaying.",
                    occ_entry.key().0, set_id
                );

                // Wait at least five seconds, and never less than the recorded back-off.
                let timer_deadline = {
                    let base = now + Duration::from_secs(5);
                    if let Some(backoff_until) = backoff_until {
                        cmp::max(base, backoff_until)
                    } else {
                        base
                    }
                };

                let delay_id = self.next_delay_id;
                self.next_delay_id.0 += 1;
                debug_assert!(timer_deadline > now);
                let delay = futures_timer::Delay::new(timer_deadline - now);
                self.delays.push(
                    async move {
                        delay.await;
                        (delay_id, peer_id, set_id)
                    }
                    .boxed(),
                );

                *occ_entry.into_mut() = PeerState::DisabledPendingEnable {
                    connections,
                    timer: delay_id,
                    timer_deadline,
                };
            }
        },
        // Incoming: the accept/reject answer decides; the state is put back unchanged.
        st @ PeerState::Incoming { .. } => {
            debug!(
                target: LOG_TARGET,
                "PSM => Connect({}, {:?}): Ignoring obsolete connect, we are awaiting accept/reject.",
                occ_entry.key().0, set_id
            );
            *occ_entry.into_mut() = st;
        },

        // The remaining states already reflect a connect request: put them back unchanged.
        st @ PeerState::Enabled { .. } => {
            debug!(target: LOG_TARGET,
                "PSM => Connect({}, {:?}): Already connected.",
                occ_entry.key().0, set_id);
            *occ_entry.into_mut() = st;
        },
        st @ PeerState::DisabledPendingEnable { .. } => {
            debug!(target: LOG_TARGET,
                "PSM => Connect({}, {:?}): Already pending enabling.",
                occ_entry.key().0, set_id);
            *occ_entry.into_mut() = st;
        },
        st @ PeerState::Requested { .. } | st @ PeerState::PendingRequest { .. } => {
            debug!(target: LOG_TARGET,
                "PSM => Connect({}, {:?}): Duplicate request.",
                occ_entry.key().0, set_id);
            *occ_entry.into_mut() = st;
        },

        // A previous transition failed to write a state back.
        PeerState::Poisoned => {
            error!(target: LOG_TARGET, "State of {:?} is poisoned", occ_entry.key());
            debug_assert!(false);
        },
    }
}
796
/// Handles a drop request for `(peer_id, set_id)` (logged as `PSM => Drop`).
///
/// Enabled peers get their open and opening substreams closed and become `Disabled`;
/// pending requests are cancelled; a peer awaiting accept/reject is only flagged so that a
/// later accept is turned into a reject. Unlike `disconnect_peer_inner`, no `dropped` report
/// is sent back to the protocol controller.
fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: SetId) {
    let mut entry = match self.peers.entry((peer_id, set_id)) {
        Entry::Occupied(entry) => entry,
        Entry::Vacant(entry) => {
            trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Already disabled.",
                entry.key().0, set_id);
            return
        },
    };

    // The state is taken out and replaced by `Poisoned`; every arm writes a state back or
    // removes the entry.
    match mem::replace(entry.get_mut(), PeerState::Poisoned) {
        // Nothing to do: put the state back unchanged.
        st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
            trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Already disabled.",
                entry.key().0, set_id);
            *entry.into_mut() = st;
        },

        // DisabledPendingEnable => Disabled, keeping the timer deadline as back-off.
        PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
            debug_assert!(!connections.is_empty());
            trace!(target: LOG_TARGET,
                "PSM => Drop({}, {:?}): Interrupting pending enabling.",
                entry.key().0, set_id);
            *entry.into_mut() =
                PeerState::Disabled { connections, backoff_until: Some(timer_deadline) };
        },

        // Enabled => Disabled.
        PeerState::Enabled { mut connections } => {
            trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Disabling connections.",
                entry.key().0, set_id);

            debug_assert!(connections.iter().any(|(_, s)| matches!(
                s,
                ConnectionState::Opening | ConnectionState::Open(_)
            )));

            // Announce the closure only if a substream was actually open.
            if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
                trace!(target: LOG_TARGET, "External API <= Closed({}, {:?})", entry.key().0, set_id);
                let event =
                    NotificationsOut::CustomProtocolClosed { peer_id: entry.key().0, set_id };
                self.events.push_back(ToSwarm::GenerateEvent(event));
            }

            // Opening => OpeningThenClosing.
            for (connec_id, connec_state) in
                connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
            {
                trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})",
                    entry.key(), *connec_id, set_id);
                self.events.push_back(ToSwarm::NotifyHandler {
                    peer_id: entry.key().0,
                    handler: NotifyHandler::One(*connec_id),
                    event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
                });
                *connec_state = ConnectionState::OpeningThenClosing;
            }

            // Open => Closing.
            for (connec_id, connec_state) in
                connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
            {
                trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})",
                    entry.key(), *connec_id, set_id);
                self.events.push_back(ToSwarm::NotifyHandler {
                    peer_id: entry.key().0,
                    handler: NotifyHandler::One(*connec_id),
                    event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
                });
                *connec_state = ConnectionState::Closing;
            }

            *entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
        },

        // Requested => no state: the entry is simply removed.
        PeerState::Requested => {
            trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Not yet connected.",
                entry.key().0, set_id);
            entry.remove();
        },

        // PendingRequest => Backoff, keeping the same timer.
        PeerState::PendingRequest { timer, timer_deadline } => {
            trace!(target: LOG_TARGET, "PSM => Drop({}, {:?}): Not yet connected",
                entry.key().0, set_id);
            *entry.into_mut() = PeerState::Backoff { timer, timer_deadline }
        },

        // Incoming: stay in `Incoming` but remember the drop, so that an accept arriving
        // later is converted into a reject.
        PeerState::Incoming { backoff_until, connections, incoming_index, .. } => {
            debug!(
                target: LOG_TARGET,
                "PSM => Drop({}, {:?}): Ignoring obsolete disconnect, we are awaiting accept/reject.",
                entry.key().0, set_id,
            );
            *entry.into_mut() = PeerState::Incoming {
                backoff_until,
                connections,
                incoming_index,
                peerset_rejected: true,
            };
        },
        // A previous transition failed to write a state back.
        PeerState::Poisoned => {
            error!(target: LOG_TARGET, "State of {:?} is poisoned", entry.key());
            debug_assert!(false);
        },
    }
}
910
/// Forwards the inbound substream request `index` to its protocol for validation.
///
/// Depending on the handle's answer the request is accepted immediately, queued in
/// `pending_inbound_validations` until the protocol replies, or rejected when the handle
/// returns an error. An unknown `index` is logged and ignored.
fn peerset_report_preaccept(&mut self, index: IncomingIndex) {
    let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) else {
        error!(target: LOG_TARGET, "PSM => Preaccept({:?}): Invalid index", index);
        return
    };

    trace!(
        target: LOG_TARGET,
        "PSM => Preaccept({:?}): Sent to protocol for validation",
        index
    );
    let incoming = &self.incoming[pos];

    match self.protocol_handles[usize::from(incoming.set_id)]
        .report_incoming_substream(incoming.peer_id, incoming.handshake.clone())
    {
        // No validation is awaited for this request: accept right away.
        Ok(ValidationCallResult::Delegated) => {
            self.protocol_report_accept(index);
        },
        // The protocol will answer over `rx`; keep the index next to the pending answer.
        Ok(ValidationCallResult::WaitForValidation(rx)) => {
            self.pending_inbound_validations
                .push(Box::pin(async move { (rx.await, index) }));
        },
        // The handle reported an error: treat the request as rejected by the protocol.
        Err(err) => {
            debug!(target: LOG_TARGET, "protocol has exited: {err:?} {:?}", incoming.set_id);

            self.protocol_report_reject(index);
        },
    }
}
949
950 fn protocol_report_accept(&mut self, index: IncomingIndex) {
953 let (pos, incoming) =
954 if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) {
955 (pos, self.incoming.get(pos))
956 } else {
957 error!(target: LOG_TARGET, "PSM => Accept({:?}): Invalid index", index);
958 return
959 };
960
961 let Some(incoming) = incoming else {
962 error!(target: LOG_TARGET, "Incoming connection ({:?}) doesn't exist", index);
963 debug_assert!(false);
964 return;
965 };
966
967 if !incoming.alive {
968 trace!(
969 target: LOG_TARGET,
970 "PSM => Accept({:?}, {}, {:?}): Obsolete incoming",
971 index,
972 incoming.peer_id,
973 incoming.set_id,
974 );
975
976 match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
977 Some(PeerState::DisabledPendingEnable { .. }) | Some(PeerState::Enabled { .. }) => {
978 },
979 _ => {
980 trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})",
981 incoming.peer_id, incoming.set_id);
982 self.protocol_controller_handles[usize::from(incoming.set_id)]
983 .dropped(incoming.peer_id);
984 },
985 }
986
987 self.incoming.remove(pos);
988 return
989 }
990
991 let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
992 Some(s) => s,
993 None => {
994 log::debug!(
995 target: LOG_TARGET,
996 "Connection to {:?} closed, ({:?} {:?}), ignoring accept",
997 incoming.peer_id,
998 incoming.set_id,
999 index,
1000 );
1001 self.incoming.remove(pos);
1002 return
1003 },
1004 };
1005
1006 match mem::replace(state, PeerState::Poisoned) {
1007 PeerState::Incoming {
1009 mut connections,
1010 incoming_index,
1011 peerset_rejected,
1012 backoff_until,
1013 } => {
1014 if index < incoming_index {
1015 warn!(
1016 target: LOG_TARGET,
1017 "PSM => Accept({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
1018 index, incoming.peer_id, incoming.set_id, incoming_index
1019 );
1020
1021 self.incoming.remove(pos);
1022 return
1023 } else if index > incoming_index {
1024 error!(
1025 target: LOG_TARGET,
1026 "PSM => Accept({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
1027 index, incoming.peer_id, incoming.set_id, incoming_index
1028 );
1029
1030 self.incoming.remove(pos);
1031 debug_assert!(false);
1032 return
1033 }
1034
1035 if peerset_rejected {
1038 trace!(
1039 target: LOG_TARGET,
1040 "Protocol accepted ({:?} {:?} {:?}) but Peerset had requested disconnection, rejecting",
1041 index,
1042 incoming.peer_id,
1043 incoming.set_id
1044 );
1045
1046 *state = PeerState::Incoming {
1047 connections,
1048 backoff_until,
1049 peerset_rejected,
1050 incoming_index,
1051 };
1052 return self.report_reject(index).map_or((), |_| ())
1053 }
1054
1055 trace!(
1056 target: LOG_TARGET,
1057 "PSM => Accept({:?}, {}, {:?}): Enabling connections.",
1058 index,
1059 incoming.peer_id,
1060 incoming.set_id
1061 );
1062
1063 debug_assert!(connections
1064 .iter()
1065 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1066 for (connec_id, connec_state) in connections
1067 .iter_mut()
1068 .filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
1069 {
1070 trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})",
1071 incoming.peer_id, *connec_id, incoming.set_id);
1072 self.events.push_back(ToSwarm::NotifyHandler {
1073 peer_id: incoming.peer_id,
1074 handler: NotifyHandler::One(*connec_id),
1075 event: NotifsHandlerIn::Open {
1076 protocol_index: incoming.set_id.into(),
1077 peer_id: incoming.peer_id,
1078 },
1079 });
1080 *connec_state = ConnectionState::Opening;
1081 }
1082
1083 self.incoming.remove(pos);
1084 *state = PeerState::Enabled { connections };
1085 },
1086 st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
1087 self.incoming.remove(pos);
1088 *state = st;
1089 },
1090 peer => {
1092 error!(
1093 target: LOG_TARGET,
1094 "State mismatch in libp2p: Expected alive incoming. Got {:?}.",
1095 peer
1096 );
1097
1098 self.incoming.remove(pos);
1099 debug_assert!(false);
1100 },
1101 }
1102 }
1103
/// Handles a rejection of the inbound request `index` for which no `dropped` report has to
/// be sent: the result of `report_reject` is deliberately discarded.
fn peerset_report_reject(&mut self, index: IncomingIndex) {
    let _ = self.report_reject(index);
}
1108
1109 fn protocol_report_reject(&mut self, index: IncomingIndex) {
1111 if let Some((set_id, peer_id)) = self.report_reject(index) {
1112 self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id)
1113 }
1114 }
1115
1116 fn report_reject(&mut self, index: IncomingIndex) -> Option<(SetId, PeerId)> {
1118 let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index)
1119 {
1120 self.incoming.remove(pos)
1121 } else {
1122 error!(target: LOG_TARGET, "PSM => Reject({:?}): Invalid index", index);
1123 return None
1124 };
1125
1126 if !incoming.alive {
1127 trace!(
1128 target: LOG_TARGET,
1129 "PSM => Reject({:?}, {}, {:?}): Obsolete incoming, ignoring",
1130 index,
1131 incoming.peer_id,
1132 incoming.set_id,
1133 );
1134
1135 return None
1136 }
1137
1138 let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
1139 Some(s) => s,
1140 None => {
1141 log::debug!(
1142 target: LOG_TARGET,
1143 "Connection to {:?} closed, ({:?} {:?}), ignoring accept",
1144 incoming.peer_id,
1145 incoming.set_id,
1146 index,
1147 );
1148 return None
1149 },
1150 };
1151
1152 match mem::replace(state, PeerState::Poisoned) {
1153 PeerState::Incoming { mut connections, backoff_until, incoming_index, .. } => {
1155 if index < incoming_index {
1156 warn!(
1157 target: LOG_TARGET,
1158 "PSM => Reject({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
1159 index, incoming.peer_id, incoming.set_id, incoming_index
1160 );
1161 return None
1162 } else if index > incoming_index {
1163 error!(
1164 target: LOG_TARGET,
1165 "PSM => Reject({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
1166 index, incoming.peer_id, incoming.set_id, incoming_index
1167 );
1168 debug_assert!(false);
1169 return None
1170 }
1171
1172 trace!(target: LOG_TARGET, "PSM => Reject({:?}, {}, {:?}): Rejecting connections.",
1173 index, incoming.peer_id, incoming.set_id);
1174
1175 debug_assert!(connections
1176 .iter()
1177 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1178 for (connec_id, connec_state) in connections
1179 .iter_mut()
1180 .filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
1181 {
1182 trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Close({:?})",
1183 incoming.peer_id, connec_id, incoming.set_id);
1184 self.events.push_back(ToSwarm::NotifyHandler {
1185 peer_id: incoming.peer_id,
1186 handler: NotifyHandler::One(*connec_id),
1187 event: NotifsHandlerIn::Close { protocol_index: incoming.set_id.into() },
1188 });
1189 *connec_state = ConnectionState::Closing;
1190 }
1191
1192 *state = PeerState::Disabled { connections, backoff_until };
1193 Some((incoming.set_id, incoming.peer_id))
1194 },
1195 st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
1197 *state = st;
1198 None
1199 },
1200 peer => {
1201 error!(
1202 target: LOG_TARGET,
1203 "State mismatch in libp2p: Expected alive incoming. Got {peer:?}.",
1204 );
1205 None
1206 },
1207 }
1208 }
1209}
1210
1211impl NetworkBehaviour for Notifications {
1212 type ConnectionHandler = NotifsHandler;
1213 type ToSwarm = NotificationsOut;
1214
/// Accepts every pending inbound connection; this behaviour never denies one.
fn handle_pending_inbound_connection(
    &mut self,
    _connection_id: ConnectionId,
    _local_addr: &Multiaddr,
    _remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
    Ok(())
}
1223
/// Allows every pending outbound connection and contributes no additional addresses
/// (an empty list is returned).
fn handle_pending_outbound_connection(
    &mut self,
    _connection_id: ConnectionId,
    _maybe_peer: Option<PeerId>,
    _addresses: &[Multiaddr],
    _effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
    Ok(Vec::new())
}
1233
1234 fn handle_established_inbound_connection(
1235 &mut self,
1236 _connection_id: ConnectionId,
1237 peer: PeerId,
1238 _local_addr: &Multiaddr,
1239 _remote_addr: &Multiaddr,
1240 ) -> Result<THandler<Self>, ConnectionDenied> {
1241 Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
1242 }
1243
1244 fn handle_established_outbound_connection(
1245 &mut self,
1246 _connection_id: ConnectionId,
1247 peer: PeerId,
1248 _addr: &Multiaddr,
1249 _role_override: Endpoint,
1250 _port_use: PortUse,
1251 ) -> Result<THandler<Self>, ConnectionDenied> {
1252 Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
1253 }
1254
1255 fn on_swarm_event(&mut self, event: FromSwarm) {
1256 match event {
1257 FromSwarm::ConnectionEstablished(ConnectionEstablished {
1258 peer_id,
1259 endpoint,
1260 connection_id,
1261 ..
1262 }) => {
1263 for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
1264 match self.peers.entry((peer_id, set_id)).or_insert(PeerState::Poisoned) {
1265 st @ &mut PeerState::Requested |
1267 st @ &mut PeerState::PendingRequest { .. } => {
1268 trace!(target: LOG_TARGET,
1269 "Libp2p => Connected({}, {:?}, {:?}): Connection was requested by PSM.",
1270 peer_id, set_id, endpoint
1271 );
1272 trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})", peer_id, connection_id, set_id);
1273 self.events.push_back(ToSwarm::NotifyHandler {
1274 peer_id,
1275 handler: NotifyHandler::One(connection_id),
1276 event: NotifsHandlerIn::Open {
1277 protocol_index: set_id.into(),
1278 peer_id,
1279 },
1280 });
1281
1282 let mut connections = SmallVec::new();
1283 connections.push((connection_id, ConnectionState::Opening));
1284 *st = PeerState::Enabled { connections };
1285 },
1286
1287 st @ &mut PeerState::Poisoned | st @ &mut PeerState::Backoff { .. } => {
1290 let backoff_until =
1291 if let PeerState::Backoff { timer_deadline, .. } = st {
1292 Some(*timer_deadline)
1293 } else {
1294 None
1295 };
1296 trace!(target: LOG_TARGET,
1297 "Libp2p => Connected({}, {:?}, {:?}, {:?}): Not requested by PSM, disabling.",
1298 peer_id, set_id, endpoint, connection_id);
1299
1300 let mut connections = SmallVec::new();
1301 connections.push((connection_id, ConnectionState::Closed));
1302 *st = PeerState::Disabled { connections, backoff_until };
1303 },
1304
1305 PeerState::Incoming { connections, .. } |
1308 PeerState::Disabled { connections, .. } |
1309 PeerState::DisabledPendingEnable { connections, .. } |
1310 PeerState::Enabled { connections, .. } => {
1311 trace!(target: LOG_TARGET,
1312 "Libp2p => Connected({}, {:?}, {:?}, {:?}): Secondary connection. Leaving closed.",
1313 peer_id, set_id, endpoint, connection_id);
1314 connections.push((connection_id, ConnectionState::Closed));
1315 },
1316 }
1317 }
1318 },
1319 FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => {
1320 for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
1321 let mut entry = if let Entry::Occupied(entry) =
1322 self.peers.entry((peer_id, set_id))
1323 {
1324 entry
1325 } else {
1326 error!(target: LOG_TARGET, "inject_connection_closed: State mismatch in the custom protos handler");
1327 debug_assert!(false);
1328 return
1329 };
1330
1331 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1332 PeerState::Disabled { mut connections, backoff_until } => {
1334 trace!(target: LOG_TARGET, "Libp2p => Disconnected({}, {:?}, {:?}): Disabled.",
1335 peer_id, set_id, connection_id);
1336
1337 if let Some(pos) =
1338 connections.iter().position(|(c, _)| *c == connection_id)
1339 {
1340 connections.remove(pos);
1341 } else {
1342 debug_assert!(false);
1343 error!(target: LOG_TARGET,
1344 "inject_connection_closed: State mismatch in the custom protos handler");
1345 }
1346
1347 if connections.is_empty() {
1348 if let Some(until) = backoff_until {
1349 let now = Instant::now();
1350 if until > now {
1351 let delay_id = self.next_delay_id;
1352 self.next_delay_id.0 += 1;
1353 let delay = futures_timer::Delay::new(until - now);
1354 self.delays.push(
1355 async move {
1356 delay.await;
1357 (delay_id, peer_id, set_id)
1358 }
1359 .boxed(),
1360 );
1361
1362 *entry.get_mut() = PeerState::Backoff {
1363 timer: delay_id,
1364 timer_deadline: until,
1365 };
1366 } else {
1367 entry.remove();
1368 }
1369 } else {
1370 entry.remove();
1371 }
1372 } else {
1373 *entry.get_mut() =
1374 PeerState::Disabled { connections, backoff_until };
1375 }
1376 },
1377
1378 PeerState::DisabledPendingEnable {
1380 mut connections,
1381 timer_deadline,
1382 timer,
1383 } => {
1384 trace!(
1385 target: LOG_TARGET,
1386 "Libp2p => Disconnected({}, {:?}, {:?}): Disabled but pending enable.",
1387 peer_id, set_id, connection_id
1388 );
1389
1390 if let Some(pos) =
1391 connections.iter().position(|(c, _)| *c == connection_id)
1392 {
1393 connections.remove(pos);
1394 } else {
1395 error!(target: LOG_TARGET,
1396 "inject_connection_closed: State mismatch in the custom protos handler");
1397 debug_assert!(false);
1398 }
1399
1400 if connections.is_empty() {
1401 trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1402 self.protocol_controller_handles[usize::from(set_id)]
1403 .dropped(peer_id);
1404 *entry.get_mut() = PeerState::Backoff { timer, timer_deadline };
1405 } else {
1406 *entry.get_mut() = PeerState::DisabledPendingEnable {
1407 connections,
1408 timer_deadline,
1409 timer,
1410 };
1411 }
1412 },
1413
1414 PeerState::Incoming {
1416 mut connections,
1417 backoff_until,
1418 incoming_index,
1419 peerset_rejected,
1420 } => {
1421 trace!(
1422 target: LOG_TARGET,
1423 "Libp2p => Disconnected({}, {:?}, {:?}): OpenDesiredByRemote.",
1424 peer_id, set_id, connection_id
1425 );
1426
1427 debug_assert!(connections
1428 .iter()
1429 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1430
1431 if let Some(pos) =
1432 connections.iter().position(|(c, _)| *c == connection_id)
1433 {
1434 connections.remove(pos);
1435 } else {
1436 error!(target: LOG_TARGET,
1437 "inject_connection_closed: State mismatch in the custom protos handler");
1438 debug_assert!(false);
1439 }
1440
1441 let no_desired_left = !connections
1442 .iter()
1443 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote));
1444
1445 if no_desired_left {
1448 if let Some(state) = self
1452 .incoming
1453 .iter_mut()
1454 .find(|i| i.alive && i.set_id == set_id && i.peer_id == peer_id)
1455 {
1456 state.alive = false;
1457 } else {
1458 error!(target: LOG_TARGET, "State mismatch in libp2p: no entry in \
1459 incoming corresponding to an incoming state in peers");
1460 debug_assert!(false);
1461 }
1462 }
1463
1464 if connections.is_empty() {
1465 if let Some(until) = backoff_until {
1466 let now = Instant::now();
1467 if until > now {
1468 let delay_id = self.next_delay_id;
1469 self.next_delay_id.0 += 1;
1470 let delay = futures_timer::Delay::new(until - now);
1471 self.delays.push(
1472 async move {
1473 delay.await;
1474 (delay_id, peer_id, set_id)
1475 }
1476 .boxed(),
1477 );
1478
1479 *entry.get_mut() = PeerState::Backoff {
1480 timer: delay_id,
1481 timer_deadline: until,
1482 };
1483 } else {
1484 entry.remove();
1485 }
1486 } else {
1487 entry.remove();
1488 }
1489 } else if no_desired_left {
1490 *entry.get_mut() =
1493 PeerState::Disabled { connections, backoff_until };
1494 } else {
1495 *entry.get_mut() = PeerState::Incoming {
1496 connections,
1497 backoff_until,
1498 incoming_index,
1499 peerset_rejected,
1500 };
1501 }
1502 },
1503
1504 PeerState::Enabled { mut connections } => {
1507 trace!(
1508 target: LOG_TARGET,
1509 "Libp2p => Disconnected({}, {:?}, {:?}): Enabled.",
1510 peer_id, set_id, connection_id
1511 );
1512
1513 debug_assert!(connections.iter().any(|(_, s)| matches!(
1514 s,
1515 ConnectionState::Opening | ConnectionState::Open(_)
1516 )));
1517
1518 if let Some(pos) =
1519 connections.iter().position(|(c, _)| *c == connection_id)
1520 {
1521 let (_, state) = connections.remove(pos);
1522 if let ConnectionState::Open(_) = state {
1523 if let Some((replacement_pos, replacement_sink)) = connections
1524 .iter()
1525 .enumerate()
1526 .find_map(|(num, (_, s))| match s {
1527 ConnectionState::Open(s) => Some((num, s.clone())),
1528 _ => None,
1529 }) {
1530 if pos <= replacement_pos {
1531 trace!(
1532 target: LOG_TARGET,
1533 "External API <= Sink replaced({}, {:?})",
1534 peer_id, set_id
1535 );
1536 let event = NotificationsOut::CustomProtocolReplaced {
1537 peer_id,
1538 set_id,
1539 notifications_sink: replacement_sink.clone(),
1540 };
1541 self.events.push_back(ToSwarm::GenerateEvent(event));
1542 }
1543 } else {
1544 trace!(
1545 target: LOG_TARGET, "External API <= Closed({}, {:?})",
1546 peer_id, set_id
1547 );
1548 let event = NotificationsOut::CustomProtocolClosed {
1549 peer_id,
1550 set_id,
1551 };
1552 self.events.push_back(ToSwarm::GenerateEvent(event));
1553 }
1554 }
1555 } else {
1556 error!(target: LOG_TARGET,
1557 "inject_connection_closed: State mismatch in the custom protos handler");
1558 debug_assert!(false);
1559 }
1560
1561 if connections.is_empty() {
1562 trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1563 self.protocol_controller_handles[usize::from(set_id)]
1564 .dropped(peer_id);
1565 let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
1566
1567 let delay_id = self.next_delay_id;
1568 self.next_delay_id.0 += 1;
1569 let delay = futures_timer::Delay::new(Duration::from_secs(ban_dur));
1570 self.delays.push(
1571 async move {
1572 delay.await;
1573 (delay_id, peer_id, set_id)
1574 }
1575 .boxed(),
1576 );
1577
1578 *entry.get_mut() = PeerState::Backoff {
1579 timer: delay_id,
1580 timer_deadline: Instant::now() + Duration::from_secs(ban_dur),
1581 };
1582 } else if !connections.iter().any(|(_, s)| {
1583 matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
1584 }) {
1585 trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1586 self.protocol_controller_handles[usize::from(set_id)]
1587 .dropped(peer_id);
1588
1589 *entry.get_mut() =
1590 PeerState::Disabled { connections, backoff_until: None };
1591 } else {
1592 *entry.get_mut() = PeerState::Enabled { connections };
1593 }
1594 },
1595
1596 PeerState::Requested |
1597 PeerState::PendingRequest { .. } |
1598 PeerState::Backoff { .. } => {
1599 error!(target: LOG_TARGET,
1601 "`inject_connection_closed` called for unknown peer {}",
1602 peer_id);
1603 debug_assert!(false);
1604 },
1605 PeerState::Poisoned => {
1606 error!(target: LOG_TARGET, "State of peer {} is poisoned", peer_id);
1607 debug_assert!(false);
1608 },
1609 }
1610 }
1611 },
1612 FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
1613 if let DialError::Transport(errors) = error {
1614 for (addr, error) in errors.iter() {
1615 trace!(target: LOG_TARGET, "Libp2p => Reach failure for {:?} through {:?}: {:?}", peer_id, addr, error);
1616 }
1617 }
1618
1619 if let Some(peer_id) = peer_id {
1620 trace!(target: LOG_TARGET, "Libp2p => Dial failure for {:?}", peer_id);
1621
1622 for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
1623 if let Entry::Occupied(mut entry) = self.peers.entry((peer_id, set_id)) {
1624 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1625 st @ PeerState::Backoff { .. } => {
1627 *entry.into_mut() = st;
1628 },
1629
1630 st @ PeerState::Requested |
1633 st @ PeerState::PendingRequest { .. } => {
1634 trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1635 self.protocol_controller_handles[usize::from(set_id)]
1636 .dropped(peer_id);
1637
1638 let now = Instant::now();
1639 let ban_duration = match st {
1640 PeerState::PendingRequest { timer_deadline, .. }
1641 if timer_deadline > now =>
1642 cmp::max(timer_deadline - now, Duration::from_secs(5)),
1643 _ => Duration::from_secs(5),
1644 };
1645
1646 let delay_id = self.next_delay_id;
1647 self.next_delay_id.0 += 1;
1648 let delay = futures_timer::Delay::new(ban_duration);
1649 self.delays.push(
1650 async move {
1651 delay.await;
1652 (delay_id, peer_id, set_id)
1653 }
1654 .boxed(),
1655 );
1656
1657 *entry.into_mut() = PeerState::Backoff {
1658 timer: delay_id,
1659 timer_deadline: now + ban_duration,
1660 };
1661 },
1662
1663 st @ PeerState::Disabled { .. } |
1666 st @ PeerState::Enabled { .. } |
1667 st @ PeerState::DisabledPendingEnable { .. } |
1668 st @ PeerState::Incoming { .. } => {
1669 *entry.into_mut() = st;
1670 },
1671
1672 PeerState::Poisoned => {
1673 error!(target: LOG_TARGET, "State of {:?} is poisoned", peer_id);
1674 debug_assert!(false);
1675 },
1676 }
1677 }
1678 }
1679 }
1680 },
1681 FromSwarm::ListenerClosed(_) => {},
1682 FromSwarm::ListenFailure(_) => {},
1683 FromSwarm::ListenerError(_) => {},
1684 FromSwarm::ExternalAddrExpired(_) => {},
1685 FromSwarm::NewListener(_) => {},
1686 FromSwarm::ExpiredListenAddr(_) => {},
1687 FromSwarm::NewExternalAddrCandidate(_) => {},
1688 FromSwarm::ExternalAddrConfirmed(_) => {},
1689 FromSwarm::AddressChange(_) => {},
1690 FromSwarm::NewListenAddr(_) => {},
1691 FromSwarm::NewExternalAddrOfPeer(_) => {},
1692 event => {
1693 warn!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
1694 },
1695 }
1696 }
1697
1698 fn on_connection_handler_event(
1699 &mut self,
1700 peer_id: PeerId,
1701 connection_id: ConnectionId,
1702 event: THandlerOutEvent<Self>,
1703 ) {
1704 match event {
1705 NotifsHandlerOut::OpenDesiredByRemote { protocol_index, handshake } => {
1706 let set_id = SetId::from(protocol_index);
1707
1708 trace!(target: LOG_TARGET,
1709 "Handler({:?}, {:?}]) => OpenDesiredByRemote({:?})",
1710 peer_id, connection_id, set_id);
1711
1712 let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
1713 {
1714 entry
1715 } else {
1716 error!(
1717 target: LOG_TARGET,
1718 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1719 );
1720 debug_assert!(false);
1721 return
1722 };
1723
1724 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1725 PeerState::Incoming {
1727 mut connections,
1728 backoff_until,
1729 incoming_index,
1730 peerset_rejected,
1731 } => {
1732 debug_assert!(connections
1733 .iter()
1734 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1735 if let Some((_, connec_state)) =
1736 connections.iter_mut().find(|(c, _)| *c == connection_id)
1737 {
1738 if let ConnectionState::Closed = *connec_state {
1739 *connec_state = ConnectionState::OpenDesiredByRemote;
1740 } else {
1741 debug_assert!(matches!(
1747 connec_state,
1748 ConnectionState::OpeningThenClosing | ConnectionState::Closing
1749 ));
1750 }
1751 } else {
1752 error!(
1753 target: LOG_TARGET,
1754 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1755 );
1756 debug_assert!(false);
1757 }
1758
1759 *entry.into_mut() = PeerState::Incoming {
1760 connections,
1761 backoff_until,
1762 incoming_index,
1763 peerset_rejected,
1764 };
1765 },
1766
1767 PeerState::Enabled { mut connections } => {
1768 debug_assert!(connections.iter().any(|(_, s)| matches!(
1769 s,
1770 ConnectionState::Opening | ConnectionState::Open(_)
1771 )));
1772
1773 if let Some((_, connec_state)) =
1774 connections.iter_mut().find(|(c, _)| *c == connection_id)
1775 {
1776 if let ConnectionState::Closed = *connec_state {
1777 trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})",
1778 peer_id, connection_id, set_id);
1779 self.events.push_back(ToSwarm::NotifyHandler {
1780 peer_id,
1781 handler: NotifyHandler::One(connection_id),
1782 event: NotifsHandlerIn::Open {
1783 protocol_index: set_id.into(),
1784 peer_id,
1785 },
1786 });
1787 *connec_state = ConnectionState::Opening;
1788 } else {
1789 debug_assert!(matches!(
1795 connec_state,
1796 ConnectionState::OpenDesiredByRemote |
1797 ConnectionState::Closing | ConnectionState::Opening
1798 ));
1799 }
1800 } else {
1801 error!(
1802 target: LOG_TARGET,
1803 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1804 );
1805 debug_assert!(false);
1806 }
1807
1808 *entry.into_mut() = PeerState::Enabled { connections };
1809 },
1810
1811 PeerState::Disabled { mut connections, backoff_until } => {
1813 if let Some((_, connec_state)) =
1814 connections.iter_mut().find(|(c, _)| *c == connection_id)
1815 {
1816 if let ConnectionState::Closed = *connec_state {
1817 *connec_state = ConnectionState::OpenDesiredByRemote;
1818
1819 let incoming_id = self.next_incoming_index;
1820 self.next_incoming_index.0 += 1;
1821
1822 trace!(target: LOG_TARGET, "PSM <= Incoming({}, {:?}, {:?}).",
1823 peer_id, set_id, incoming_id);
1824 self.protocol_controller_handles[usize::from(set_id)]
1825 .incoming_connection(peer_id, incoming_id);
1826 self.incoming.push(IncomingPeer {
1827 peer_id,
1828 set_id,
1829 alive: true,
1830 incoming_id,
1831 handshake,
1832 });
1833
1834 *entry.into_mut() = PeerState::Incoming {
1835 connections,
1836 backoff_until,
1837 peerset_rejected: false,
1838 incoming_index: incoming_id,
1839 };
1840 } else {
1841 debug_assert!(matches!(
1846 connec_state,
1847 ConnectionState::OpeningThenClosing | ConnectionState::Closing
1848 ));
1849 *entry.into_mut() =
1850 PeerState::Disabled { connections, backoff_until };
1851 }
1852 } else {
1853 error!(
1854 target: LOG_TARGET,
1855 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1856 );
1857 debug_assert!(false);
1858 }
1859 },
1860
1861 PeerState::DisabledPendingEnable { mut connections, timer, timer_deadline } => {
1863 if let Some((_, connec_state)) =
1864 connections.iter_mut().find(|(c, _)| *c == connection_id)
1865 {
1866 if let ConnectionState::Closed = *connec_state {
1867 trace!(target: LOG_TARGET, "Handler({:?}, {:?}) <= Open({:?})",
1868 peer_id, connection_id, set_id);
1869 self.events.push_back(ToSwarm::NotifyHandler {
1870 peer_id,
1871 handler: NotifyHandler::One(connection_id),
1872 event: NotifsHandlerIn::Open {
1873 protocol_index: set_id.into(),
1874 peer_id,
1875 },
1876 });
1877 *connec_state = ConnectionState::Opening;
1878
1879 *entry.into_mut() = PeerState::Enabled { connections };
1880 } else {
1881 debug_assert!(matches!(
1886 connec_state,
1887 ConnectionState::OpeningThenClosing | ConnectionState::Closing
1888 ));
1889 *entry.into_mut() = PeerState::DisabledPendingEnable {
1890 connections,
1891 timer,
1892 timer_deadline,
1893 };
1894 }
1895 } else {
1896 error!(
1897 target: LOG_TARGET,
1898 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1899 );
1900 debug_assert!(false);
1901 }
1902 },
1903
1904 state => {
1905 error!(target: LOG_TARGET,
1906 "OpenDesiredByRemote: Unexpected state in the custom protos handler: {:?}",
1907 state);
1908 debug_assert!(false);
1909 },
1910 };
1911 },
1912
1913 NotifsHandlerOut::CloseDesired { protocol_index, reason } => {
1914 let set_id = SetId::from(protocol_index);
1915
1916 trace!(target: LOG_TARGET,
1917 "Handler({}, {:?}) => CloseDesired({:?})",
1918 peer_id, connection_id, set_id);
1919
1920 let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
1921 {
1922 entry
1923 } else {
1924 error!(target: LOG_TARGET, "CloseDesired: State mismatch in the custom protos handler");
1925 debug_assert!(false);
1926 return
1927 };
1928
1929 if reason == CloseReason::ProtocolMisbehavior {
1930 self.events.push_back(ToSwarm::GenerateEvent(
1931 NotificationsOut::ProtocolMisbehavior { peer_id, set_id },
1932 ));
1933 }
1934
1935 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1936 PeerState::Enabled { mut connections } => {
1938 debug_assert!(connections.iter().any(|(_, s)| matches!(
1939 s,
1940 ConnectionState::Opening | ConnectionState::Open(_)
1941 )));
1942
1943 let pos = if let Some(pos) =
1944 connections.iter().position(|(c, _)| *c == connection_id)
1945 {
1946 pos
1947 } else {
1948 error!(target: LOG_TARGET,
1949 "CloseDesired: State mismatch in the custom protos handler");
1950 debug_assert!(false);
1951 return
1952 };
1953
1954 if matches!(connections[pos].1, ConnectionState::Closing) {
1955 *entry.into_mut() = PeerState::Enabled { connections };
1956 return
1957 }
1958
1959 debug_assert!(matches!(connections[pos].1, ConnectionState::Open(_)));
1960 connections[pos].1 = ConnectionState::Closing;
1961
1962 trace!(target: LOG_TARGET, "Handler({}, {:?}) <= Close({:?})", peer_id, connection_id, set_id);
1963 self.events.push_back(ToSwarm::NotifyHandler {
1964 peer_id,
1965 handler: NotifyHandler::One(connection_id),
1966 event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
1967 });
1968
1969 if let Some((replacement_pos, replacement_sink)) =
1970 connections.iter().enumerate().find_map(|(num, (_, s))| match s {
1971 ConnectionState::Open(s) => Some((num, s.clone())),
1972 _ => None,
1973 }) {
1974 if pos <= replacement_pos {
1975 trace!(target: LOG_TARGET, "External API <= Sink replaced({:?}, {:?})", peer_id, set_id);
1976 let event = NotificationsOut::CustomProtocolReplaced {
1977 peer_id,
1978 set_id,
1979 notifications_sink: replacement_sink.clone(),
1980 };
1981 self.events.push_back(ToSwarm::GenerateEvent(event));
1982 }
1983
1984 *entry.into_mut() = PeerState::Enabled { connections };
1985 } else {
1986 if !connections
1988 .iter()
1989 .any(|(_, s)| matches!(s, ConnectionState::Opening))
1990 {
1991 trace!(target: LOG_TARGET, "PSM <= Dropped({}, {:?})", peer_id, set_id);
1992 self.protocol_controller_handles[usize::from(set_id)]
1993 .dropped(peer_id);
1994 *entry.into_mut() =
1995 PeerState::Disabled { connections, backoff_until: None };
1996 } else {
1997 *entry.into_mut() = PeerState::Enabled { connections };
1998 }
1999
2000 trace!(target: LOG_TARGET, "External API <= Closed({}, {:?})", peer_id, set_id);
2001 let event = NotificationsOut::CustomProtocolClosed { peer_id, set_id };
2002 self.events.push_back(ToSwarm::GenerateEvent(event));
2003 }
2004 },
2005
2006 state @ PeerState::Disabled { .. } |
2009 state @ PeerState::DisabledPendingEnable { .. } => {
2010 *entry.into_mut() = state;
2011 },
2012 state => {
2013 error!(target: LOG_TARGET,
2014 "Unexpected state in the custom protos handler: {:?}",
2015 state);
2016 },
2017 }
2018 },
2019
2020 NotifsHandlerOut::CloseResult { protocol_index } => {
2021 let set_id = SetId::from(protocol_index);
2022
2023 trace!(target: LOG_TARGET,
2024 "Handler({}, {:?}) => CloseResult({:?})",
2025 peer_id, connection_id, set_id);
2026
2027 match self.peers.get_mut(&(peer_id, set_id)) {
2028 Some(PeerState::Incoming { connections, .. }) |
2030 Some(PeerState::DisabledPendingEnable { connections, .. }) |
2031 Some(PeerState::Disabled { connections, .. }) |
2032 Some(PeerState::Enabled { connections, .. }) => {
2033 if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2034 *c == connection_id && matches!(s, ConnectionState::Closing)
2035 }) {
2036 *connec_state = ConnectionState::Closed;
2037 } else {
2038 error!(target: LOG_TARGET,
2039 "CloseResult: State mismatch in the custom protos handler");
2040 debug_assert!(false);
2041 }
2042 },
2043
2044 state => {
2045 error!(target: LOG_TARGET,
2046 "CloseResult: Unexpected state in the custom protos handler: {:?}",
2047 state);
2048 debug_assert!(false);
2049 },
2050 }
2051 },
2052
2053 NotifsHandlerOut::OpenResultOk {
2054 protocol_index,
2055 negotiated_fallback,
2056 received_handshake,
2057 notifications_sink,
2058 inbound,
2059 ..
2060 } => {
2061 let set_id = SetId::from(protocol_index);
2062 trace!(target: LOG_TARGET,
2063 "Handler({}, {:?}) => OpenResultOk({:?})",
2064 peer_id, connection_id, set_id);
2065
2066 match self.peers.get_mut(&(peer_id, set_id)) {
2067 Some(PeerState::Enabled { connections, .. }) => {
2068 debug_assert!(connections.iter().any(|(_, s)| matches!(
2069 s,
2070 ConnectionState::Opening | ConnectionState::Open(_)
2071 )));
2072 let any_open =
2073 connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_)));
2074
2075 if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2076 *c == connection_id && matches!(s, ConnectionState::Opening)
2077 }) {
2078 if !any_open {
2079 trace!(target: LOG_TARGET, "External API <= Open({}, {:?})", peer_id, set_id);
2080 let event = NotificationsOut::CustomProtocolOpen {
2081 peer_id,
2082 set_id,
2083 direction: if inbound {
2084 Direction::Inbound
2085 } else {
2086 Direction::Outbound
2087 },
2088 received_handshake: received_handshake.clone(),
2089 negotiated_fallback: negotiated_fallback.clone(),
2090 notifications_sink: notifications_sink.clone(),
2091 };
2092 self.events.push_back(ToSwarm::GenerateEvent(event));
2093 }
2094 *connec_state = ConnectionState::Open(notifications_sink);
2095 } else if let Some((_, connec_state)) =
2096 connections.iter_mut().find(|(c, s)| {
2097 *c == connection_id &&
2098 matches!(s, ConnectionState::OpeningThenClosing)
2099 }) {
2100 *connec_state = ConnectionState::Closing;
2101 } else {
2102 error!(target: LOG_TARGET,
2103 "OpenResultOk State mismatch in the custom protos handler");
2104 debug_assert!(false);
2105 }
2106 },
2107
2108 Some(PeerState::Incoming { connections, .. }) |
2109 Some(PeerState::DisabledPendingEnable { connections, .. }) |
2110 Some(PeerState::Disabled { connections, .. }) => {
2111 if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2112 *c == connection_id && matches!(s, ConnectionState::OpeningThenClosing)
2113 }) {
2114 *connec_state = ConnectionState::Closing;
2115 } else {
2116 error!(target: LOG_TARGET,
2117 "OpenResultOk State mismatch in the custom protos handler");
2118 debug_assert!(false);
2119 }
2120 },
2121
2122 state => {
2123 error!(target: LOG_TARGET,
2124 "OpenResultOk: Unexpected state in the custom protos handler: {:?}",
2125 state);
2126 debug_assert!(false);
2127 },
2128 }
2129 },
2130
2131 NotifsHandlerOut::OpenResultErr { protocol_index } => {
2132 let set_id = SetId::from(protocol_index);
2133 trace!(target: LOG_TARGET,
2134 "Handler({:?}, {:?}) => OpenResultErr({:?})",
2135 peer_id, connection_id, set_id);
2136
2137 let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
2138 {
2139 entry
2140 } else {
2141 error!(target: LOG_TARGET, "OpenResultErr: State mismatch in the custom protos handler");
2142 debug_assert!(false);
2143 return
2144 };
2145
2146 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
2147 PeerState::Enabled { mut connections } => {
2148 debug_assert!(connections.iter().any(|(_, s)| matches!(
2149 s,
2150 ConnectionState::Opening | ConnectionState::Open(_)
2151 )));
2152
2153 if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2154 *c == connection_id && matches!(s, ConnectionState::Opening)
2155 }) {
2156 *connec_state = ConnectionState::Closed;
2157 } else if let Some((_, connec_state)) =
2158 connections.iter_mut().find(|(c, s)| {
2159 *c == connection_id &&
2160 matches!(s, ConnectionState::OpeningThenClosing)
2161 }) {
2162 *connec_state = ConnectionState::Closing;
2163 } else {
2164 error!(target: LOG_TARGET,
2165 "OpenResultErr: State mismatch in the custom protos handler");
2166 debug_assert!(false);
2167 }
2168
2169 if !connections.iter().any(|(_, s)| {
2170 matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
2171 }) {
2172 trace!(target: LOG_TARGET, "PSM <= Dropped({:?}, {:?})", peer_id, set_id);
2173 self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id);
2174
2175 let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
2176 *entry.into_mut() = PeerState::Disabled {
2177 connections,
2178 backoff_until: Some(Instant::now() + Duration::from_secs(ban_dur)),
2179 };
2180 } else {
2181 *entry.into_mut() = PeerState::Enabled { connections };
2182 }
2183 },
2184 mut state @ PeerState::Incoming { .. } |
2185 mut state @ PeerState::DisabledPendingEnable { .. } |
2186 mut state @ PeerState::Disabled { .. } => {
2187 match &mut state {
2188 PeerState::Incoming { connections, .. } |
2189 PeerState::Disabled { connections, .. } |
2190 PeerState::DisabledPendingEnable { connections, .. } => {
2191 if let Some((_, connec_state)) =
2192 connections.iter_mut().find(|(c, s)| {
2193 *c == connection_id &&
2194 matches!(s, ConnectionState::OpeningThenClosing)
2195 }) {
2196 *connec_state = ConnectionState::Closing;
2197 } else {
2198 error!(target: LOG_TARGET,
2199 "OpenResultErr: State mismatch in the custom protos handler");
2200 debug_assert!(false);
2201 }
2202 },
2203 _ => unreachable!(
2204 "Match branches are the same as the one on which we
2205 enter this block; qed"
2206 ),
2207 };
2208
2209 *entry.into_mut() = state;
2210 },
2211 state => {
2212 error!(target: LOG_TARGET,
2213 "Unexpected state in the custom protos handler: {:?}",
2214 state);
2215 debug_assert!(false);
2216 },
2217 };
2218 },
2219
2220 NotifsHandlerOut::Notification { protocol_index, message } => {
2221 let set_id = SetId::from(protocol_index);
2222 if self.is_open(&peer_id, set_id) {
2223 trace!(
2224 target: LOG_TARGET,
2225 "Handler({:?}) => Notification({}, {:?}, {} bytes)",
2226 connection_id,
2227 peer_id,
2228 set_id,
2229 message.len()
2230 );
2231 trace!(
2232 target: LOG_TARGET,
2233 "External API <= Message({}, {:?})",
2234 peer_id,
2235 set_id,
2236 );
2237 let event = NotificationsOut::Notification {
2238 peer_id,
2239 set_id,
2240 message: message.clone(),
2241 };
2242 self.events.push_back(ToSwarm::GenerateEvent(event));
2243 } else {
2244 trace!(
2245 target: LOG_TARGET,
2246 "Handler({:?}) => Post-close notification({}, {:?}, {} bytes)",
2247 connection_id,
2248 peer_id,
2249 set_id,
2250 message.len()
2251 );
2252 }
2253 },
2254 NotifsHandlerOut::Close { protocol_index } => {
2255 let set_id = SetId::from(protocol_index);
2256
2257 trace!(target: LOG_TARGET, "Handler({}, {:?}) => SyncNotificationsClogged({:?})", peer_id, connection_id, set_id);
2258 self.events.push_back(ToSwarm::CloseConnection {
2259 peer_id,
2260 connection: CloseConnection::One(connection_id),
2261 });
2262 },
2263 }
2264 }
2265
2266 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2267 if let Some(event) = self.events.pop_front() {
2268 return Poll::Ready(event)
2269 }
2270
2271 loop {
2273 match futures::Stream::poll_next(Pin::new(&mut self.from_protocol_controllers), cx) {
2274 Poll::Ready(Some(Message::Accept(index))) => {
2275 self.peerset_report_preaccept(index);
2276 },
2277 Poll::Ready(Some(Message::Reject(index))) => {
2278 let _ = self.peerset_report_reject(index);
2279 },
2280 Poll::Ready(Some(Message::Connect { peer_id, set_id, .. })) => {
2281 self.peerset_report_connect(peer_id, set_id);
2282 },
2283 Poll::Ready(Some(Message::Drop { peer_id, set_id, .. })) => {
2284 self.peerset_report_disconnect(peer_id, set_id);
2285 },
2286 Poll::Ready(None) => {
2287 error!(
2288 target: LOG_TARGET,
2289 "Protocol controllers receiver stream has returned `None`. Ignore this error if the node is shutting down.",
2290 );
2291 break
2292 },
2293 Poll::Pending => break,
2294 }
2295 }
2296
2297 loop {
2299 match futures::Stream::poll_next(Pin::new(&mut self.command_streams), cx) {
2300 Poll::Ready(Some((set_id, command))) => match command {
2301 NotificationCommand::SetHandshake(handshake) => {
2302 self.set_notif_protocol_handshake(set_id.into(), handshake);
2303 },
2304 NotificationCommand::OpenSubstream(_peer) |
2305 NotificationCommand::CloseSubstream(_peer) => {
2306 todo!("substream control not implemented");
2307 },
2308 },
2309 Poll::Ready(None) => {
2310 error!(target: LOG_TARGET, "Protocol command streams have been shut down");
2311 break
2312 },
2313 Poll::Pending => break,
2314 }
2315 }
2316
2317 while let Poll::Ready(Some((result, index))) =
2318 self.pending_inbound_validations.poll_next_unpin(cx)
2319 {
2320 match result {
2321 Ok(ValidationResult::Accept) => {
2322 self.protocol_report_accept(index);
2323 },
2324 Ok(ValidationResult::Reject) => {
2325 self.protocol_report_reject(index);
2326 },
2327 Err(_) => {
2328 error!(target: LOG_TARGET, "Protocol has shut down");
2329 break
2330 },
2331 }
2332 }
2333
2334 while let Poll::Ready(Some((delay_id, peer_id, set_id))) =
2335 Pin::new(&mut self.delays).poll_next(cx)
2336 {
2337 let peer_state = match self.peers.get_mut(&(peer_id, set_id)) {
2338 Some(s) => s,
2339 None => continue,
2342 };
2343
2344 match peer_state {
2345 PeerState::Backoff { timer, .. } if *timer == delay_id => {
2346 trace!(target: LOG_TARGET, "Libp2p <= Clean up ban of {:?} from the state ({:?})", peer_id, set_id);
2347 self.peers.remove(&(peer_id, set_id));
2348 },
2349
2350 PeerState::PendingRequest { timer, .. } if *timer == delay_id => {
2351 trace!(target: LOG_TARGET, "Libp2p <= Dial {:?} now that ban has expired ({:?})", peer_id, set_id);
2352 self.events.push_back(ToSwarm::Dial { opts: peer_id.into() });
2353 *peer_state = PeerState::Requested;
2354 },
2355
2356 PeerState::DisabledPendingEnable { connections, timer, timer_deadline }
2357 if *timer == delay_id =>
2358 {
2359 if let Some((connec_id, connec_state)) =
2361 connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
2362 {
2363 trace!(target: LOG_TARGET, "Handler({}, {:?}) <= Open({:?}) (ban expired)",
2364 peer_id, *connec_id, set_id);
2365 self.events.push_back(ToSwarm::NotifyHandler {
2366 peer_id,
2367 handler: NotifyHandler::One(*connec_id),
2368 event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
2369 });
2370 *connec_state = ConnectionState::Opening;
2371 *peer_state = PeerState::Enabled { connections: mem::take(connections) };
2372 } else {
2373 *timer_deadline = Instant::now() + Duration::from_secs(5);
2374 let delay = futures_timer::Delay::new(Duration::from_secs(5));
2375 let timer = *timer;
2376 self.delays.push(
2377 async move {
2378 delay.await;
2379 (timer, peer_id, set_id)
2380 }
2381 .boxed(),
2382 );
2383 }
2384 },
2385
2386 _ => {},
2389 }
2390 }
2391
2392 if let Some(event) = self.events.pop_front() {
2393 return Poll::Ready(event)
2394 }
2395
2396 Poll::Pending
2397 }
2398}
2399
2400#[cfg(test)]
2401mod tests {
2402 use super::*;
2403 use crate::{
2404 mock::MockPeerStore,
2405 protocol::notifications::handler::tests::*,
2406 protocol_controller::{IncomingIndex, ProtoSetConfig, ProtocolController},
2407 };
2408 use libp2p::core::ConnectedPoint;
2409 use sc_utils::mpsc::tracing_unbounded;
2410 use std::{collections::HashSet, iter};
2411
2412 impl PartialEq for ConnectionState {
2413 fn eq(&self, other: &ConnectionState) -> bool {
2414 match (self, other) {
2415 (ConnectionState::Closed, ConnectionState::Closed) => true,
2416 (ConnectionState::Closing, ConnectionState::Closing) => true,
2417 (ConnectionState::Opening, ConnectionState::Opening) => true,
2418 (ConnectionState::OpeningThenClosing, ConnectionState::OpeningThenClosing) => true,
2419 (ConnectionState::OpenDesiredByRemote, ConnectionState::OpenDesiredByRemote) =>
2420 true,
2421 (ConnectionState::Open(_), ConnectionState::Open(_)) => true,
2422 _ => false,
2423 }
2424 }
2425 }
2426
2427 fn development_notifs(
2428 ) -> (Notifications, ProtocolController, Box<dyn crate::service::traits::NotificationService>)
2429 {
2430 let (protocol_handle_pair, notif_service) =
2431 crate::protocol::notifications::service::notification_service("/proto/1".into());
2432 let (to_notifications, from_controller) =
2433 tracing_unbounded("test_controller_to_notifications", 10_000);
2434
2435 let (handle, controller) = ProtocolController::new(
2436 SetId::from(0),
2437 ProtoSetConfig {
2438 in_peers: 25,
2439 out_peers: 25,
2440 reserved_nodes: HashSet::new(),
2441 reserved_only: false,
2442 },
2443 to_notifications,
2444 Arc::new(MockPeerStore {}),
2445 );
2446
2447 let (notif_handle, command_stream) = protocol_handle_pair.split();
2448 (
2449 Notifications::new(
2450 vec![handle],
2451 from_controller,
2452 NotificationMetrics::new(None),
2453 iter::once((
2454 ProtocolConfig {
2455 name: "/foo".into(),
2456 fallback_names: Vec::new(),
2457 handshake: vec![1, 2, 3, 4],
2458 max_notification_size: u64::MAX,
2459 },
2460 notif_handle,
2461 command_stream,
2462 )),
2463 ),
2464 controller,
2465 notif_service,
2466 )
2467 }
2468
2469 #[test]
2470 fn update_handshake() {
2471 let (mut notif, _controller, _notif_service) = development_notifs();
2472
2473 let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
2474 assert_eq!(inner, vec![1, 2, 3, 4]);
2475
2476 notif.set_notif_protocol_handshake(0.into(), vec![5, 6, 7, 8]);
2477
2478 let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
2479 assert_eq!(inner, vec![5, 6, 7, 8]);
2480 }
2481
2482 #[test]
2483 #[should_panic]
2484 #[cfg(debug_assertions)]
2485 fn update_unknown_handshake() {
2486 let (mut notif, _controller, _notif_service) = development_notifs();
2487
2488 notif.set_notif_protocol_handshake(1337.into(), vec![5, 6, 7, 8]);
2489 }
2490
2491 #[test]
2492 fn disconnect_backoff_peer() {
2493 let (mut notif, _controller, _notif_service) = development_notifs();
2494
2495 let peer = PeerId::random();
2496 notif.peers.insert(
2497 (peer, 0.into()),
2498 PeerState::Backoff { timer: DelayId(0), timer_deadline: Instant::now() },
2499 );
2500 notif.disconnect_peer(&peer, 0.into());
2501
2502 assert!(std::matches!(
2503 notif.peers.get(&(peer, 0.into())),
2504 Some(PeerState::Backoff { timer: DelayId(0), .. })
2505 ));
2506 }
2507
2508 #[test]
2509 fn disconnect_pending_request() {
2510 let (mut notif, _controller, _notif_service) = development_notifs();
2511 let peer = PeerId::random();
2512
2513 notif.peers.insert(
2514 (peer, 0.into()),
2515 PeerState::PendingRequest { timer: DelayId(0), timer_deadline: Instant::now() },
2516 );
2517 notif.disconnect_peer(&peer, 0.into());
2518
2519 assert!(std::matches!(
2520 notif.peers.get(&(peer, 0.into())),
2521 Some(PeerState::PendingRequest { timer: DelayId(0), .. })
2522 ));
2523 }
2524
2525 #[test]
2526 fn disconnect_requested_peer() {
2527 let (mut notif, _controller, _notif_service) = development_notifs();
2528
2529 let peer = PeerId::random();
2530 notif.peers.insert((peer, 0.into()), PeerState::Requested);
2531 notif.disconnect_peer(&peer, 0.into());
2532
2533 assert!(std::matches!(notif.peers.get(&(peer, 0.into())), Some(PeerState::Requested)));
2534 }
2535
2536 #[test]
2537 fn disconnect_disabled_peer() {
2538 let (mut notif, _controller, _notif_service) = development_notifs();
2539 let peer = PeerId::random();
2540 notif.peers.insert(
2541 (peer, 0.into()),
2542 PeerState::Disabled { backoff_until: None, connections: SmallVec::new() },
2543 );
2544 notif.disconnect_peer(&peer, 0.into());
2545
2546 assert!(std::matches!(
2547 notif.peers.get(&(peer, 0.into())),
2548 Some(PeerState::Disabled { backoff_until: None, .. })
2549 ));
2550 }
2551
2552 #[test]
2553 fn remote_opens_connection_and_substream() {
2554 let (mut notif, _controller, _notif_service) = development_notifs();
2555 let peer = PeerId::random();
2556 let conn = ConnectionId::new_unchecked(0);
2557 let connected = ConnectedPoint::Listener {
2558 local_addr: Multiaddr::empty(),
2559 send_back_addr: Multiaddr::empty(),
2560 };
2561
2562 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2563 libp2p::swarm::behaviour::ConnectionEstablished {
2564 peer_id: peer,
2565 connection_id: conn,
2566 endpoint: &connected,
2567 failed_addresses: &[],
2568 other_established: 0usize,
2569 },
2570 ));
2571
2572 if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
2573 notif.peers.get(&(peer, 0.into()))
2574 {
2575 assert_eq!(connections[0], (conn, ConnectionState::Closed));
2576 } else {
2577 panic!("invalid state");
2578 }
2579
2580 notif.on_connection_handler_event(
2582 peer,
2583 conn,
2584 NotifsHandlerOut::OpenDesiredByRemote {
2585 protocol_index: 0,
2586 handshake: vec![1, 3, 3, 7],
2587 },
2588 );
2589
2590 if let Some(&PeerState::Incoming { ref connections, backoff_until: None, .. }) =
2591 notif.peers.get(&(peer, 0.into()))
2592 {
2593 assert_eq!(connections.len(), 1);
2594 assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2595 } else {
2596 panic!("invalid state");
2597 }
2598
2599 assert!(std::matches!(
2600 notif.incoming.pop(),
2601 Some(IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. }),
2602 ));
2603 }
2604
2605 #[tokio::test]
2606 async fn disconnect_remote_substream_before_handled_by_controller() {
2607 let (mut notif, _controller, _notif_service) = development_notifs();
2608 let peer = PeerId::random();
2609 let conn = ConnectionId::new_unchecked(0);
2610 let connected = ConnectedPoint::Listener {
2611 local_addr: Multiaddr::empty(),
2612 send_back_addr: Multiaddr::empty(),
2613 };
2614
2615 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2616 libp2p::swarm::behaviour::ConnectionEstablished {
2617 peer_id: peer,
2618 connection_id: conn,
2619 endpoint: &connected,
2620 failed_addresses: &[],
2621 other_established: 0usize,
2622 },
2623 ));
2624 notif.on_connection_handler_event(
2625 peer,
2626 conn,
2627 NotifsHandlerOut::OpenDesiredByRemote {
2628 protocol_index: 0,
2629 handshake: vec![1, 3, 3, 7],
2630 },
2631 );
2632 notif.disconnect_peer(&peer, 0.into());
2633
2634 if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
2635 notif.peers.get(&(peer, 0.into()))
2636 {
2637 assert_eq!(connections.len(), 1);
2638 assert_eq!(connections[0], (conn, ConnectionState::Closing));
2639 } else {
2640 panic!("invalid state");
2641 }
2642 }
2643
2644 #[test]
2645 fn peerset_report_connect_backoff() {
2646 let (mut notif, _controller, _notif_service) = development_notifs();
2647 let set_id = SetId::from(0);
2648 let peer = PeerId::random();
2649 let conn = ConnectionId::new_unchecked(0);
2650 let connected = ConnectedPoint::Listener {
2651 local_addr: Multiaddr::empty(),
2652 send_back_addr: Multiaddr::empty(),
2653 };
2654
2655 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2656 libp2p::swarm::behaviour::ConnectionEstablished {
2657 peer_id: peer,
2658 connection_id: conn,
2659 endpoint: &connected,
2660 failed_addresses: &[],
2661 other_established: 0usize,
2662 },
2663 ));
2664 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2665
2666 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2673 notif.peers.get_mut(&(peer, set_id))
2674 {
2675 *backoff_until =
2676 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2677 }
2678
2679 notif.on_swarm_event(FromSwarm::ConnectionClosed(
2680 libp2p::swarm::behaviour::ConnectionClosed {
2681 peer_id: peer,
2682 connection_id: conn,
2683 endpoint: &connected.clone(),
2684 cause: None,
2685 remaining_established: 0usize,
2686 },
2687 ));
2688
2689 let timer = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
2690 notif.peers.get(&(peer, set_id))
2691 {
2692 timer_deadline
2693 } else {
2694 panic!("invalid state");
2695 };
2696
2697 notif.peerset_report_connect(peer, set_id);
2699
2700 if let Some(&PeerState::PendingRequest { timer_deadline, .. }) =
2701 notif.peers.get(&(peer, set_id))
2702 {
2703 assert_eq!(timer, timer_deadline);
2704 } else {
2705 panic!("invalid state");
2706 }
2707 }
2708
2709 #[test]
2710 fn peerset_connect_incoming() {
2711 let (mut notif, _controller, _notif_service) = development_notifs();
2712 let peer = PeerId::random();
2713 let conn = ConnectionId::new_unchecked(0);
2714 let set_id = SetId::from(0);
2715 let connected = ConnectedPoint::Listener {
2716 local_addr: Multiaddr::empty(),
2717 send_back_addr: Multiaddr::empty(),
2718 };
2719
2720 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2721 libp2p::swarm::behaviour::ConnectionEstablished {
2722 peer_id: peer,
2723 connection_id: conn,
2724 endpoint: &connected,
2725 failed_addresses: &[],
2726 other_established: 0usize,
2727 },
2728 ));
2729 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2730
2731 notif.on_connection_handler_event(
2733 peer,
2734 conn,
2735 NotifsHandlerOut::OpenDesiredByRemote {
2736 protocol_index: 0,
2737 handshake: vec![1, 3, 3, 7],
2738 },
2739 );
2740
2741 notif.protocol_report_accept(IncomingIndex(0));
2745 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
2746 }
2747
2748 #[test]
2749 fn peerset_disconnect_disable_pending_enable() {
2750 let (mut notif, _controller, _notif_service) = development_notifs();
2751 let set_id = SetId::from(0);
2752 let peer = PeerId::random();
2753 let conn = ConnectionId::new_unchecked(0);
2754 let connected = ConnectedPoint::Listener {
2755 local_addr: Multiaddr::empty(),
2756 send_back_addr: Multiaddr::empty(),
2757 };
2758
2759 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2760 libp2p::swarm::behaviour::ConnectionEstablished {
2761 peer_id: peer,
2762 connection_id: conn,
2763 endpoint: &connected,
2764 failed_addresses: &[],
2765 other_established: 0usize,
2766 },
2767 ));
2768 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2769
2770 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2772 notif.peers.get_mut(&(peer, set_id))
2773 {
2774 *backoff_until =
2775 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2776 }
2777
2778 notif.peerset_report_connect(peer, set_id);
2780 assert!(std::matches!(
2781 notif.peers.get(&(peer, set_id)),
2782 Some(&PeerState::DisabledPendingEnable { .. })
2783 ));
2784
2785 notif.peerset_report_disconnect(peer, set_id);
2786
2787 if let Some(PeerState::Disabled { backoff_until, .. }) = notif.peers.get(&(peer, set_id)) {
2788 assert!(backoff_until.is_some());
2789 assert!(backoff_until.unwrap() > Instant::now());
2790 } else {
2791 panic!("invalid state");
2792 }
2793 }
2794
2795 #[test]
2796 fn peerset_disconnect_enabled() {
2797 let (mut notif, _controller, _notif_service) = development_notifs();
2798 let peer = PeerId::random();
2799 let conn = ConnectionId::new_unchecked(0);
2800 let set_id = SetId::from(0);
2801 let connected = ConnectedPoint::Listener {
2802 local_addr: Multiaddr::empty(),
2803 send_back_addr: Multiaddr::empty(),
2804 };
2805
2806 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2808 libp2p::swarm::behaviour::ConnectionEstablished {
2809 peer_id: peer,
2810 connection_id: conn,
2811 endpoint: &connected,
2812 failed_addresses: &[],
2813 other_established: 0usize,
2814 },
2815 ));
2816 notif.on_connection_handler_event(
2817 peer,
2818 conn,
2819 NotifsHandlerOut::OpenDesiredByRemote {
2820 protocol_index: 0,
2821 handshake: vec![1, 3, 3, 7],
2822 },
2823 );
2824 notif.protocol_report_accept(IncomingIndex(0));
2827 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
2828
2829 notif.peerset_report_disconnect(peer, set_id);
2831 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2832 }
2833
2834 #[test]
2835 fn peerset_disconnect_requested() {
2836 let (mut notif, _controller, _notif_service) = development_notifs();
2837 let peer = PeerId::random();
2838 let set_id = SetId::from(0);
2839
2840 notif.peerset_report_connect(peer, set_id);
2842 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
2843
2844 notif.peerset_report_disconnect(peer, set_id);
2846 assert!(notif.peers.get(&(peer, set_id)).is_none());
2847 }
2848
2849 #[test]
2850 fn peerset_disconnect_pending_request() {
2851 let (mut notif, _controller, _notif_service) = development_notifs();
2852 let set_id = SetId::from(0);
2853 let peer = PeerId::random();
2854 let conn = ConnectionId::new_unchecked(0);
2855 let connected = ConnectedPoint::Listener {
2856 local_addr: Multiaddr::empty(),
2857 send_back_addr: Multiaddr::empty(),
2858 };
2859
2860 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2861 libp2p::swarm::behaviour::ConnectionEstablished {
2862 peer_id: peer,
2863 connection_id: conn,
2864 endpoint: &connected,
2865 failed_addresses: &[],
2866 other_established: 0usize,
2867 },
2868 ));
2869 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2870
2871 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2873 notif.peers.get_mut(&(peer, set_id))
2874 {
2875 *backoff_until =
2876 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2877 }
2878
2879 notif.on_swarm_event(FromSwarm::ConnectionClosed(
2880 libp2p::swarm::behaviour::ConnectionClosed {
2881 peer_id: peer,
2882 connection_id: conn,
2883 endpoint: &connected.clone(),
2884 cause: None,
2885 remaining_established: 0usize,
2886 },
2887 ));
2888 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
2889
2890 notif.peerset_report_connect(peer, set_id);
2892 assert!(std::matches!(
2893 notif.peers.get(&(peer, set_id)),
2894 Some(&PeerState::PendingRequest { .. })
2895 ));
2896
2897 notif.peerset_report_disconnect(peer, set_id);
2899 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
2900 }
2901
2902 #[test]
2903 fn peerset_accept_peer_not_alive() {
2904 let (mut notif, _controller, _notif_service) = development_notifs();
2905 let peer = PeerId::random();
2906 let conn = ConnectionId::new_unchecked(0);
2907 let set_id = SetId::from(0);
2908 let connected = ConnectedPoint::Listener {
2909 local_addr: Multiaddr::empty(),
2910 send_back_addr: Multiaddr::empty(),
2911 };
2912
2913 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2914 libp2p::swarm::behaviour::ConnectionEstablished {
2915 peer_id: peer,
2916 connection_id: conn,
2917 endpoint: &connected,
2918 failed_addresses: &[],
2919 other_established: 0usize,
2920 },
2921 ));
2922 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2923
2924 notif.on_connection_handler_event(
2926 peer,
2927 conn,
2928 NotifsHandlerOut::OpenDesiredByRemote {
2929 protocol_index: 0,
2930 handshake: vec![1, 3, 3, 7],
2931 },
2932 );
2933 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
2934
2935 assert!(std::matches!(
2936 notif.incoming[0],
2937 IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
2938 ));
2939
2940 notif.disconnect_peer(&peer, set_id);
2941 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2942 assert!(std::matches!(
2943 notif.incoming[0],
2944 IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
2945 ));
2946
2947 notif.protocol_report_accept(IncomingIndex(0));
2948 assert_eq!(notif.incoming.len(), 0);
2949 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Disabled { .. })));
2950 }
2951
2952 #[test]
2953 fn secondary_connection_peer_state_incoming() {
2954 let (mut notif, _controller, _notif_service) = development_notifs();
2955 let peer = PeerId::random();
2956 let conn = ConnectionId::new_unchecked(0);
2957 let conn2 = ConnectionId::new_unchecked(1);
2958 let set_id = SetId::from(0);
2959 let connected = ConnectedPoint::Listener {
2960 local_addr: Multiaddr::empty(),
2961 send_back_addr: Multiaddr::empty(),
2962 };
2963
2964 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2965 libp2p::swarm::behaviour::ConnectionEstablished {
2966 peer_id: peer,
2967 connection_id: conn,
2968 endpoint: &connected,
2969 failed_addresses: &[],
2970 other_established: 0usize,
2971 },
2972 ));
2973 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2974
2975 notif.on_connection_handler_event(
2976 peer,
2977 conn,
2978 NotifsHandlerOut::OpenDesiredByRemote {
2979 protocol_index: 0,
2980 handshake: vec![1, 3, 3, 7],
2981 },
2982 );
2983 if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
2984 assert_eq!(connections.len(), 1);
2985 assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2986 } else {
2987 panic!("invalid state");
2988 }
2989
2990 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2992 libp2p::swarm::behaviour::ConnectionEstablished {
2993 peer_id: peer,
2994 connection_id: conn2,
2995 endpoint: &connected,
2996 failed_addresses: &[],
2997 other_established: 0usize,
2998 },
2999 ));
3000
3001 if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3002 assert_eq!(connections.len(), 2);
3003 assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
3004 assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3005 } else {
3006 panic!("invalid state");
3007 }
3008 }
3009
3010 #[test]
3011 fn close_connection_for_disabled_peer() {
3012 let (mut notif, _controller, _notif_service) = development_notifs();
3013 let peer = PeerId::random();
3014 let conn = ConnectionId::new_unchecked(0);
3015 let set_id = SetId::from(0);
3016 let connected = ConnectedPoint::Listener {
3017 local_addr: Multiaddr::empty(),
3018 send_back_addr: Multiaddr::empty(),
3019 };
3020
3021 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3022 libp2p::swarm::behaviour::ConnectionEstablished {
3023 peer_id: peer,
3024 connection_id: conn,
3025 endpoint: &connected,
3026 failed_addresses: &[],
3027 other_established: 0usize,
3028 },
3029 ));
3030 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3031
3032 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3033 libp2p::swarm::behaviour::ConnectionClosed {
3034 peer_id: peer,
3035 connection_id: conn,
3036 endpoint: &connected.clone(),
3037 cause: None,
3038 remaining_established: 0usize,
3039 },
3040 ));
3041 assert!(notif.peers.get(&(peer, set_id)).is_none());
3042 }
3043
3044 #[test]
3045 fn close_connection_for_incoming_peer_one_connection() {
3046 let (mut notif, _controller, _notif_service) = development_notifs();
3047 let peer = PeerId::random();
3048 let conn = ConnectionId::new_unchecked(0);
3049 let set_id = SetId::from(0);
3050 let connected = ConnectedPoint::Listener {
3051 local_addr: Multiaddr::empty(),
3052 send_back_addr: Multiaddr::empty(),
3053 };
3054
3055 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3056 libp2p::swarm::behaviour::ConnectionEstablished {
3057 peer_id: peer,
3058 connection_id: conn,
3059 endpoint: &connected,
3060 failed_addresses: &[],
3061 other_established: 0usize,
3062 },
3063 ));
3064 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3065
3066 notif.on_connection_handler_event(
3067 peer,
3068 conn,
3069 NotifsHandlerOut::OpenDesiredByRemote {
3070 protocol_index: 0,
3071 handshake: vec![1, 3, 3, 7],
3072 },
3073 );
3074 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3075
3076 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3077 libp2p::swarm::behaviour::ConnectionClosed {
3078 peer_id: peer,
3079 connection_id: conn,
3080 endpoint: &connected.clone(),
3081 cause: None,
3082 remaining_established: 0usize,
3083 },
3084 ));
3085 assert!(notif.peers.get(&(peer, set_id)).is_none());
3086 assert!(std::matches!(
3087 notif.incoming[0],
3088 IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
3089 ));
3090 }
3091
3092 #[test]
3093 fn close_connection_for_incoming_peer_two_connections() {
3094 let (mut notif, _controller, _notif_service) = development_notifs();
3095 let peer = PeerId::random();
3096 let conn = ConnectionId::new_unchecked(0);
3097 let conn1 = ConnectionId::new_unchecked(1);
3098 let set_id = SetId::from(0);
3099 let connected = ConnectedPoint::Listener {
3100 local_addr: Multiaddr::empty(),
3101 send_back_addr: Multiaddr::empty(),
3102 };
3103 let mut conns = SmallVec::<
3104 [(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER],
3105 >::from(vec![(conn, ConnectionState::Closed)]);
3106
3107 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3108 libp2p::swarm::behaviour::ConnectionEstablished {
3109 peer_id: peer,
3110 connection_id: conn,
3111 endpoint: &connected,
3112 failed_addresses: &[],
3113 other_established: 0usize,
3114 },
3115 ));
3116 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3117
3118 notif.on_connection_handler_event(
3119 peer,
3120 conn,
3121 NotifsHandlerOut::OpenDesiredByRemote {
3122 protocol_index: 0,
3123 handshake: vec![1, 3, 3, 7],
3124 },
3125 );
3126 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3127
3128 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3129 libp2p::swarm::behaviour::ConnectionEstablished {
3130 peer_id: peer,
3131 connection_id: conn1,
3132 endpoint: &connected,
3133 failed_addresses: &[],
3134 other_established: 0usize,
3135 },
3136 ));
3137 conns.push((conn1, ConnectionState::Closed));
3138
3139 if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id))
3140 {
3141 assert_eq!(connections.len(), 2);
3142 assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
3143 assert_eq!(connections[1], (conn1, ConnectionState::Closed));
3144 }
3145
3146 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3147 libp2p::swarm::behaviour::ConnectionClosed {
3148 peer_id: peer,
3149 connection_id: conn,
3150 endpoint: &connected.clone(),
3151 cause: None,
3152 remaining_established: 0usize,
3153 },
3154 ));
3155
3156 if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3157 assert_eq!(connections.len(), 1);
3158 assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3159 } else {
3160 panic!("invalid state");
3161 }
3162 }
3163
3164 #[test]
3165 fn connection_and_substream_open() {
3166 let (mut notif, _controller, _notif_service) = development_notifs();
3167 let peer = PeerId::random();
3168 let conn = ConnectionId::new_unchecked(0);
3169 let set_id = SetId::from(0);
3170 let connected = ConnectedPoint::Listener {
3171 local_addr: Multiaddr::empty(),
3172 send_back_addr: Multiaddr::empty(),
3173 };
3174 let mut conn_yielder = ConnectionYielder::new();
3175
3176 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3178 libp2p::swarm::behaviour::ConnectionEstablished {
3179 peer_id: peer,
3180 connection_id: conn,
3181 endpoint: &connected,
3182 failed_addresses: &[],
3183 other_established: 0usize,
3184 },
3185 ));
3186 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3187
3188 notif.on_connection_handler_event(
3189 peer,
3190 conn,
3191 NotifsHandlerOut::OpenDesiredByRemote {
3192 protocol_index: 0,
3193 handshake: vec![1, 3, 3, 7],
3194 },
3195 );
3196 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3197
3198 notif.protocol_report_accept(IncomingIndex(0));
3201 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3202
3203 let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
3205
3206 notif.on_connection_handler_event(peer, conn, event);
3207 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3208
3209 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3210 assert_eq!(connections.len(), 1);
3211 assert_eq!(connections[0].0, conn);
3212 assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3213 }
3214
3215 assert!(std::matches!(
3216 notif.events[notif.events.len() - 1],
3217 ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolOpen { .. })
3218 ));
3219 }
3220
3221 #[test]
3222 fn connection_closed_sink_replaced() {
3223 let (mut notif, _controller, _notif_service) = development_notifs();
3224 let peer = PeerId::random();
3225 let conn1 = ConnectionId::new_unchecked(0);
3226 let conn2 = ConnectionId::new_unchecked(1);
3227 let set_id = SetId::from(0);
3228 let connected = ConnectedPoint::Listener {
3229 local_addr: Multiaddr::empty(),
3230 send_back_addr: Multiaddr::empty(),
3231 };
3232 let mut conn_yielder = ConnectionYielder::new();
3233
3234 for conn_id in vec![conn1, conn2] {
3236 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3237 libp2p::swarm::behaviour::ConnectionEstablished {
3238 peer_id: peer,
3239 connection_id: conn_id,
3240 endpoint: &connected,
3241 failed_addresses: &[],
3242 other_established: 0usize,
3243 },
3244 ));
3245 }
3246
3247 if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3248 assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3249 assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3250 } else {
3251 panic!("invalid state");
3252 }
3253
3254 notif.peerset_report_connect(peer, set_id);
3256 notif.on_connection_handler_event(
3257 peer,
3258 conn2,
3259 NotifsHandlerOut::OpenDesiredByRemote {
3260 protocol_index: 0,
3261 handshake: vec![1, 3, 3, 7],
3262 },
3263 );
3264
3265 if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3266 assert_eq!(connections[0], (conn1, ConnectionState::Opening));
3267 assert_eq!(connections[1], (conn2, ConnectionState::Opening));
3268 } else {
3269 panic!("invalid state");
3270 }
3271
3272 for conn in vec![conn1, conn2].iter() {
3274 notif.on_connection_handler_event(
3275 peer,
3276 *conn,
3277 conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3278 );
3279 }
3280
3281 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3282 assert_eq!(connections[0].0, conn1);
3283 assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3284 assert_eq!(connections[1].0, conn2);
3285 assert!(std::matches!(connections[1].1, ConnectionState::Open(_)));
3286 } else {
3287 panic!("invalid state");
3288 }
3289
3290 assert_eq!(notif.open_peers().collect::<Vec<_>>(), vec![&peer],);
3292
3293 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3295 libp2p::swarm::behaviour::ConnectionClosed {
3296 peer_id: peer,
3297 connection_id: conn1,
3298 endpoint: &connected.clone(),
3299 cause: None,
3300 remaining_established: 0usize,
3301 },
3302 ));
3303
3304 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3305 assert_eq!(connections.len(), 1);
3306 assert_eq!(connections[0].0, conn2);
3307 assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3308 } else {
3309 panic!("invalid state");
3310 }
3311
3312 assert!(std::matches!(
3313 notif.events[notif.events.len() - 1],
3314 ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolReplaced { .. })
3315 ));
3316 }
3317
3318 #[test]
3319 fn dial_failure_for_requested_peer() {
3320 let (mut notif, _controller, _notif_service) = development_notifs();
3321 let peer = PeerId::random();
3322 let set_id = SetId::from(0);
3323
3324 notif.peerset_report_connect(peer, set_id);
3326 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
3327
3328 notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
3329 peer_id: Some(peer),
3330 error: &libp2p::swarm::DialError::Aborted,
3331 connection_id: ConnectionId::new_unchecked(1337),
3332 }));
3333
3334 if let Some(PeerState::Backoff { timer_deadline, .. }) = notif.peers.get(&(peer, set_id)) {
3335 assert!(timer_deadline > &Instant::now());
3336 } else {
3337 panic!("invalid state");
3338 }
3339 }
3340
3341 #[tokio::test]
3342 async fn write_notification() {
3343 let (mut notif, _controller, _notif_service) = development_notifs();
3344 let peer = PeerId::random();
3345 let conn = ConnectionId::new_unchecked(0);
3346 let set_id = SetId::from(0);
3347 let connected = ConnectedPoint::Listener {
3348 local_addr: Multiaddr::empty(),
3349 send_back_addr: Multiaddr::empty(),
3350 };
3351 let mut conn_yielder = ConnectionYielder::new();
3352
3353 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3354 libp2p::swarm::behaviour::ConnectionEstablished {
3355 peer_id: peer,
3356 connection_id: conn,
3357 endpoint: &connected,
3358 failed_addresses: &[],
3359 other_established: 0usize,
3360 },
3361 ));
3362 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3363
3364 notif.peerset_report_connect(peer, set_id);
3365 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3366
3367 notif.on_connection_handler_event(
3368 peer,
3369 conn,
3370 conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3371 );
3372
3373 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3374 assert_eq!(connections[0].0, conn);
3375 assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3376 } else {
3377 panic!("invalid state");
3378 }
3379
3380 notif
3381 .peers
3382 .get(&(peer, set_id))
3383 .unwrap()
3384 .get_open()
3385 .unwrap()
3386 .send_sync_notification(vec![1, 3, 3, 7]);
3387 assert_eq!(conn_yielder.get_next_event(peer, set_id.into()).await, Some(vec![1, 3, 3, 7]));
3388 }
3389
3390 #[test]
3391 fn peerset_report_connect_backoff_expired() {
3392 let (mut notif, _controller, _notif_service) = development_notifs();
3393 let set_id = SetId::from(0);
3394 let peer = PeerId::random();
3395 let conn = ConnectionId::new_unchecked(0);
3396 let connected = ConnectedPoint::Listener {
3397 local_addr: Multiaddr::empty(),
3398 send_back_addr: Multiaddr::empty(),
3399 };
3400 let backoff_duration = Duration::from_millis(100);
3401
3402 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3403 libp2p::swarm::behaviour::ConnectionEstablished {
3404 peer_id: peer,
3405 connection_id: conn,
3406 endpoint: &connected,
3407 failed_addresses: &[],
3408 other_established: 0usize,
3409 },
3410 ));
3411 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3412
3413 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3415 notif.peers.get_mut(&(peer, set_id))
3416 {
3417 *backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap());
3418 }
3419
3420 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3421 libp2p::swarm::behaviour::ConnectionClosed {
3422 peer_id: peer,
3423 connection_id: conn,
3424 endpoint: &connected.clone(),
3425 cause: None,
3426 remaining_established: 0usize,
3427 },
3428 ));
3429
3430 std::thread::sleep(backoff_duration * 2);
3432
3433 notif.peerset_report_connect(peer, set_id);
3435 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested { .. })))
3436 }
3437
/// A disconnect report from the peerset for a peer that is already `Disabled` must leave the
/// peer in the `Disabled` state.
#[test]
fn peerset_report_disconnect_disabled() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    // A freshly established connection registers the peer as `Disabled`.
    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The disconnect report must not change the state.
    behaviour.peerset_report_disconnect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));
}
3463
/// A disconnect report from the peerset for a peer that sits in `Backoff` must leave the peer
/// in the `Backoff` state.
#[test]
fn peerset_report_disconnect_backoff() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let remote = PeerId::random();
    let set = SetId::from(0);
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };
    let backoff = Duration::from_secs(2);

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // Arm a back-off on the disabled peer by writing the deadline directly into its state.
    if let Some(PeerState::Disabled { backoff_until, .. }) = behaviour.peers.get_mut(&key) {
        *backoff_until = Some(Instant::now().checked_add(backoff).unwrap());
    }

    // Closing the only connection while a back-off is pending yields `Backoff`.
    let closed = libp2p::swarm::behaviour::ConnectionClosed {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        cause: None,
        remaining_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionClosed(closed));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Backoff { .. })));

    // The disconnect report must not change the state.
    behaviour.peerset_report_disconnect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Backoff { .. })));
}
3509
/// A peer with two connections that is `DisabledPendingEnable` (disabled with a pending
/// back-off, then requested by the peerset) must stay in that state while one connection is
/// left and must end up in `Backoff` once the second connection is closed as well.
#[test]
fn peer_is_backed_off_if_both_connections_get_closed_while_peer_is_disabled_with_back_off() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let remote = PeerId::random();
    let set = SetId::from(0);
    let key = (remote, set);
    let first = ConnectionId::new_unchecked(0);
    let second = ConnectionId::new_unchecked(1);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    // Open both connections to the same peer.
    for connection_id in [first, second] {
        behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(
            libp2p::swarm::behaviour::ConnectionEstablished {
                peer_id: remote,
                connection_id,
                endpoint: &endpoint,
                failed_addresses: &[],
                other_established: 0,
            },
        ));
    }
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // Arm a back-off on the disabled peer by writing the deadline directly into its state.
    if let Some(PeerState::Disabled { backoff_until, .. }) = behaviour.peers.get_mut(&key) {
        *backoff_until =
            Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
    }

    // A connect request while the back-off is pending parks the peer in
    // `DisabledPendingEnable`.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(
        behaviour.peers.get(&key),
        Some(PeerState::DisabledPendingEnable { .. })
    ));

    // Losing the first connection keeps the peer in `DisabledPendingEnable`.
    let first_closed = libp2p::swarm::behaviour::ConnectionClosed {
        peer_id: remote,
        connection_id: first,
        endpoint: &endpoint,
        cause: None,
        remaining_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionClosed(first_closed));
    assert!(std::matches!(
        behaviour.peers.get(&key),
        Some(PeerState::DisabledPendingEnable { .. })
    ));

    // Losing the second connection moves the peer to `Backoff`.
    let second_closed = libp2p::swarm::behaviour::ConnectionClosed {
        peer_id: remote,
        connection_id: second,
        endpoint: &endpoint,
        cause: None,
        remaining_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionClosed(second_closed));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Backoff { .. })));
}
3582
/// Closing the only connection of an `Incoming` peer that carries a back-off deadline must move
/// the peer to `Backoff`.
#[test]
fn inject_connection_closed_incoming_with_backoff() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote asks to open a substream on the connection.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);

    // The peer has to be `Incoming` now; give it a back-off deadline by hand.
    match behaviour.peers.get_mut(&key) {
        Some(PeerState::Incoming { backoff_until, .. }) => {
            *backoff_until =
                Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
        },
        _ => panic!("invalid state"),
    }

    let closed = libp2p::swarm::behaviour::ConnectionClosed {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        cause: None,
        remaining_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionClosed(closed));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Backoff { .. })));
}
3636
/// With two connections to a peer, where only the first one received an inbound substream
/// request, closing the *other* (inactive) connection must leave the peer in `Incoming`.
#[test]
fn two_connections_inactive_connection_gets_closed_peer_state_is_still_incoming() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let peer = PeerId::random();
    let conn1 = ConnectionId::new_unchecked(0);
    let conn2 = ConnectionId::new_unchecked(1);
    let set_id = SetId::from(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    // Open both connections; a plain array avoids the heap allocation of `vec![]`.
    for conn_id in [conn1, conn2] {
        notif.on_swarm_event(FromSwarm::ConnectionEstablished(
            libp2p::swarm::behaviour::ConnectionEstablished {
                peer_id: peer,
                connection_id: conn_id,
                endpoint: &connected,
                failed_addresses: &[],
                other_established: 0usize,
            },
        ));
    }

    // Both connections are tracked, in order, as `Closed` under a `Disabled` peer.
    if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
        assert_eq!(connections[0], (conn1, ConnectionState::Closed));
        assert_eq!(connections[1], (conn2, ConnectionState::Closed));
    } else {
        panic!("invalid state");
    }

    // The remote asks to open a substream on the first connection only.
    notif.on_connection_handler_event(
        peer,
        conn1,
        NotifsHandlerOut::OpenDesiredByRemote {
            protocol_index: 0,
            handshake: vec![1, 3, 3, 7],
        },
    );
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Incoming { .. })));

    // Close the connection that did not receive the request.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: conn2,
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Incoming { .. })));
}
3694
/// With two connections to a peer, where only the first one received an inbound substream
/// request, closing that (active) connection must move the peer from `Incoming` back to
/// `Disabled`.
#[test]
fn two_connections_active_connection_gets_closed_peer_state_is_disabled() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let peer = PeerId::random();
    let conn1 = ConnectionId::new_unchecked(0);
    let conn2 = ConnectionId::new_unchecked(1);
    let set_id = SetId::from(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    // Open both connections; a plain array avoids the heap allocation of `vec![]`, and the
    // shared `connected` endpoint is reused instead of building an identical one per iteration.
    for conn_id in [conn1, conn2] {
        notif.on_swarm_event(FromSwarm::ConnectionEstablished(
            libp2p::swarm::behaviour::ConnectionEstablished {
                peer_id: peer,
                connection_id: conn_id,
                endpoint: &connected,
                failed_addresses: &[],
                other_established: 0usize,
            },
        ));
    }

    // Both connections are tracked, in order, as `Closed` under a `Disabled` peer.
    if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
        assert_eq!(connections[0], (conn1, ConnectionState::Closed));
        assert_eq!(connections[1], (conn2, ConnectionState::Closed));
    } else {
        panic!("invalid state");
    }

    // The remote asks to open a substream on the first connection only.
    notif.on_connection_handler_event(
        peer,
        conn1,
        NotifsHandlerOut::OpenDesiredByRemote {
            protocol_index: 0,
            handshake: vec![1, 3, 3, 7],
        },
    );
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Incoming { .. })));

    // Close the connection that received the request.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: conn1,
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Disabled { .. })));
}
3755
/// Walks a peer with two connections through `Disabled` -> `Enabled` (first connection
/// `Opening`, then `Open`) and finally closes the connection that holds the open substream.
///
/// The test makes no assertion after the final close; it only checks that handling the event
/// completes.
#[test]
fn inject_connection_closed_for_active_connection() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let peer = PeerId::random();
    let conn1 = ConnectionId::new_unchecked(0);
    let conn2 = ConnectionId::new_unchecked(1);
    let set_id = SetId::from(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };
    let mut conn_yielder = ConnectionYielder::new();

    // Open both connections; a plain array avoids the heap allocation of `vec![]`.
    for conn_id in [conn1, conn2] {
        notif.on_swarm_event(FromSwarm::ConnectionEstablished(
            libp2p::swarm::behaviour::ConnectionEstablished {
                peer_id: peer,
                connection_id: conn_id,
                endpoint: &connected,
                failed_addresses: &[],
                other_established: 0usize,
            },
        ));
    }

    // Both connections are tracked, in order, as `Closed` under a `Disabled` peer.
    if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
        assert_eq!(connections[0], (conn1, ConnectionState::Closed));
        assert_eq!(connections[1], (conn2, ConnectionState::Closed));
    } else {
        panic!("invalid state");
    }

    // The connect request enables the peer and starts opening the first connection only.
    notif.peerset_report_connect(peer, set_id);

    if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
        assert_eq!(connections[0], (conn1, ConnectionState::Opening));
        assert_eq!(connections[1], (conn2, ConnectionState::Closed));
    } else {
        panic!("invalid state");
    }

    // Feed the event produced by the `ConnectionYielder` helper for the first connection.
    notif.on_connection_handler_event(
        peer,
        conn1,
        conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
    );

    // The first connection is now `Open`; the second one is untouched.
    if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
        assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
        assert_eq!(connections[0].0, conn1);
        assert_eq!(connections[1], (conn2, ConnectionState::Closed));
    } else {
        panic!("invalid state");
    }

    // Close the connection that holds the open substream.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: conn1,
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
}
3823
/// Drives a peer through `Disabled` -> `Backoff` -> `PendingRequest` and then injects a dial
/// failure for it.
#[test]
fn inject_dial_failure_for_pending_request() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let remote = PeerId::random();
    let set = SetId::from(0);
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // Arm a back-off on the disabled peer by writing the deadline directly into its state.
    if let Some(PeerState::Disabled { backoff_until, .. }) = behaviour.peers.get_mut(&key) {
        *backoff_until =
            Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
    }

    // Closing the only connection while a back-off is pending yields `Backoff`.
    let closed = libp2p::swarm::behaviour::ConnectionClosed {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        cause: None,
        remaining_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionClosed(closed));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Backoff { .. })));

    // A connect request during the back-off is recorded as `PendingRequest`.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::PendingRequest { .. })));

    // Report that dialing the peer was aborted.
    let dial_error = libp2p::swarm::DialError::Aborted;
    let now = Instant::now();
    behaviour.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
        peer_id: Some(remote),
        error: &dial_error,
        connection_id: ConnectionId::new_unchecked(0),
    }));

    // NOTE(review): there is no `else` branch, so this check is skipped silently whenever the
    // peer is no longer `PendingRequest` after the dial failure — confirm that is intended.
    if let Some(PeerState::PendingRequest { timer_deadline, .. }) = behaviour.peers.get(&key) {
        assert!(*timer_deadline > now + std::time::Duration::from_secs(5));
    }
}
3886
/// When the remote asks to open a substream on both connections of a peer, both connections
/// are expected to be tracked as `OpenDesiredByRemote` under the `Incoming` peer state.
#[test]
fn peerstate_incoming_open_desired_by_remote() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);
    let first = ConnectionId::new_unchecked(0);
    let second = ConnectionId::new_unchecked(1);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };
    // Every inbound request in this test carries the same payload.
    let open_request = || NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };

    // Open both connections to the same peer.
    for connection_id in [first, second] {
        behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(
            libp2p::swarm::behaviour::ConnectionEstablished {
                peer_id: remote,
                connection_id,
                endpoint: &endpoint,
                failed_addresses: &[],
                other_established: 0,
            },
        ));
    }
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The first request switches the peer to `Incoming`.
    behaviour.on_connection_handler_event(remote, first, open_request());
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    // The second request arrives on the other connection.
    behaviour.on_connection_handler_event(remote, second, open_request());

    // NOTE(review): there is no `else` branch, so these checks are skipped silently if the
    // peer is not `Incoming` at this point — confirm that is intended.
    if let Some(PeerState::Incoming { connections, .. }) = behaviour.peers.get(&key) {
        assert_eq!(connections[0], (first, ConnectionState::OpenDesiredByRemote));
        assert_eq!(connections[1], (second, ConnectionState::OpenDesiredByRemote));
    }
}
3946
3947 #[tokio::test]
3948 async fn remove_backoff_peer_after_timeout() {
3949 let (mut notif, _controller, _notif_service) = development_notifs();
3950 let peer = PeerId::random();
3951 let set_id = SetId::from(0);
3952 let conn = ConnectionId::new_unchecked(0);
3953 let connected = ConnectedPoint::Listener {
3954 local_addr: Multiaddr::empty(),
3955 send_back_addr: Multiaddr::empty(),
3956 };
3957
3958 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3959 libp2p::swarm::behaviour::ConnectionEstablished {
3960 peer_id: peer,
3961 connection_id: conn,
3962 endpoint: &connected,
3963 failed_addresses: &[],
3964 other_established: 0usize,
3965 },
3966 ));
3967
3968 if let Some(&mut PeerState::Disabled { ref mut backoff_until, .. }) =
3969 notif.peers.get_mut(&(peer, 0.into()))
3970 {
3971 *backoff_until =
3972 Some(Instant::now().checked_add(std::time::Duration::from_millis(100)).unwrap());
3973 } else {
3974 panic!("invalid state");
3975 }
3976
3977 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3978 libp2p::swarm::behaviour::ConnectionClosed {
3979 peer_id: peer,
3980 connection_id: conn,
3981 endpoint: &connected.clone(),
3982 cause: None,
3983 remaining_established: 0usize,
3984 },
3985 ));
3986
3987 let until = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
3988 notif.peers.get(&(peer, set_id))
3989 {
3990 timer_deadline
3991 } else {
3992 panic!("invalid state");
3993 };
3994
3995 if until > Instant::now() {
3996 std::thread::sleep(until - Instant::now());
3997 }
3998
3999 assert!(notif.peers.get(&(peer, set_id)).is_some());
4000
4001 if tokio::time::timeout(Duration::from_secs(5), async {
4002 loop {
4003 futures::future::poll_fn(|cx| {
4004 let _ = notif.poll(cx);
4005 Poll::Ready(())
4006 })
4007 .await;
4008
4009 if notif.peers.get(&(peer, set_id)).is_none() {
4010 break
4011 }
4012 }
4013 })
4014 .await
4015 .is_err()
4016 {
4017 panic!("backoff peer was not removed in time");
4018 }
4019
4020 assert!(notif.peers.get(&(peer, set_id)).is_none());
4021 }
4022
4023 #[tokio::test]
4024 async fn reschedule_disabled_pending_enable_when_connection_not_closed() {
4025 let (mut notif, _controller, _notif_service) = development_notifs();
4026 let peer = PeerId::random();
4027 let conn = ConnectionId::new_unchecked(0);
4028 let set_id = SetId::from(0);
4029 let mut conn_yielder = ConnectionYielder::new();
4030
4031 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4033 libp2p::swarm::behaviour::ConnectionEstablished {
4034 peer_id: peer,
4035 connection_id: conn,
4036 endpoint: &ConnectedPoint::Listener {
4037 local_addr: Multiaddr::empty(),
4038 send_back_addr: Multiaddr::empty(),
4039 },
4040 failed_addresses: &[],
4041 other_established: 0usize,
4042 },
4043 ));
4044 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4045
4046 notif.on_connection_handler_event(
4048 peer,
4049 conn,
4050 NotifsHandlerOut::OpenDesiredByRemote {
4051 protocol_index: 0,
4052 handshake: vec![1, 3, 3, 7],
4053 },
4054 );
4055 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4056
4057 notif.protocol_report_accept(IncomingIndex(0));
4060 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4061
4062 let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4063
4064 notif.on_connection_handler_event(peer, conn, event);
4065 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4066
4067 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
4068 assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
4069 assert_eq!(connections[0].0, conn);
4070 } else {
4071 panic!("invalid state");
4072 }
4073
4074 notif.peerset_report_disconnect(peer, set_id);
4075
4076 if let Some(PeerState::Disabled { ref connections, ref mut backoff_until }) =
4077 notif.peers.get_mut(&(peer, set_id))
4078 {
4079 assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4080 assert_eq!(connections[0].0, conn);
4081
4082 *backoff_until =
4083 Some(Instant::now().checked_add(std::time::Duration::from_secs(2)).unwrap());
4084 } else {
4085 panic!("invalid state");
4086 }
4087
4088 notif.peerset_report_connect(peer, set_id);
4089
4090 let prev_instant =
4091 if let Some(PeerState::DisabledPendingEnable {
4092 ref connections, timer_deadline, ..
4093 }) = notif.peers.get(&(peer, set_id))
4094 {
4095 assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4096 assert_eq!(connections[0].0, conn);
4097
4098 *timer_deadline
4099 } else {
4100 panic!("invalid state");
4101 };
4102
4103 if tokio::time::timeout(Duration::from_secs(5), async {
4108 loop {
4109 futures::future::poll_fn(|cx| {
4110 let _ = notif.poll(cx);
4111 Poll::Ready(())
4112 })
4113 .await;
4114
4115 if let Some(PeerState::DisabledPendingEnable {
4116 timer_deadline, connections, ..
4117 }) = notif.peers.get(&(peer, set_id))
4118 {
4119 assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4120
4121 if timer_deadline != &prev_instant {
4122 break
4123 }
4124 } else {
4125 panic!("invalid state");
4126 }
4127 }
4128 })
4129 .await
4130 .is_err()
4131 {
4132 panic!("backoff peer was not removed in time");
4133 }
4134 }
4135
/// Brings a peer to `Enabled` with an open substream and then reports a connect for it again.
///
/// The test is compiled only with debug assertions enabled and is expected to panic
/// (presumably in the final, redundant `peerset_report_connect` call — verify against the
/// behaviour implementation).
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_enabled_peer() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };
    let mut conn_yielder = ConnectionYielder::new();

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote asks to open a substream.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    // The first connect report enables the peer.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Enabled { .. })));

    // Feed the event produced by the `ConnectionYielder` helper; the connection becomes `Open`.
    let opened = conn_yielder.open_substream(remote, 0, vec![1, 2, 3, 4]);
    behaviour.on_connection_handler_event(remote, conn_id, opened);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Enabled { .. })));

    match behaviour.peers.get(&key) {
        Some(PeerState::Enabled { connections, .. }) => {
            assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
            assert_eq!(connections[0].0, conn_id);
        },
        _ => panic!("invalid state"),
    }

    // Second connect report for a peer that is already enabled.
    behaviour.peerset_report_connect(remote, set);
}
4189
/// A repeated connect report for a peer that is already `DisabledPendingEnable` must leave the
/// peer in `DisabledPendingEnable`. Compiled only with debug assertions enabled.
#[test]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_disabled_pending_enable_peer() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let remote = PeerId::random();
    let set = SetId::from(0);
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // Arm a back-off on the disabled peer by writing the deadline directly into its state.
    if let Some(PeerState::Disabled { backoff_until, .. }) = behaviour.peers.get_mut(&key) {
        *backoff_until =
            Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
    }

    // First connect report: the peer waits for the back-off in `DisabledPendingEnable`.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(
        behaviour.peers.get(&key),
        Some(PeerState::DisabledPendingEnable { .. })
    ));

    // Second connect report: the state is unchanged.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(
        behaviour.peers.get(&key),
        Some(PeerState::DisabledPendingEnable { .. })
    ));
}
4235
/// A connect report for an unknown peer puts it in `Requested`; repeating the report must
/// leave it in `Requested`. Compiled only with debug assertions enabled.
#[test]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_requested_peer() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);

    // First report: the peer is requested.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Requested)));

    // Second report: the state is unchanged.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Requested)));
}
4251
/// A repeated connect report for a peer that is already `PendingRequest` must leave the peer
/// in `PendingRequest`. Compiled only with debug assertions enabled.
#[test]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_pending_requested() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let remote = PeerId::random();
    let set = SetId::from(0);
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // Arm a back-off on the disabled peer by writing the deadline directly into its state.
    if let Some(PeerState::Disabled { backoff_until, .. }) = behaviour.peers.get_mut(&key) {
        *backoff_until =
            Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
    }

    // Closing the only connection while a back-off is pending yields `Backoff`.
    let closed = libp2p::swarm::behaviour::ConnectionClosed {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        cause: None,
        remaining_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionClosed(closed));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Backoff { .. })));

    // First connect report: recorded as `PendingRequest`.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::PendingRequest { .. })));

    // Second connect report: the state is unchanged.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::PendingRequest { .. })));
}
4308
/// A connect report from the peerset for a peer that is `Incoming` must leave the peer in
/// `Incoming`. Compiled only with debug assertions enabled.
#[test]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_incoming_peer() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote asks to open a substream.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    // The connect report does not change the state.
    behaviour.peerset_report_connect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));
}
4346
/// A disconnect report from the peerset for a peer that is `Incoming` must leave the peer in
/// `Incoming`. Compiled only with debug assertions enabled.
#[test]
#[cfg(debug_assertions)]
fn peerset_report_disconnect_with_incoming_peer() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote asks to open a substream.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    // The disconnect report does not change the state.
    behaviour.peerset_report_disconnect(remote, set);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));
}
4384
/// If the peerset reports a disconnect while a peer is `Incoming`, the state is flagged with
/// `peerset_rejected`; a later protocol accept for that inbound request must then leave the
/// peer `Disabled`. Compiled only with debug assertions enabled.
#[test]
#[cfg(debug_assertions)]
fn peerset_report_disconnect_with_incoming_peer_protocol_accepts() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote asks to open a substream.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    // The peerset drops the peer while the inbound request is still undecided.
    behaviour.peerset_report_disconnect(remote, set);

    // The peer stays `Incoming` but is marked as rejected; pick up the request's index.
    let pending_index = match behaviour.peers.get(&key) {
        Some(&PeerState::Incoming { peerset_rejected, incoming_index, .. }) => {
            assert!(peerset_rejected);
            incoming_index
        },
        state => panic!("invalid state: {state:?}"),
    };

    // The protocol accepts the (already rejected) request.
    behaviour.protocol_report_accept(pending_index);

    let state = behaviour.peers.get(&key);
    if !std::matches!(state, Some(PeerState::Disabled { .. })) {
        panic!("invalid state: {state:?}");
    }
}
4440
/// If a peer is disconnected via `disconnect_peer` while its inbound request is pending, a
/// later protocol accept must leave the peer `Disabled` and the request must no longer be
/// listed in `incoming`. Compiled only with debug assertions enabled.
#[test]
#[cfg(debug_assertions)]
fn peer_disconnected_protocol_accepts() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote asks to open a substream.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    // The inbound request is registered under index 0.
    assert!(behaviour.incoming.iter().any(|pending| pending.incoming_id == IncomingIndex(0)));
    behaviour.disconnect_peer(&remote, set);

    // The protocol accepts the request after the peer has been disconnected.
    behaviour.protocol_report_accept(IncomingIndex(0));

    let state = behaviour.peers.get(&key);
    if !std::matches!(state, Some(PeerState::Disabled { .. })) {
        panic!("invalid state: {state:?}");
    }

    // The request with index 0 is gone from the pending list.
    assert!(behaviour.incoming.iter().all(|pending| pending.incoming_id != IncomingIndex(0)));
}
4489
/// If the only connection of an `Incoming` peer is closed and the protocol then accepts the
/// inbound request, no state must remain for the peer. Compiled only with debug assertions
/// enabled.
#[test]
#[cfg(debug_assertions)]
fn connection_closed_protocol_accepts() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let set = SetId::from(0);
    let remote = PeerId::random();
    let key = (remote, set);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote asks to open a substream.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    // The connection goes away before the request has been decided.
    let closed = libp2p::swarm::behaviour::ConnectionClosed {
        peer_id: remote,
        connection_id: ConnectionId::new_unchecked(0),
        endpoint: &endpoint,
        cause: None,
        remaining_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionClosed(closed));

    // The protocol accepts the request after the connection has been closed.
    behaviour.protocol_report_accept(IncomingIndex(0));

    let state = behaviour.peers.get(&key);
    if state.is_some() {
        panic!("invalid state: {state:?}");
    }
}
4542
/// Rejecting an inbound substream request after the peer has been disconnected.
///
/// Sequence exercised: a connection is established (peer is `Disabled`), the remote asks to
/// open a substream (peer is `Incoming`), `disconnect_peer` is called, and only afterwards is
/// the pending request reported as rejected. The peer must end up `Disabled` and no entry
/// with `IncomingIndex(0)` may remain in `incoming`.
#[test]
#[cfg(debug_assertions)]
fn peer_disconnected_protocol_reject() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let remote = PeerId::random();
    let set_id = SetId::from(0);
    let conn_id = ConnectionId::new_unchecked(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };
    let key = (remote, set_id);

    // A freshly established connection leaves the peer in `Disabled`.
    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote requesting a substream moves the peer to `Incoming`.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    assert!(behaviour.incoming.iter().any(|pending| pending.incoming_id == IncomingIndex(0)));
    behaviour.disconnect_peer(&remote, set_id);

    // The rejection only arrives after the disconnect.
    behaviour.protocol_report_reject(IncomingIndex(0));

    let state = behaviour.peers.get(&key);
    if !std::matches!(state, Some(PeerState::Disabled { .. })) {
        panic!("invalid state: {state:?}");
    }

    assert!(behaviour.incoming.iter().all(|pending| pending.incoming_id != IncomingIndex(0)));
}
4591
/// Rejecting an inbound substream request after the underlying connection was closed.
///
/// The peer goes `Disabled` -> `Incoming`, then its only connection is reported closed
/// (`remaining_established: 0`). Reporting the pending request as rejected afterwards must
/// leave no entry for the peer in `peers`.
#[test]
#[cfg(debug_assertions)]
fn connection_closed_protocol_rejects() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let peer = PeerId::random();
    let set_id = SetId::from(0);
    let conn = ConnectionId::new_unchecked(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    notif.on_swarm_event(FromSwarm::ConnectionEstablished(
        libp2p::swarm::behaviour::ConnectionEstablished {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            failed_addresses: &[],
            other_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));

    // The remote asks to open a substream on the established connection.
    notif.on_connection_handler_event(
        peer,
        conn,
        NotifsHandlerOut::OpenDesiredByRemote {
            protocol_index: 0,
            handshake: vec![1, 3, 3, 7],
        },
    );
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));

    // Close the connection that was established above.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));

    // The rejection arrives only after the connection is gone.
    notif.protocol_report_reject(IncomingIndex(0));

    match notif.peers.get(&(peer, set_id)) {
        None => {},
        state => panic!("invalid state: {state:?}"),
    };
}
4644
/// Reporting acceptance for a peer that is no longer `Incoming` is expected to panic.
///
/// The peer is moved `Disabled` -> `Incoming` -> `Enabled` (the last step via
/// `peerset_report_connect`), and an event produced by `ConnectionYielder::open_substream`
/// is delivered for it. The `alive` flag of the first `incoming` entry is then forced to
/// `true` before `protocol_report_accept` is called for that entry.
///
/// NOTE(review): the test is only compiled with `debug_assertions`, so the panic presumably
/// comes from a debug-only check in the behaviour — confirm against the implementation.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn protocol_report_accept_not_incoming_peer() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let remote = PeerId::random();
    let conn_id = ConnectionId::new_unchecked(0);
    let set_id = SetId::from(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };
    let mut conn_yielder = ConnectionYielder::new();
    let key = (remote, set_id);

    // A freshly established connection leaves the peer in `Disabled`.
    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote requesting a substream moves the peer to `Incoming`.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    // The pending entry starts out alive and carries index 0.
    assert!(std::matches!(
        behaviour.incoming[0],
        IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
    ));

    behaviour.peerset_report_connect(remote, set_id);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Enabled { .. })));

    let open_result = conn_yielder.open_substream(remote, 0, vec![1, 2, 3, 4]);
    behaviour.on_connection_handler_event(remote, conn_id, open_result);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Enabled { .. })));

    // Force the entry to look alive although the peer is already `Enabled`.
    behaviour.incoming[0].alive = true;
    behaviour.protocol_report_accept(IncomingIndex(0));
}
4696
/// Reporting a closed connection for a peer the behaviour has never seen is expected to
/// panic.
///
/// No `ConnectionEstablished` event precedes the `ConnectionClosed` event delivered here.
///
/// NOTE(review): the test is only compiled with `debug_assertions`, so the panic presumably
/// comes from a debug-only check in the behaviour — confirm against the implementation.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_connection_closed_non_existent_peer() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let peer = PeerId::random();
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: ConnectionId::new_unchecked(0),
            endpoint: &endpoint,
            cause: None,
            remaining_established: 0usize,
        },
    ));
}
4718
/// `peerset_report_disconnect` for a peer that was never registered must leave both `peers`
/// and `incoming` empty.
#[test]
fn disconnect_non_existent_peer() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let unknown_peer = PeerId::random();

    behaviour.peerset_report_disconnect(unknown_peer, SetId::from(0));

    assert!(behaviour.peers.is_empty());
    assert!(behaviour.incoming.is_empty());
}
4730
/// Accepting an incoming index that was never handed out must leave both `peers` and
/// `incoming` empty.
#[test]
fn accept_non_existent_connection() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();

    let unknown_index = IncomingIndex::from(0);
    behaviour.protocol_report_accept(unknown_index);

    assert!(behaviour.peers.is_empty());
    assert!(behaviour.incoming.is_empty());
}
4740
/// Rejecting an incoming index that was never handed out must leave both `peers` and
/// `incoming` empty.
#[test]
fn reject_non_existent_connection() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();

    let unknown_index = IncomingIndex::from(0);
    behaviour.protocol_report_reject(unknown_index);

    assert!(behaviour.peers.is_empty());
    assert!(behaviour.incoming.is_empty());
}
4750
/// Rejecting a pending request whose `incoming` entry is no longer alive must not change the
/// peer state.
///
/// The peer is brought to `Incoming`, the `alive` flag of its `incoming` entry is cleared by
/// hand, and the request is then reported as rejected; the peer must still be `Incoming`.
#[test]
fn reject_non_active_connection() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let remote = PeerId::random();
    let conn_id = ConnectionId::new_unchecked(0);
    let set_id = SetId::from(0);
    let endpoint = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };
    let key = (remote, set_id);

    // A freshly established connection leaves the peer in `Disabled`.
    let established = libp2p::swarm::behaviour::ConnectionEstablished {
        peer_id: remote,
        connection_id: conn_id,
        endpoint: &endpoint,
        failed_addresses: &[],
        other_established: 0,
    };
    behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(established));
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Disabled { .. })));

    // The remote requesting a substream moves the peer to `Incoming`.
    let open_request = NotifsHandlerOut::OpenDesiredByRemote {
        protocol_index: 0,
        handshake: vec![1, 3, 3, 7],
    };
    behaviour.on_connection_handler_event(remote, conn_id, open_request);
    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));

    // Mark the pending entry as dead before reporting the rejection.
    behaviour.incoming[0].alive = false;
    behaviour.protocol_report_reject(IncomingIndex::from(0));

    assert!(std::matches!(behaviour.peers.get(&key), Some(PeerState::Incoming { .. })));
}
4789
/// Closing a connection id that was never established, while the peer is `Incoming`, is
/// expected to panic.
///
/// The peer is brought to `Incoming` over connection `0`; the `ConnectionClosed` event then
/// names connection `1337`, which was never reported as established.
///
/// NOTE(review): the test is only compiled with `debug_assertions`, so the panic presumably
/// comes from a debug-only check in the behaviour — confirm against the implementation.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_non_existent_connection_closed_for_incoming_peer() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let peer = PeerId::random();
    let conn = ConnectionId::new_unchecked(0);
    let set_id = SetId::from(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    notif.on_swarm_event(FromSwarm::ConnectionEstablished(
        libp2p::swarm::behaviour::ConnectionEstablished {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            failed_addresses: &[],
            other_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));

    // The remote asks to open a substream on the established connection.
    notif.on_connection_handler_event(
        peer,
        conn,
        NotifsHandlerOut::OpenDesiredByRemote {
            protocol_index: 0,
            handshake: vec![1, 3, 3, 7],
        },
    );
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));

    // Deliberately use a connection id that differs from `conn`.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: ConnectionId::new_unchecked(1337),
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
}
4835
/// Closing a connection id that was never established, while the peer is `Disabled`, is
/// expected to panic.
///
/// The peer becomes `Disabled` over connection `0`; the `ConnectionClosed` event then names
/// connection `1337`, which was never reported as established.
///
/// NOTE(review): the test is only compiled with `debug_assertions`, so the panic presumably
/// comes from a debug-only check in the behaviour — confirm against the implementation.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_non_existent_connection_closed_for_disabled_peer() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let set_id = SetId::from(0);
    let peer = PeerId::random();
    let conn = ConnectionId::new_unchecked(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    notif.on_swarm_event(FromSwarm::ConnectionEstablished(
        libp2p::swarm::behaviour::ConnectionEstablished {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            failed_addresses: &[],
            other_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));

    // Deliberately use a connection id that differs from `conn`.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: ConnectionId::new_unchecked(1337),
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
}
4870
/// Closing a connection id that was never established, while the peer is
/// `DisabledPendingEnable`, is expected to panic.
///
/// The peer becomes `Disabled` over connection `0`, its `backoff_until` is set five seconds
/// into the future by hand, and `peerset_report_connect` is then asserted to yield
/// `DisabledPendingEnable`. The `ConnectionClosed` event names connection `1337`, which was
/// never reported as established.
///
/// NOTE(review): the test is only compiled with `debug_assertions`, so the panic presumably
/// comes from a debug-only check in the behaviour — confirm against the implementation.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_non_existent_connection_closed_for_disabled_pending_enable() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let set_id = SetId::from(0);
    let peer = PeerId::random();
    let conn = ConnectionId::new_unchecked(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    notif.on_swarm_event(FromSwarm::ConnectionEstablished(
        libp2p::swarm::behaviour::ConnectionEstablished {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            failed_addresses: &[],
            other_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));

    // Install a back-off that is still pending when the connect request arrives.
    if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
        notif.peers.get_mut(&(peer, set_id))
    {
        *backoff_until =
            Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
    }

    notif.peerset_report_connect(peer, set_id);

    assert!(std::matches!(
        notif.peers.get(&(peer, set_id)),
        Some(&PeerState::DisabledPendingEnable { .. })
    ));

    // Deliberately use a connection id that differs from `conn`.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: ConnectionId::new_unchecked(1337),
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
}
4921
/// Closing the connection of an `Incoming` peer whose `incoming` entry has been marked as
/// not alive is expected to panic.
///
/// The peer is brought to `Incoming`, the `alive` flag of its `incoming` entry is cleared by
/// hand, and the established connection is then reported closed.
///
/// NOTE(review): the test is only compiled with `debug_assertions`, so the panic presumably
/// comes from a debug-only consistency check in the behaviour — confirm against the
/// implementation.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_connection_closed_for_incoming_peer_state_mismatch() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let peer = PeerId::random();
    let conn = ConnectionId::new_unchecked(0);
    let set_id = SetId::from(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    notif.on_swarm_event(FromSwarm::ConnectionEstablished(
        libp2p::swarm::behaviour::ConnectionEstablished {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            failed_addresses: &[],
            other_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));

    // The remote asks to open a substream on the established connection.
    notif.on_connection_handler_event(
        peer,
        conn,
        NotifsHandlerOut::OpenDesiredByRemote {
            protocol_index: 0,
            handshake: vec![1, 3, 3, 7],
        },
    );
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));

    // Clear the flag by hand while the peer itself stays `Incoming`.
    notif.incoming[0].alive = false;

    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
}
4968
/// Closing a connection id that was never established, while the peer is `Enabled`, is
/// expected to panic.
///
/// The peer is moved `Disabled` -> `Incoming` -> `Enabled` over connection `0`; the
/// `ConnectionClosed` event then names connection `1337`, which was never reported as
/// established.
///
/// NOTE(review): the test is only compiled with `debug_assertions`, so the panic presumably
/// comes from a debug-only check in the behaviour — confirm against the implementation.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_connection_closed_for_enabled_state_mismatch() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let peer = PeerId::random();
    let conn = ConnectionId::new_unchecked(0);
    let set_id = SetId::from(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    notif.on_swarm_event(FromSwarm::ConnectionEstablished(
        libp2p::swarm::behaviour::ConnectionEstablished {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            failed_addresses: &[],
            other_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));

    // The remote asks to open a substream on the established connection.
    notif.on_connection_handler_event(
        peer,
        conn,
        NotifsHandlerOut::OpenDesiredByRemote {
            protocol_index: 0,
            handshake: vec![1, 3, 3, 7],
        },
    );
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));

    notif.peerset_report_connect(peer, set_id);
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));

    // Deliberately use a connection id that differs from `conn`.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: ConnectionId::new_unchecked(1337),
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
}
5018
/// Reporting the same connection as closed a second time, once the peer is in `Backoff`, is
/// expected to panic.
///
/// The peer becomes `Disabled`, its `backoff_until` is set five seconds into the future by
/// hand, and the connection is reported closed, which is asserted to yield `Backoff`. A
/// second `ConnectionClosed` event for the same connection is then delivered.
///
/// NOTE(review): the test is only compiled with `debug_assertions`, so the panic presumably
/// comes from a debug-only check in the behaviour — confirm against the implementation.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_connection_closed_for_backoff_peer() {
    let (mut notif, _controller, _notif_service) = development_notifs();
    let set_id = SetId::from(0);
    let peer = PeerId::random();
    let conn = ConnectionId::new_unchecked(0);
    let connected = ConnectedPoint::Listener {
        local_addr: Multiaddr::empty(),
        send_back_addr: Multiaddr::empty(),
    };

    notif.on_swarm_event(FromSwarm::ConnectionEstablished(
        libp2p::swarm::behaviour::ConnectionEstablished {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            failed_addresses: &[],
            other_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));

    // Install a back-off that is still pending when the connection closes.
    if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
        notif.peers.get_mut(&(peer, set_id))
    {
        *backoff_until =
            Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
    }

    // First close: the peer is expected to move to `Backoff`.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
    assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));

    // Second close of the same connection: this is the call expected to panic.
    notif.on_swarm_event(FromSwarm::ConnectionClosed(
        libp2p::swarm::behaviour::ConnectionClosed {
            peer_id: peer,
            connection_id: conn,
            endpoint: &connected,
            cause: None,
            remaining_established: 0usize,
        },
    ));
}
5072
/// Delivering an event from `ConnectionYielder::open_substream` for a peer the behaviour has
/// never seen is expected to panic.
///
/// No connection is established beforehand; the event is reported for one random peer while
/// the substream itself is opened for another random peer.
///
/// NOTE(review): the test is only compiled with `debug_assertions`, so the panic presumably
/// comes from a debug-only check in the behaviour — confirm against the implementation.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn open_result_ok_non_existent_peer() {
    let (mut behaviour, _controller, _notif_service) = development_notifs();
    let conn_id = ConnectionId::new_unchecked(0);
    let mut conn_yielder = ConnectionYielder::new();

    // Same evaluation order as passing both expressions inline: reporting peer first.
    let reporting_peer = PeerId::random();
    let open_result = conn_yielder.open_substream(PeerId::random(), 0, vec![1, 2, 3, 4]);

    behaviour.on_connection_handler_event(reporting_peer, conn_id, open_result);
}
5087}