1use crate::{
20 protocol::notifications::{
21 handler::{self, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut},
22 service::{NotificationCommand, ProtocolHandle, ValidationCallResult},
23 },
24 protocol_controller::{self, IncomingIndex, Message, SetId},
25 service::{
26 metrics::NotificationMetrics,
27 traits::{Direction, ValidationResult},
28 },
29 types::ProtocolName,
30};
31
32use bytes::BytesMut;
33use fnv::FnvHashMap;
34use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
35use libp2p::{
36 core::{Endpoint, Multiaddr},
37 swarm::{
38 behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
39 ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, PollParameters,
40 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
41 },
42 PeerId,
43};
44use log::{debug, error, trace, warn};
45use parking_lot::RwLock;
46use rand::distributions::{Distribution as _, Uniform};
47use sc_utils::mpsc::TracingUnboundedReceiver;
48use smallvec::SmallVec;
49use tokio::sync::oneshot::error::RecvError;
50use tokio_stream::StreamMap;
51
52use std::{
53 cmp,
54 collections::{hash_map::Entry, VecDeque},
55 mem,
56 pin::Pin,
57 sync::Arc,
58 task::{Context, Poll},
59 time::{Duration, Instant},
60};
61
62type PendingInboundValidation =
64 BoxFuture<'static, (Result<ValidationResult, RecvError>, IncomingIndex)>;
65
66const LOG_TARGET: &str = "sub-libp2p";
68
69pub struct Notifications {
121 notif_protocols: Vec<handler::ProtocolConfig>,
123
124 protocol_handles: Vec<ProtocolHandle>,
126
127 command_streams: StreamMap<usize, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>>,
129
130 protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
132
133 from_protocol_controllers: TracingUnboundedReceiver<Message>,
135
136 peers: FnvHashMap<(PeerId, SetId), PeerState>,
138
139 delays:
147 stream::FuturesUnordered<Pin<Box<dyn Future<Output = (DelayId, PeerId, SetId)> + Send>>>,
148
149 next_delay_id: DelayId,
151
152 incoming: SmallVec<[IncomingPeer; 6]>,
155
156 next_incoming_index: IncomingIndex,
159
160 events: VecDeque<ToSwarm<NotificationsOut, NotifsHandlerIn>>,
162
163 pending_inbound_validations: FuturesUnordered<PendingInboundValidation>,
171
172 metrics: NotificationMetrics,
174}
175
176#[derive(Debug, Clone)]
178pub struct ProtocolConfig {
179 pub name: ProtocolName,
181 pub fallback_names: Vec<ProtocolName>,
183 pub handshake: Vec<u8>,
185 pub max_notification_size: u64,
187}
188
189#[derive(Debug, Copy, Clone, PartialEq, Eq)]
191struct DelayId(u64);
192
193#[derive(Debug)]
197enum PeerState {
198 Poisoned,
202
203 Backoff {
206 timer: DelayId,
208 timer_deadline: Instant,
210 },
211
212 PendingRequest {
214 timer: DelayId,
216 timer_deadline: Instant,
218 },
219
220 Requested,
222
223 Disabled {
228 backoff_until: Option<Instant>,
231
232 connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
234 },
235
236 DisabledPendingEnable {
245 timer: DelayId,
247 timer_deadline: Instant,
249
250 connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
252 },
253
254 Enabled {
256 connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
258 },
259
260 Incoming {
265 backoff_until: Option<Instant>,
267
268 incoming_index: IncomingIndex,
270
271 peerset_rejected: bool,
273
274 connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
276 },
277}
278
279impl PeerState {
280 fn is_open(&self) -> bool {
283 self.get_open().is_some()
284 }
285
286 fn get_open(&self) -> Option<&NotificationsSink> {
289 match self {
290 Self::Enabled { connections, .. } => connections.iter().find_map(|(_, s)| match s {
291 ConnectionState::Open(s) => Some(s),
292 _ => None,
293 }),
294 _ => None,
295 }
296 }
297}
298
299#[derive(Debug)]
301enum ConnectionState {
302 Closed,
304
305 Closing,
309
310 Opening,
313
314 OpeningThenClosing,
318
319 OpenDesiredByRemote,
322
323 Open(NotificationsSink),
328}
329
330#[derive(Debug)]
332struct IncomingPeer {
333 peer_id: PeerId,
335 set_id: SetId,
337 alive: bool,
340 incoming_id: IncomingIndex,
342 handshake: Vec<u8>,
344}
345
346#[derive(Debug)]
348pub enum NotificationsOut {
349 CustomProtocolOpen {
351 peer_id: PeerId,
353 set_id: SetId,
355 direction: Direction,
357 negotiated_fallback: Option<ProtocolName>,
360 received_handshake: Vec<u8>,
363 notifications_sink: NotificationsSink,
365 },
366
367 CustomProtocolReplaced {
373 peer_id: PeerId,
375 set_id: SetId,
377 notifications_sink: NotificationsSink,
379 },
380
381 CustomProtocolClosed {
384 peer_id: PeerId,
386 set_id: SetId,
388 },
389
390 Notification {
394 peer_id: PeerId,
396 set_id: SetId,
398 message: BytesMut,
400 },
401}
402
403impl Notifications {
404 pub(crate) fn new(
406 protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
407 from_protocol_controllers: TracingUnboundedReceiver<Message>,
408 metrics: NotificationMetrics,
409 notif_protocols: impl Iterator<
410 Item = (
411 ProtocolConfig,
412 ProtocolHandle,
413 Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>,
414 ),
415 >,
416 ) -> Self {
417 let (notif_protocols, protocol_handles): (Vec<_>, Vec<_>) = notif_protocols
418 .map(|(cfg, protocol_handle, command_stream)| {
419 (
420 handler::ProtocolConfig {
421 name: cfg.name,
422 fallback_names: cfg.fallback_names,
423 handshake: Arc::new(RwLock::new(cfg.handshake)),
424 max_notification_size: cfg.max_notification_size,
425 },
426 (protocol_handle, command_stream),
427 )
428 })
429 .unzip();
430 assert!(!notif_protocols.is_empty());
431
432 let (mut protocol_handles, command_streams): (Vec<_>, Vec<_>) = protocol_handles
433 .into_iter()
434 .enumerate()
435 .map(|(set_id, (mut protocol_handle, command_stream))| {
436 protocol_handle.set_metrics(metrics.clone());
437
438 (protocol_handle, (set_id, command_stream))
439 })
440 .unzip();
441
442 protocol_handles.iter_mut().skip(1).for_each(|handle| {
443 handle.delegate_to_peerset(true);
444 });
445
446 Self {
447 notif_protocols,
448 protocol_handles,
449 command_streams: StreamMap::from_iter(command_streams.into_iter()),
450 protocol_controller_handles,
451 from_protocol_controllers,
452 peers: FnvHashMap::default(),
453 delays: Default::default(),
454 next_delay_id: DelayId(0),
455 incoming: SmallVec::new(),
456 next_incoming_index: IncomingIndex(0),
457 events: VecDeque::new(),
458 pending_inbound_validations: FuturesUnordered::new(),
459 metrics,
460 }
461 }
462
463 pub fn set_notif_protocol_handshake(
465 &mut self,
466 set_id: SetId,
467 handshake_message: impl Into<Vec<u8>>,
468 ) {
469 if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) {
470 *p.handshake.write() = handshake_message.into();
471 } else {
472 log::error!(target: "sub-libp2p", "Unknown handshake change set: {:?}", set_id);
473 debug_assert!(false);
474 }
475 }
476
477 pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
479 self.peers.iter().filter(|(_, state)| state.is_open()).map(|((id, _), _)| id)
480 }
481
482 pub fn is_open(&self, peer_id: &PeerId, set_id: SetId) -> bool {
484 self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false)
485 }
486
487 pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: SetId) {
489 trace!(target: "sub-libp2p", "External API => Disconnect({}, {:?})", peer_id, set_id);
490 self.disconnect_peer_inner(peer_id, set_id);
491 }
492
493 fn disconnect_peer_inner(&mut self, peer_id: &PeerId, set_id: SetId) {
495 let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
496 entry
497 } else {
498 return
499 };
500
501 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
502 st @ PeerState::Disabled { .. } => *entry.into_mut() = st,
504 st @ PeerState::Requested => *entry.into_mut() = st,
505 st @ PeerState::PendingRequest { .. } => *entry.into_mut() = st,
506 st @ PeerState::Backoff { .. } => *entry.into_mut() = st,
507
508 PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
510 trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
511 self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);
512 *entry.into_mut() =
513 PeerState::Disabled { connections, backoff_until: Some(timer_deadline) }
514 },
515
516 PeerState::Enabled { mut connections } => {
520 trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
521 self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);
522
523 if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
524 trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id);
525 let event =
526 NotificationsOut::CustomProtocolClosed { peer_id: *peer_id, set_id };
527 self.events.push_back(ToSwarm::GenerateEvent(event));
528 }
529
530 for (connec_id, connec_state) in
531 connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
532 {
533 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
534 self.events.push_back(ToSwarm::NotifyHandler {
535 peer_id: *peer_id,
536 handler: NotifyHandler::One(*connec_id),
537 event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
538 });
539 *connec_state = ConnectionState::Closing;
540 }
541
542 for (connec_id, connec_state) in
543 connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
544 {
545 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
546 self.events.push_back(ToSwarm::NotifyHandler {
547 peer_id: *peer_id,
548 handler: NotifyHandler::One(*connec_id),
549 event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
550 });
551 *connec_state = ConnectionState::OpeningThenClosing;
552 }
553
554 debug_assert!(!connections
555 .iter()
556 .any(|(_, s)| matches!(s, ConnectionState::Open(_))));
557 debug_assert!(!connections
558 .iter()
559 .any(|(_, s)| matches!(s, ConnectionState::Opening)));
560
561 *entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
562 },
563
564 PeerState::Incoming { mut connections, backoff_until, .. } => {
567 let inc = if let Some(inc) = self
568 .incoming
569 .iter_mut()
570 .find(|i| i.peer_id == entry.key().0 && i.set_id == set_id && i.alive)
571 {
572 inc
573 } else {
574 error!(
575 target: "sub-libp2p",
576 "State mismatch in libp2p: no entry in incoming for incoming peer"
577 );
578 return
579 };
580
581 inc.alive = false;
582
583 for (connec_id, connec_state) in connections
584 .iter_mut()
585 .filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
586 {
587 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
588 self.events.push_back(ToSwarm::NotifyHandler {
589 peer_id: *peer_id,
590 handler: NotifyHandler::One(*connec_id),
591 event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
592 });
593 *connec_state = ConnectionState::Closing;
594 }
595
596 debug_assert!(!connections
597 .iter()
598 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
599 *entry.into_mut() = PeerState::Disabled { connections, backoff_until }
600 },
601
602 PeerState::Poisoned => {
603 error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id)
604 },
605 }
606 }
607
608 fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: SetId) {
610 let mut occ_entry = match self.peers.entry((peer_id, set_id)) {
612 Entry::Occupied(entry) => entry,
613 Entry::Vacant(entry) => {
614 trace!(
616 target: "sub-libp2p",
617 "PSM => Connect({}, {:?}): Starting to connect",
618 entry.key().0,
619 set_id,
620 );
621 trace!(target: "sub-libp2p", "Libp2p <= Dial {}", entry.key().0);
622 self.events.push_back(ToSwarm::Dial { opts: entry.key().0.into() });
623 entry.insert(PeerState::Requested);
624 return
625 },
626 };
627
628 let now = Instant::now();
629
630 match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
631 PeerState::Backoff { ref timer, ref timer_deadline } if *timer_deadline > now => {
633 let peer_id = occ_entry.key().0;
634 trace!(
635 target: "sub-libp2p",
636 "PSM => Connect({}, {:?}): Will start to connect at until {:?}",
637 peer_id,
638 set_id,
639 timer_deadline,
640 );
641 *occ_entry.into_mut() =
642 PeerState::PendingRequest { timer: *timer, timer_deadline: *timer_deadline };
643 },
644
645 PeerState::Backoff { .. } => {
647 trace!(
648 target: "sub-libp2p",
649 "PSM => Connect({}, {:?}): Starting to connect",
650 occ_entry.key().0,
651 set_id,
652 );
653 trace!(target: "sub-libp2p", "Libp2p <= Dial {:?}", occ_entry.key());
654 self.events.push_back(ToSwarm::Dial { opts: occ_entry.key().0.into() });
655 *occ_entry.into_mut() = PeerState::Requested;
656 },
657
658 PeerState::Disabled { connections, backoff_until: Some(ref backoff) }
660 if *backoff > now =>
661 {
662 let peer_id = occ_entry.key().0;
663 trace!(
664 target: "sub-libp2p",
665 "PSM => Connect({}, {:?}): But peer is backed-off until {:?}",
666 peer_id,
667 set_id,
668 backoff,
669 );
670
671 let delay_id = self.next_delay_id;
672 self.next_delay_id.0 += 1;
673 let delay = futures_timer::Delay::new(*backoff - now);
674 self.delays.push(
675 async move {
676 delay.await;
677 (delay_id, peer_id, set_id)
678 }
679 .boxed(),
680 );
681
682 *occ_entry.into_mut() = PeerState::DisabledPendingEnable {
683 connections,
684 timer: delay_id,
685 timer_deadline: *backoff,
686 };
687 },
688
689 PeerState::Disabled { mut connections, backoff_until } => {
691 debug_assert!(!connections
692 .iter()
693 .any(|(_, s)| { matches!(s, ConnectionState::Open(_)) }));
694
695 if let Some((connec_id, connec_state)) =
697 connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
698 {
699 trace!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Enabling connections.",
700 occ_entry.key().0, set_id);
701 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, *connec_id, set_id);
702 self.events.push_back(ToSwarm::NotifyHandler {
703 peer_id,
704 handler: NotifyHandler::One(*connec_id),
705 event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
706 });
707 *connec_state = ConnectionState::Opening;
708 *occ_entry.into_mut() = PeerState::Enabled { connections };
709 } else {
710 debug_assert!(connections.iter().any(|(_, s)| {
713 matches!(s, ConnectionState::OpeningThenClosing | ConnectionState::Closing)
714 }));
715 trace!(
716 target: "sub-libp2p",
717 "PSM => Connect({}, {:?}): No connection in proper state. Delaying.",
718 occ_entry.key().0, set_id
719 );
720
721 let timer_deadline = {
722 let base = now + Duration::from_secs(5);
723 if let Some(backoff_until) = backoff_until {
724 cmp::max(base, backoff_until)
725 } else {
726 base
727 }
728 };
729
730 let delay_id = self.next_delay_id;
731 self.next_delay_id.0 += 1;
732 debug_assert!(timer_deadline > now);
733 let delay = futures_timer::Delay::new(timer_deadline - now);
734 self.delays.push(
735 async move {
736 delay.await;
737 (delay_id, peer_id, set_id)
738 }
739 .boxed(),
740 );
741
742 *occ_entry.into_mut() = PeerState::DisabledPendingEnable {
743 connections,
744 timer: delay_id,
745 timer_deadline,
746 };
747 }
748 },
749 st @ PeerState::Incoming { .. } => {
751 debug!(
752 target: "sub-libp2p",
753 "PSM => Connect({}, {:?}): Ignoring obsolete connect, we are awaiting accept/reject.",
754 occ_entry.key().0, set_id
755 );
756 *occ_entry.into_mut() = st;
757 },
758
759 st @ PeerState::Enabled { .. } => {
761 debug!(target: "sub-libp2p",
762 "PSM => Connect({}, {:?}): Already connected.",
763 occ_entry.key().0, set_id);
764 *occ_entry.into_mut() = st;
765 },
766 st @ PeerState::DisabledPendingEnable { .. } => {
767 debug!(target: "sub-libp2p",
768 "PSM => Connect({}, {:?}): Already pending enabling.",
769 occ_entry.key().0, set_id);
770 *occ_entry.into_mut() = st;
771 },
772 st @ PeerState::Requested { .. } | st @ PeerState::PendingRequest { .. } => {
773 debug!(target: "sub-libp2p",
774 "PSM => Connect({}, {:?}): Duplicate request.",
775 occ_entry.key().0, set_id);
776 *occ_entry.into_mut() = st;
777 },
778
779 PeerState::Poisoned => {
780 error!(target: "sub-libp2p", "State of {:?} is poisoned", occ_entry.key());
781 debug_assert!(false);
782 },
783 }
784 }
785
786 fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: SetId) {
788 let mut entry = match self.peers.entry((peer_id, set_id)) {
789 Entry::Occupied(entry) => entry,
790 Entry::Vacant(entry) => {
791 trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Already disabled.",
792 entry.key().0, set_id);
793 return
794 },
795 };
796
797 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
798 st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
799 trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Already disabled.",
800 entry.key().0, set_id);
801 *entry.into_mut() = st;
802 },
803
804 PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
806 debug_assert!(!connections.is_empty());
807 trace!(target: "sub-libp2p",
808 "PSM => Drop({}, {:?}): Interrupting pending enabling.",
809 entry.key().0, set_id);
810 *entry.into_mut() =
811 PeerState::Disabled { connections, backoff_until: Some(timer_deadline) };
812 },
813
814 PeerState::Enabled { mut connections } => {
816 trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Disabling connections.",
817 entry.key().0, set_id);
818
819 debug_assert!(connections.iter().any(|(_, s)| matches!(
820 s,
821 ConnectionState::Opening | ConnectionState::Open(_)
822 )));
823
824 if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
825 trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", entry.key().0, set_id);
826 let event =
827 NotificationsOut::CustomProtocolClosed { peer_id: entry.key().0, set_id };
828 self.events.push_back(ToSwarm::GenerateEvent(event));
829 }
830
831 for (connec_id, connec_state) in
832 connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
833 {
834 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
835 entry.key(), *connec_id, set_id);
836 self.events.push_back(ToSwarm::NotifyHandler {
837 peer_id: entry.key().0,
838 handler: NotifyHandler::One(*connec_id),
839 event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
840 });
841 *connec_state = ConnectionState::OpeningThenClosing;
842 }
843
844 for (connec_id, connec_state) in
845 connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
846 {
847 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
848 entry.key(), *connec_id, set_id);
849 self.events.push_back(ToSwarm::NotifyHandler {
850 peer_id: entry.key().0,
851 handler: NotifyHandler::One(*connec_id),
852 event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
853 });
854 *connec_state = ConnectionState::Closing;
855 }
856
857 *entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
858 },
859
860 PeerState::Requested => {
862 trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected.",
866 entry.key().0, set_id);
867 entry.remove();
868 },
869
870 PeerState::PendingRequest { timer, timer_deadline } => {
872 trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected",
873 entry.key().0, set_id);
874 *entry.into_mut() = PeerState::Backoff { timer, timer_deadline }
875 },
876
877 PeerState::Incoming { backoff_until, connections, incoming_index, .. } => {
881 debug!(
882 target: "sub-libp2p",
883 "PSM => Drop({}, {:?}): Ignoring obsolete disconnect, we are awaiting accept/reject.",
884 entry.key().0, set_id,
885 );
886 *entry.into_mut() = PeerState::Incoming {
887 backoff_until,
888 connections,
889 incoming_index,
890 peerset_rejected: true,
891 };
892 },
893 PeerState::Poisoned => {
894 error!(target: "sub-libp2p", "State of {:?} is poisoned", entry.key());
895 debug_assert!(false);
896 },
897 }
898 }
899
900 fn peerset_report_preaccept(&mut self, index: IncomingIndex) {
903 let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) else {
904 error!(target: LOG_TARGET, "PSM => Preaccept({:?}): Invalid index", index);
905 return
906 };
907
908 trace!(
909 target: LOG_TARGET,
910 "PSM => Preaccept({:?}): Sent to protocol for validation",
911 index
912 );
913 let incoming = &self.incoming[pos];
914
915 match self.protocol_handles[usize::from(incoming.set_id)]
916 .report_incoming_substream(incoming.peer_id, incoming.handshake.clone())
917 {
918 Ok(ValidationCallResult::Delegated) => {
919 self.protocol_report_accept(index);
920 },
921 Ok(ValidationCallResult::WaitForValidation(rx)) => {
922 self.pending_inbound_validations
923 .push(Box::pin(async move { (rx.await, index) }));
924 },
925 Err(err) => {
926 debug!(target: LOG_TARGET, "protocol has exited: {err:?} {:?}", incoming.set_id);
933
934 self.protocol_report_reject(index);
935 },
936 }
937 }
938
939 fn protocol_report_accept(&mut self, index: IncomingIndex) {
942 let (pos, incoming) =
943 if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) {
944 (pos, self.incoming.get(pos))
945 } else {
946 error!(target: "sub-libp2p", "PSM => Accept({:?}): Invalid index", index);
947 return
948 };
949
950 let Some(incoming) = incoming else {
951 error!(target: "sub-libp2p", "Incoming connection ({:?}) doesn't exist", index);
952 debug_assert!(false);
953 return;
954 };
955
956 if !incoming.alive {
957 trace!(
958 target: "sub-libp2p",
959 "PSM => Accept({:?}, {}, {:?}): Obsolete incoming",
960 index,
961 incoming.peer_id,
962 incoming.set_id,
963 );
964
965 match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
966 Some(PeerState::DisabledPendingEnable { .. }) | Some(PeerState::Enabled { .. }) => {
967 },
968 _ => {
969 trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})",
970 incoming.peer_id, incoming.set_id);
971 self.protocol_controller_handles[usize::from(incoming.set_id)]
972 .dropped(incoming.peer_id);
973 },
974 }
975
976 self.incoming.remove(pos);
977 return
978 }
979
980 let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
981 Some(s) => s,
982 None => {
983 log::debug!(
984 target: "sub-libp2p",
985 "Connection to {:?} closed, ({:?} {:?}), ignoring accept",
986 incoming.peer_id,
987 incoming.set_id,
988 index,
989 );
990 self.incoming.remove(pos);
991 return
992 },
993 };
994
995 match mem::replace(state, PeerState::Poisoned) {
996 PeerState::Incoming {
998 mut connections,
999 incoming_index,
1000 peerset_rejected,
1001 backoff_until,
1002 } => {
1003 if index < incoming_index {
1004 warn!(
1005 target: "sub-libp2p",
1006 "PSM => Accept({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
1007 index, incoming.peer_id, incoming.set_id, incoming_index
1008 );
1009
1010 self.incoming.remove(pos);
1011 return
1012 } else if index > incoming_index {
1013 error!(
1014 target: "sub-libp2p",
1015 "PSM => Accept({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
1016 index, incoming.peer_id, incoming.set_id, incoming_index
1017 );
1018
1019 self.incoming.remove(pos);
1020 debug_assert!(false);
1021 return
1022 }
1023
1024 if peerset_rejected {
1027 trace!(
1028 target: "sub-libp2p",
1029 "Protocol accepted ({:?} {:?} {:?}) but Peerset had request disconnection, rejecting",
1030 index,
1031 incoming.peer_id,
1032 incoming.set_id
1033 );
1034
1035 *state = PeerState::Incoming {
1036 connections,
1037 backoff_until,
1038 peerset_rejected,
1039 incoming_index,
1040 };
1041 return self.report_reject(index).map_or((), |_| ())
1042 }
1043
1044 trace!(
1045 target: "sub-libp2p",
1046 "PSM => Accept({:?}, {}, {:?}): Enabling connections.",
1047 index,
1048 incoming.peer_id,
1049 incoming.set_id
1050 );
1051
1052 debug_assert!(connections
1053 .iter()
1054 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1055 for (connec_id, connec_state) in connections
1056 .iter_mut()
1057 .filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
1058 {
1059 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
1060 incoming.peer_id, *connec_id, incoming.set_id);
1061 self.events.push_back(ToSwarm::NotifyHandler {
1062 peer_id: incoming.peer_id,
1063 handler: NotifyHandler::One(*connec_id),
1064 event: NotifsHandlerIn::Open { protocol_index: incoming.set_id.into() },
1065 });
1066 *connec_state = ConnectionState::Opening;
1067 }
1068
1069 self.incoming.remove(pos);
1070 *state = PeerState::Enabled { connections };
1071 },
1072 st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
1073 self.incoming.remove(pos);
1074 *state = st;
1075 },
1076 peer => {
1078 error!(
1079 target: "sub-libp2p",
1080 "State mismatch in libp2p: Expected alive incoming. Got {:?}.",
1081 peer
1082 );
1083
1084 self.incoming.remove(pos);
1085 debug_assert!(false);
1086 },
1087 }
1088 }
1089
1090 fn peerset_report_reject(&mut self, index: IncomingIndex) {
1092 let _ = self.report_reject(index);
1093 }
1094
1095 fn protocol_report_reject(&mut self, index: IncomingIndex) {
1097 if let Some((set_id, peer_id)) = self.report_reject(index) {
1098 self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id)
1099 }
1100 }
1101
1102 fn report_reject(&mut self, index: IncomingIndex) -> Option<(SetId, PeerId)> {
1104 let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index)
1105 {
1106 self.incoming.remove(pos)
1107 } else {
1108 error!(target: "sub-libp2p", "PSM => Reject({:?}): Invalid index", index);
1109 return None
1110 };
1111
1112 if !incoming.alive {
1113 trace!(
1114 target: "sub-libp2p",
1115 "PSM => Reject({:?}, {}, {:?}): Obsolete incoming, ignoring",
1116 index,
1117 incoming.peer_id,
1118 incoming.set_id,
1119 );
1120
1121 return None
1122 }
1123
1124 let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
1125 Some(s) => s,
1126 None => {
1127 log::debug!(
1128 target: "sub-libp2p",
1129 "Connection to {:?} closed, ({:?} {:?}), ignoring accept",
1130 incoming.peer_id,
1131 incoming.set_id,
1132 index,
1133 );
1134 return None
1135 },
1136 };
1137
1138 match mem::replace(state, PeerState::Poisoned) {
1139 PeerState::Incoming { mut connections, backoff_until, incoming_index, .. } => {
1141 if index < incoming_index {
1142 warn!(
1143 target: "sub-libp2p",
1144 "PSM => Reject({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
1145 index, incoming.peer_id, incoming.set_id, incoming_index
1146 );
1147 return None
1148 } else if index > incoming_index {
1149 error!(
1150 target: "sub-libp2p",
1151 "PSM => Reject({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
1152 index, incoming.peer_id, incoming.set_id, incoming_index
1153 );
1154 debug_assert!(false);
1155 return None
1156 }
1157
1158 trace!(target: "sub-libp2p", "PSM => Reject({:?}, {}, {:?}): Rejecting connections.",
1159 index, incoming.peer_id, incoming.set_id);
1160
1161 debug_assert!(connections
1162 .iter()
1163 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1164 for (connec_id, connec_state) in connections
1165 .iter_mut()
1166 .filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
1167 {
1168 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
1169 incoming.peer_id, connec_id, incoming.set_id);
1170 self.events.push_back(ToSwarm::NotifyHandler {
1171 peer_id: incoming.peer_id,
1172 handler: NotifyHandler::One(*connec_id),
1173 event: NotifsHandlerIn::Close { protocol_index: incoming.set_id.into() },
1174 });
1175 *connec_state = ConnectionState::Closing;
1176 }
1177
1178 *state = PeerState::Disabled { connections, backoff_until };
1179 Some((incoming.set_id, incoming.peer_id))
1180 },
1181 st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
1183 *state = st;
1184 None
1185 },
1186 peer => {
1187 error!(
1188 target: LOG_TARGET,
1189 "State mismatch in libp2p: Expected alive incoming. Got {peer:?}.",
1190 );
1191 None
1192 },
1193 }
1194 }
1195}
1196
1197impl NetworkBehaviour for Notifications {
1198 type ConnectionHandler = NotifsHandler;
1199 type ToSwarm = NotificationsOut;
1200
1201 fn handle_pending_inbound_connection(
1202 &mut self,
1203 _connection_id: ConnectionId,
1204 _local_addr: &Multiaddr,
1205 _remote_addr: &Multiaddr,
1206 ) -> Result<(), ConnectionDenied> {
1207 Ok(())
1208 }
1209
1210 fn handle_pending_outbound_connection(
1211 &mut self,
1212 _connection_id: ConnectionId,
1213 _maybe_peer: Option<PeerId>,
1214 _addresses: &[Multiaddr],
1215 _effective_role: Endpoint,
1216 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
1217 Ok(Vec::new())
1218 }
1219
1220 fn handle_established_inbound_connection(
1221 &mut self,
1222 _connection_id: ConnectionId,
1223 peer: PeerId,
1224 _local_addr: &Multiaddr,
1225 _remote_addr: &Multiaddr,
1226 ) -> Result<THandler<Self>, ConnectionDenied> {
1227 Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
1228 }
1229
1230 fn handle_established_outbound_connection(
1231 &mut self,
1232 _connection_id: ConnectionId,
1233 peer: PeerId,
1234 _addr: &Multiaddr,
1235 _role_override: Endpoint,
1236 ) -> Result<THandler<Self>, ConnectionDenied> {
1237 Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
1238 }
1239
1240 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
1241 match event {
1242 FromSwarm::ConnectionEstablished(ConnectionEstablished {
1243 peer_id,
1244 endpoint,
1245 connection_id,
1246 ..
1247 }) => {
1248 for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
1249 match self.peers.entry((peer_id, set_id)).or_insert(PeerState::Poisoned) {
1250 st @ &mut PeerState::Requested |
1252 st @ &mut PeerState::PendingRequest { .. } => {
1253 trace!(target: "sub-libp2p",
1254 "Libp2p => Connected({}, {:?}, {:?}): Connection was requested by PSM.",
1255 peer_id, set_id, endpoint
1256 );
1257 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, connection_id, set_id);
1258 self.events.push_back(ToSwarm::NotifyHandler {
1259 peer_id,
1260 handler: NotifyHandler::One(connection_id),
1261 event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
1262 });
1263
1264 let mut connections = SmallVec::new();
1265 connections.push((connection_id, ConnectionState::Opening));
1266 *st = PeerState::Enabled { connections };
1267 },
1268
1269 st @ &mut PeerState::Poisoned | st @ &mut PeerState::Backoff { .. } => {
1272 let backoff_until =
1273 if let PeerState::Backoff { timer_deadline, .. } = st {
1274 Some(*timer_deadline)
1275 } else {
1276 None
1277 };
1278 trace!(target: "sub-libp2p",
1279 "Libp2p => Connected({}, {:?}, {:?}, {:?}): Not requested by PSM, disabling.",
1280 peer_id, set_id, endpoint, connection_id);
1281
1282 let mut connections = SmallVec::new();
1283 connections.push((connection_id, ConnectionState::Closed));
1284 *st = PeerState::Disabled { connections, backoff_until };
1285 },
1286
1287 PeerState::Incoming { connections, .. } |
1290 PeerState::Disabled { connections, .. } |
1291 PeerState::DisabledPendingEnable { connections, .. } |
1292 PeerState::Enabled { connections, .. } => {
1293 trace!(target: "sub-libp2p",
1294 "Libp2p => Connected({}, {:?}, {:?}, {:?}): Secondary connection. Leaving closed.",
1295 peer_id, set_id, endpoint, connection_id);
1296 connections.push((connection_id, ConnectionState::Closed));
1297 },
1298 }
1299 }
1300 },
1301 FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => {
1302 for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
1303 let mut entry = if let Entry::Occupied(entry) =
1304 self.peers.entry((peer_id, set_id))
1305 {
1306 entry
1307 } else {
1308 error!(target: "sub-libp2p", "inject_connection_closed: State mismatch in the custom protos handler");
1309 debug_assert!(false);
1310 return
1311 };
1312
1313 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1314 PeerState::Disabled { mut connections, backoff_until } => {
1316 trace!(target: "sub-libp2p", "Libp2p => Disconnected({}, {:?}, {:?}): Disabled.",
1317 peer_id, set_id, connection_id);
1318
1319 if let Some(pos) =
1320 connections.iter().position(|(c, _)| *c == connection_id)
1321 {
1322 connections.remove(pos);
1323 } else {
1324 debug_assert!(false);
1325 error!(target: "sub-libp2p",
1326 "inject_connection_closed: State mismatch in the custom protos handler");
1327 }
1328
1329 if connections.is_empty() {
1330 if let Some(until) = backoff_until {
1331 let now = Instant::now();
1332 if until > now {
1333 let delay_id = self.next_delay_id;
1334 self.next_delay_id.0 += 1;
1335 let delay = futures_timer::Delay::new(until - now);
1336 self.delays.push(
1337 async move {
1338 delay.await;
1339 (delay_id, peer_id, set_id)
1340 }
1341 .boxed(),
1342 );
1343
1344 *entry.get_mut() = PeerState::Backoff {
1345 timer: delay_id,
1346 timer_deadline: until,
1347 };
1348 } else {
1349 entry.remove();
1350 }
1351 } else {
1352 entry.remove();
1353 }
1354 } else {
1355 *entry.get_mut() =
1356 PeerState::Disabled { connections, backoff_until };
1357 }
1358 },
1359
1360 PeerState::DisabledPendingEnable {
1362 mut connections,
1363 timer_deadline,
1364 timer,
1365 } => {
1366 trace!(
1367 target: "sub-libp2p",
1368 "Libp2p => Disconnected({}, {:?}, {:?}): Disabled but pending enable.",
1369 peer_id, set_id, connection_id
1370 );
1371
1372 if let Some(pos) =
1373 connections.iter().position(|(c, _)| *c == connection_id)
1374 {
1375 connections.remove(pos);
1376 } else {
1377 error!(target: "sub-libp2p",
1378 "inject_connection_closed: State mismatch in the custom protos handler");
1379 debug_assert!(false);
1380 }
1381
1382 if connections.is_empty() {
1383 trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
1384 self.protocol_controller_handles[usize::from(set_id)]
1385 .dropped(peer_id);
1386 *entry.get_mut() = PeerState::Backoff { timer, timer_deadline };
1387 } else {
1388 *entry.get_mut() = PeerState::DisabledPendingEnable {
1389 connections,
1390 timer_deadline,
1391 timer,
1392 };
1393 }
1394 },
1395
1396 PeerState::Incoming {
1398 mut connections,
1399 backoff_until,
1400 incoming_index,
1401 peerset_rejected,
1402 } => {
1403 trace!(
1404 target: "sub-libp2p",
1405 "Libp2p => Disconnected({}, {:?}, {:?}): OpenDesiredByRemote.",
1406 peer_id, set_id, connection_id
1407 );
1408
1409 debug_assert!(connections
1410 .iter()
1411 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1412
1413 if let Some(pos) =
1414 connections.iter().position(|(c, _)| *c == connection_id)
1415 {
1416 connections.remove(pos);
1417 } else {
1418 error!(target: "sub-libp2p",
1419 "inject_connection_closed: State mismatch in the custom protos handler");
1420 debug_assert!(false);
1421 }
1422
1423 let no_desired_left = !connections
1424 .iter()
1425 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote));
1426
1427 if no_desired_left {
1430 if let Some(state) = self
1434 .incoming
1435 .iter_mut()
1436 .find(|i| i.alive && i.set_id == set_id && i.peer_id == peer_id)
1437 {
1438 state.alive = false;
1439 } else {
1440 error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in \
1441 incoming corresponding to an incoming state in peers");
1442 debug_assert!(false);
1443 }
1444 }
1445
1446 if connections.is_empty() {
1447 if let Some(until) = backoff_until {
1448 let now = Instant::now();
1449 if until > now {
1450 let delay_id = self.next_delay_id;
1451 self.next_delay_id.0 += 1;
1452 let delay = futures_timer::Delay::new(until - now);
1453 self.delays.push(
1454 async move {
1455 delay.await;
1456 (delay_id, peer_id, set_id)
1457 }
1458 .boxed(),
1459 );
1460
1461 *entry.get_mut() = PeerState::Backoff {
1462 timer: delay_id,
1463 timer_deadline: until,
1464 };
1465 } else {
1466 entry.remove();
1467 }
1468 } else {
1469 entry.remove();
1470 }
1471 } else if no_desired_left {
1472 *entry.get_mut() =
1475 PeerState::Disabled { connections, backoff_until };
1476 } else {
1477 *entry.get_mut() = PeerState::Incoming {
1478 connections,
1479 backoff_until,
1480 incoming_index,
1481 peerset_rejected,
1482 };
1483 }
1484 },
1485
1486 PeerState::Enabled { mut connections } => {
1489 trace!(
1490 target: "sub-libp2p",
1491 "Libp2p => Disconnected({}, {:?}, {:?}): Enabled.",
1492 peer_id, set_id, connection_id
1493 );
1494
1495 debug_assert!(connections.iter().any(|(_, s)| matches!(
1496 s,
1497 ConnectionState::Opening | ConnectionState::Open(_)
1498 )));
1499
1500 if let Some(pos) =
1501 connections.iter().position(|(c, _)| *c == connection_id)
1502 {
1503 let (_, state) = connections.remove(pos);
1504 if let ConnectionState::Open(_) = state {
1505 if let Some((replacement_pos, replacement_sink)) = connections
1506 .iter()
1507 .enumerate()
1508 .find_map(|(num, (_, s))| match s {
1509 ConnectionState::Open(s) => Some((num, s.clone())),
1510 _ => None,
1511 }) {
1512 if pos <= replacement_pos {
1513 trace!(
1514 target: "sub-libp2p",
1515 "External API <= Sink replaced({}, {:?})",
1516 peer_id, set_id
1517 );
1518 let event = NotificationsOut::CustomProtocolReplaced {
1519 peer_id,
1520 set_id,
1521 notifications_sink: replacement_sink.clone(),
1522 };
1523 self.events.push_back(ToSwarm::GenerateEvent(event));
1524 }
1525 } else {
1526 trace!(
1527 target: "sub-libp2p", "External API <= Closed({}, {:?})",
1528 peer_id, set_id
1529 );
1530 let event = NotificationsOut::CustomProtocolClosed {
1531 peer_id,
1532 set_id,
1533 };
1534 self.events.push_back(ToSwarm::GenerateEvent(event));
1535 }
1536 }
1537 } else {
1538 error!(target: "sub-libp2p",
1539 "inject_connection_closed: State mismatch in the custom protos handler");
1540 debug_assert!(false);
1541 }
1542
1543 if connections.is_empty() {
1544 trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
1545 self.protocol_controller_handles[usize::from(set_id)]
1546 .dropped(peer_id);
1547 let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
1548
1549 let delay_id = self.next_delay_id;
1550 self.next_delay_id.0 += 1;
1551 let delay = futures_timer::Delay::new(Duration::from_secs(ban_dur));
1552 self.delays.push(
1553 async move {
1554 delay.await;
1555 (delay_id, peer_id, set_id)
1556 }
1557 .boxed(),
1558 );
1559
1560 *entry.get_mut() = PeerState::Backoff {
1561 timer: delay_id,
1562 timer_deadline: Instant::now() + Duration::from_secs(ban_dur),
1563 };
1564 } else if !connections.iter().any(|(_, s)| {
1565 matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
1566 }) {
1567 trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
1568 self.protocol_controller_handles[usize::from(set_id)]
1569 .dropped(peer_id);
1570
1571 *entry.get_mut() =
1572 PeerState::Disabled { connections, backoff_until: None };
1573 } else {
1574 *entry.get_mut() = PeerState::Enabled { connections };
1575 }
1576 },
1577
1578 PeerState::Requested |
1579 PeerState::PendingRequest { .. } |
1580 PeerState::Backoff { .. } => {
1581 error!(target: "sub-libp2p",
1583 "`inject_connection_closed` called for unknown peer {}",
1584 peer_id);
1585 debug_assert!(false);
1586 },
1587 PeerState::Poisoned => {
1588 error!(target: "sub-libp2p", "State of peer {} is poisoned", peer_id);
1589 debug_assert!(false);
1590 },
1591 }
1592 }
1593 },
1594 FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
1595 if let DialError::Transport(errors) = error {
1596 for (addr, error) in errors.iter() {
1597 trace!(target: "sub-libp2p", "Libp2p => Reach failure for {:?} through {:?}: {:?}", peer_id, addr, error);
1598 }
1599 }
1600
1601 if let Some(peer_id) = peer_id {
1602 trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);
1603
1604 for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
1605 if let Entry::Occupied(mut entry) = self.peers.entry((peer_id, set_id)) {
1606 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1607 st @ PeerState::Backoff { .. } => {
1609 *entry.into_mut() = st;
1610 },
1611
1612 st @ PeerState::Requested |
1615 st @ PeerState::PendingRequest { .. } => {
1616 trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
1617 self.protocol_controller_handles[usize::from(set_id)]
1618 .dropped(peer_id);
1619
1620 let now = Instant::now();
1621 let ban_duration = match st {
1622 PeerState::PendingRequest { timer_deadline, .. }
1623 if timer_deadline > now =>
1624 cmp::max(timer_deadline - now, Duration::from_secs(5)),
1625 _ => Duration::from_secs(5),
1626 };
1627
1628 let delay_id = self.next_delay_id;
1629 self.next_delay_id.0 += 1;
1630 let delay = futures_timer::Delay::new(ban_duration);
1631 self.delays.push(
1632 async move {
1633 delay.await;
1634 (delay_id, peer_id, set_id)
1635 }
1636 .boxed(),
1637 );
1638
1639 *entry.into_mut() = PeerState::Backoff {
1640 timer: delay_id,
1641 timer_deadline: now + ban_duration,
1642 };
1643 },
1644
1645 st @ PeerState::Disabled { .. } |
1648 st @ PeerState::Enabled { .. } |
1649 st @ PeerState::DisabledPendingEnable { .. } |
1650 st @ PeerState::Incoming { .. } => {
1651 *entry.into_mut() = st;
1652 },
1653
1654 PeerState::Poisoned => {
1655 error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id);
1656 debug_assert!(false);
1657 },
1658 }
1659 }
1660 }
1661 }
1662 },
1663 FromSwarm::ListenerClosed(_) => {},
1664 FromSwarm::ListenFailure(_) => {},
1665 FromSwarm::ListenerError(_) => {},
1666 FromSwarm::ExternalAddrExpired(_) => {},
1667 FromSwarm::NewListener(_) => {},
1668 FromSwarm::ExpiredListenAddr(_) => {},
1669 FromSwarm::NewExternalAddrCandidate(_) => {},
1670 FromSwarm::ExternalAddrConfirmed(_) => {},
1671 FromSwarm::AddressChange(_) => {},
1672 FromSwarm::NewListenAddr(_) => {},
1673 }
1674 }
1675
1676 fn on_connection_handler_event(
1677 &mut self,
1678 peer_id: PeerId,
1679 connection_id: ConnectionId,
1680 event: THandlerOutEvent<Self>,
1681 ) {
1682 match event {
1683 NotifsHandlerOut::OpenDesiredByRemote { protocol_index, handshake } => {
1684 let set_id = SetId::from(protocol_index);
1685
1686 trace!(target: "sub-libp2p",
1687 "Handler({:?}, {:?}]) => OpenDesiredByRemote({:?})",
1688 peer_id, connection_id, set_id);
1689
1690 let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
1691 {
1692 entry
1693 } else {
1694 error!(
1695 target: "sub-libp2p",
1696 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1697 );
1698 debug_assert!(false);
1699 return
1700 };
1701
1702 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1703 PeerState::Incoming {
1705 mut connections,
1706 backoff_until,
1707 incoming_index,
1708 peerset_rejected,
1709 } => {
1710 debug_assert!(connections
1711 .iter()
1712 .any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
1713 if let Some((_, connec_state)) =
1714 connections.iter_mut().find(|(c, _)| *c == connection_id)
1715 {
1716 if let ConnectionState::Closed = *connec_state {
1717 *connec_state = ConnectionState::OpenDesiredByRemote;
1718 } else {
1719 debug_assert!(matches!(
1725 connec_state,
1726 ConnectionState::OpeningThenClosing | ConnectionState::Closing
1727 ));
1728 }
1729 } else {
1730 error!(
1731 target: "sub-libp2p",
1732 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1733 );
1734 debug_assert!(false);
1735 }
1736
1737 *entry.into_mut() = PeerState::Incoming {
1738 connections,
1739 backoff_until,
1740 incoming_index,
1741 peerset_rejected,
1742 };
1743 },
1744
1745 PeerState::Enabled { mut connections } => {
1746 debug_assert!(connections.iter().any(|(_, s)| matches!(
1747 s,
1748 ConnectionState::Opening | ConnectionState::Open(_)
1749 )));
1750
1751 if let Some((_, connec_state)) =
1752 connections.iter_mut().find(|(c, _)| *c == connection_id)
1753 {
1754 if let ConnectionState::Closed = *connec_state {
1755 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
1756 peer_id, connection_id, set_id);
1757 self.events.push_back(ToSwarm::NotifyHandler {
1758 peer_id,
1759 handler: NotifyHandler::One(connection_id),
1760 event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
1761 });
1762 *connec_state = ConnectionState::Opening;
1763 } else {
1764 debug_assert!(matches!(
1770 connec_state,
1771 ConnectionState::OpenDesiredByRemote |
1772 ConnectionState::Closing | ConnectionState::Opening
1773 ));
1774 }
1775 } else {
1776 error!(
1777 target: "sub-libp2p",
1778 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1779 );
1780 debug_assert!(false);
1781 }
1782
1783 *entry.into_mut() = PeerState::Enabled { connections };
1784 },
1785
1786 PeerState::Disabled { mut connections, backoff_until } => {
1788 if let Some((_, connec_state)) =
1789 connections.iter_mut().find(|(c, _)| *c == connection_id)
1790 {
1791 if let ConnectionState::Closed = *connec_state {
1792 *connec_state = ConnectionState::OpenDesiredByRemote;
1793
1794 let incoming_id = self.next_incoming_index;
1795 self.next_incoming_index.0 += 1;
1796
1797 trace!(target: "sub-libp2p", "PSM <= Incoming({}, {:?}, {:?}).",
1798 peer_id, set_id, incoming_id);
1799 self.protocol_controller_handles[usize::from(set_id)]
1800 .incoming_connection(peer_id, incoming_id);
1801 self.incoming.push(IncomingPeer {
1802 peer_id,
1803 set_id,
1804 alive: true,
1805 incoming_id,
1806 handshake,
1807 });
1808
1809 *entry.into_mut() = PeerState::Incoming {
1810 connections,
1811 backoff_until,
1812 peerset_rejected: false,
1813 incoming_index: incoming_id,
1814 };
1815 } else {
1816 debug_assert!(matches!(
1821 connec_state,
1822 ConnectionState::OpeningThenClosing | ConnectionState::Closing
1823 ));
1824 *entry.into_mut() =
1825 PeerState::Disabled { connections, backoff_until };
1826 }
1827 } else {
1828 error!(
1829 target: "sub-libp2p",
1830 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1831 );
1832 debug_assert!(false);
1833 }
1834 },
1835
1836 PeerState::DisabledPendingEnable { mut connections, timer, timer_deadline } => {
1838 if let Some((_, connec_state)) =
1839 connections.iter_mut().find(|(c, _)| *c == connection_id)
1840 {
1841 if let ConnectionState::Closed = *connec_state {
1842 trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
1843 peer_id, connection_id, set_id);
1844 self.events.push_back(ToSwarm::NotifyHandler {
1845 peer_id,
1846 handler: NotifyHandler::One(connection_id),
1847 event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
1848 });
1849 *connec_state = ConnectionState::Opening;
1850
1851 *entry.into_mut() = PeerState::Enabled { connections };
1852 } else {
1853 debug_assert!(matches!(
1858 connec_state,
1859 ConnectionState::OpeningThenClosing | ConnectionState::Closing
1860 ));
1861 *entry.into_mut() = PeerState::DisabledPendingEnable {
1862 connections,
1863 timer,
1864 timer_deadline,
1865 };
1866 }
1867 } else {
1868 error!(
1869 target: "sub-libp2p",
1870 "OpenDesiredByRemote: State mismatch in the custom protos handler"
1871 );
1872 debug_assert!(false);
1873 }
1874 },
1875
1876 state => {
1877 error!(target: "sub-libp2p",
1878 "OpenDesiredByRemote: Unexpected state in the custom protos handler: {:?}",
1879 state);
1880 debug_assert!(false);
1881 },
1882 };
1883 },
1884
1885 NotifsHandlerOut::CloseDesired { protocol_index } => {
1886 let set_id = SetId::from(protocol_index);
1887
1888 trace!(target: "sub-libp2p",
1889 "Handler({}, {:?}) => CloseDesired({:?})",
1890 peer_id, connection_id, set_id);
1891
1892 let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
1893 {
1894 entry
1895 } else {
1896 error!(target: "sub-libp2p", "CloseDesired: State mismatch in the custom protos handler");
1897 debug_assert!(false);
1898 return
1899 };
1900
1901 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
1902 PeerState::Enabled { mut connections } => {
1904 debug_assert!(connections.iter().any(|(_, s)| matches!(
1905 s,
1906 ConnectionState::Opening | ConnectionState::Open(_)
1907 )));
1908
1909 let pos = if let Some(pos) =
1910 connections.iter().position(|(c, _)| *c == connection_id)
1911 {
1912 pos
1913 } else {
1914 error!(target: "sub-libp2p",
1915 "CloseDesired: State mismatch in the custom protos handler");
1916 debug_assert!(false);
1917 return
1918 };
1919
1920 if matches!(connections[pos].1, ConnectionState::Closing) {
1921 *entry.into_mut() = PeerState::Enabled { connections };
1922 return
1923 }
1924
1925 debug_assert!(matches!(connections[pos].1, ConnectionState::Open(_)));
1926 connections[pos].1 = ConnectionState::Closing;
1927
1928 trace!(target: "sub-libp2p", "Handler({}, {:?}) <= Close({:?})", peer_id, connection_id, set_id);
1929 self.events.push_back(ToSwarm::NotifyHandler {
1930 peer_id,
1931 handler: NotifyHandler::One(connection_id),
1932 event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
1933 });
1934
1935 if let Some((replacement_pos, replacement_sink)) =
1936 connections.iter().enumerate().find_map(|(num, (_, s))| match s {
1937 ConnectionState::Open(s) => Some((num, s.clone())),
1938 _ => None,
1939 }) {
1940 if pos <= replacement_pos {
1941 trace!(target: "sub-libp2p", "External API <= Sink replaced({:?}, {:?})", peer_id, set_id);
1942 let event = NotificationsOut::CustomProtocolReplaced {
1943 peer_id,
1944 set_id,
1945 notifications_sink: replacement_sink.clone(),
1946 };
1947 self.events.push_back(ToSwarm::GenerateEvent(event));
1948 }
1949
1950 *entry.into_mut() = PeerState::Enabled { connections };
1951 } else {
1952 if !connections
1954 .iter()
1955 .any(|(_, s)| matches!(s, ConnectionState::Opening))
1956 {
1957 trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
1958 self.protocol_controller_handles[usize::from(set_id)]
1959 .dropped(peer_id);
1960 *entry.into_mut() =
1961 PeerState::Disabled { connections, backoff_until: None };
1962 } else {
1963 *entry.into_mut() = PeerState::Enabled { connections };
1964 }
1965
1966 trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id);
1967 let event = NotificationsOut::CustomProtocolClosed { peer_id, set_id };
1968 self.events.push_back(ToSwarm::GenerateEvent(event));
1969 }
1970 },
1971
1972 state @ PeerState::Disabled { .. } |
1975 state @ PeerState::DisabledPendingEnable { .. } => {
1976 *entry.into_mut() = state;
1977 },
1978 state => {
1979 error!(target: "sub-libp2p",
1980 "Unexpected state in the custom protos handler: {:?}",
1981 state);
1982 },
1983 }
1984 },
1985
1986 NotifsHandlerOut::CloseResult { protocol_index } => {
1987 let set_id = SetId::from(protocol_index);
1988
1989 trace!(target: "sub-libp2p",
1990 "Handler({}, {:?}) => CloseResult({:?})",
1991 peer_id, connection_id, set_id);
1992
1993 match self.peers.get_mut(&(peer_id, set_id)) {
1994 Some(PeerState::Incoming { connections, .. }) |
1996 Some(PeerState::DisabledPendingEnable { connections, .. }) |
1997 Some(PeerState::Disabled { connections, .. }) |
1998 Some(PeerState::Enabled { connections, .. }) => {
1999 if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2000 *c == connection_id && matches!(s, ConnectionState::Closing)
2001 }) {
2002 *connec_state = ConnectionState::Closed;
2003 } else {
2004 error!(target: "sub-libp2p",
2005 "CloseResult: State mismatch in the custom protos handler");
2006 debug_assert!(false);
2007 }
2008 },
2009
2010 state => {
2011 error!(target: "sub-libp2p",
2012 "CloseResult: Unexpected state in the custom protos handler: {:?}",
2013 state);
2014 debug_assert!(false);
2015 },
2016 }
2017 },
2018
2019 NotifsHandlerOut::OpenResultOk {
2020 protocol_index,
2021 negotiated_fallback,
2022 received_handshake,
2023 notifications_sink,
2024 inbound,
2025 ..
2026 } => {
2027 let set_id = SetId::from(protocol_index);
2028 trace!(target: "sub-libp2p",
2029 "Handler({}, {:?}) => OpenResultOk({:?})",
2030 peer_id, connection_id, set_id);
2031
2032 match self.peers.get_mut(&(peer_id, set_id)) {
2033 Some(PeerState::Enabled { connections, .. }) => {
2034 debug_assert!(connections.iter().any(|(_, s)| matches!(
2035 s,
2036 ConnectionState::Opening | ConnectionState::Open(_)
2037 )));
2038 let any_open =
2039 connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_)));
2040
2041 if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2042 *c == connection_id && matches!(s, ConnectionState::Opening)
2043 }) {
2044 if !any_open {
2045 trace!(target: "sub-libp2p", "External API <= Open({}, {:?})", peer_id, set_id);
2046 let event = NotificationsOut::CustomProtocolOpen {
2047 peer_id,
2048 set_id,
2049 direction: if inbound {
2050 Direction::Inbound
2051 } else {
2052 Direction::Outbound
2053 },
2054 received_handshake: received_handshake.clone(),
2055 negotiated_fallback: negotiated_fallback.clone(),
2056 notifications_sink: notifications_sink.clone(),
2057 };
2058 self.events.push_back(ToSwarm::GenerateEvent(event));
2059 }
2060 *connec_state = ConnectionState::Open(notifications_sink);
2061 } else if let Some((_, connec_state)) =
2062 connections.iter_mut().find(|(c, s)| {
2063 *c == connection_id &&
2064 matches!(s, ConnectionState::OpeningThenClosing)
2065 }) {
2066 *connec_state = ConnectionState::Closing;
2067 } else {
2068 error!(target: "sub-libp2p",
2069 "OpenResultOk State mismatch in the custom protos handler");
2070 debug_assert!(false);
2071 }
2072 },
2073
2074 Some(PeerState::Incoming { connections, .. }) |
2075 Some(PeerState::DisabledPendingEnable { connections, .. }) |
2076 Some(PeerState::Disabled { connections, .. }) => {
2077 if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2078 *c == connection_id && matches!(s, ConnectionState::OpeningThenClosing)
2079 }) {
2080 *connec_state = ConnectionState::Closing;
2081 } else {
2082 error!(target: "sub-libp2p",
2083 "OpenResultOk State mismatch in the custom protos handler");
2084 debug_assert!(false);
2085 }
2086 },
2087
2088 state => {
2089 error!(target: "sub-libp2p",
2090 "OpenResultOk: Unexpected state in the custom protos handler: {:?}",
2091 state);
2092 debug_assert!(false);
2093 },
2094 }
2095 },
2096
2097 NotifsHandlerOut::OpenResultErr { protocol_index } => {
2098 let set_id = SetId::from(protocol_index);
2099 trace!(target: "sub-libp2p",
2100 "Handler({:?}, {:?}) => OpenResultErr({:?})",
2101 peer_id, connection_id, set_id);
2102
2103 let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
2104 {
2105 entry
2106 } else {
2107 error!(target: "sub-libp2p", "OpenResultErr: State mismatch in the custom protos handler");
2108 debug_assert!(false);
2109 return
2110 };
2111
2112 match mem::replace(entry.get_mut(), PeerState::Poisoned) {
2113 PeerState::Enabled { mut connections } => {
2114 debug_assert!(connections.iter().any(|(_, s)| matches!(
2115 s,
2116 ConnectionState::Opening | ConnectionState::Open(_)
2117 )));
2118
2119 if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
2120 *c == connection_id && matches!(s, ConnectionState::Opening)
2121 }) {
2122 *connec_state = ConnectionState::Closed;
2123 } else if let Some((_, connec_state)) =
2124 connections.iter_mut().find(|(c, s)| {
2125 *c == connection_id &&
2126 matches!(s, ConnectionState::OpeningThenClosing)
2127 }) {
2128 *connec_state = ConnectionState::Closing;
2129 } else {
2130 error!(target: "sub-libp2p",
2131 "OpenResultErr: State mismatch in the custom protos handler");
2132 debug_assert!(false);
2133 }
2134
2135 if !connections.iter().any(|(_, s)| {
2136 matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
2137 }) {
2138 trace!(target: "sub-libp2p", "PSM <= Dropped({:?}, {:?})", peer_id, set_id);
2139 self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id);
2140
2141 let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
2142 *entry.into_mut() = PeerState::Disabled {
2143 connections,
2144 backoff_until: Some(Instant::now() + Duration::from_secs(ban_dur)),
2145 };
2146 } else {
2147 *entry.into_mut() = PeerState::Enabled { connections };
2148 }
2149 },
2150 mut state @ PeerState::Incoming { .. } |
2151 mut state @ PeerState::DisabledPendingEnable { .. } |
2152 mut state @ PeerState::Disabled { .. } => {
2153 match &mut state {
2154 PeerState::Incoming { connections, .. } |
2155 PeerState::Disabled { connections, .. } |
2156 PeerState::DisabledPendingEnable { connections, .. } => {
2157 if let Some((_, connec_state)) =
2158 connections.iter_mut().find(|(c, s)| {
2159 *c == connection_id &&
2160 matches!(s, ConnectionState::OpeningThenClosing)
2161 }) {
2162 *connec_state = ConnectionState::Closing;
2163 } else {
2164 error!(target: "sub-libp2p",
2165 "OpenResultErr: State mismatch in the custom protos handler");
2166 debug_assert!(false);
2167 }
2168 },
2169 _ => unreachable!(
2170 "Match branches are the same as the one on which we
2171 enter this block; qed"
2172 ),
2173 };
2174
2175 *entry.into_mut() = state;
2176 },
2177 state => {
2178 error!(target: "sub-libp2p",
2179 "Unexpected state in the custom protos handler: {:?}",
2180 state);
2181 debug_assert!(false);
2182 },
2183 };
2184 },
2185
2186 NotifsHandlerOut::Notification { protocol_index, message } => {
2187 let set_id = SetId::from(protocol_index);
2188 if self.is_open(&peer_id, set_id) {
2189 trace!(
2190 target: "sub-libp2p",
2191 "Handler({:?}) => Notification({}, {:?}, {} bytes)",
2192 connection_id,
2193 peer_id,
2194 set_id,
2195 message.len()
2196 );
2197 trace!(
2198 target: "sub-libp2p",
2199 "External API <= Message({}, {:?})",
2200 peer_id,
2201 set_id,
2202 );
2203 let event = NotificationsOut::Notification {
2204 peer_id,
2205 set_id,
2206 message: message.clone(),
2207 };
2208 self.events.push_back(ToSwarm::GenerateEvent(event));
2209 } else {
2210 trace!(
2211 target: "sub-libp2p",
2212 "Handler({:?}) => Post-close notification({}, {:?}, {} bytes)",
2213 connection_id,
2214 peer_id,
2215 set_id,
2216 message.len()
2217 );
2218 }
2219 },
2220 }
2221 }
2222
2223 fn poll(
2224 &mut self,
2225 cx: &mut Context,
2226 _params: &mut impl PollParameters,
2227 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2228 if let Some(event) = self.events.pop_front() {
2229 return Poll::Ready(event)
2230 }
2231
2232 loop {
2234 match futures::Stream::poll_next(Pin::new(&mut self.from_protocol_controllers), cx) {
2235 Poll::Ready(Some(Message::Accept(index))) => {
2236 self.peerset_report_preaccept(index);
2237 },
2238 Poll::Ready(Some(Message::Reject(index))) => {
2239 let _ = self.peerset_report_reject(index);
2240 },
2241 Poll::Ready(Some(Message::Connect { peer_id, set_id, .. })) => {
2242 self.peerset_report_connect(peer_id, set_id);
2243 },
2244 Poll::Ready(Some(Message::Drop { peer_id, set_id, .. })) => {
2245 self.peerset_report_disconnect(peer_id, set_id);
2246 },
2247 Poll::Ready(None) => {
2248 error!(
2249 target: "sub-libp2p",
2250 "Protocol controllers receiver stream has returned `None`. Ignore this error if the node is shutting down.",
2251 );
2252 break
2253 },
2254 Poll::Pending => break,
2255 }
2256 }
2257
2258 loop {
2260 match futures::Stream::poll_next(Pin::new(&mut self.command_streams), cx) {
2261 Poll::Ready(Some((set_id, command))) => match command {
2262 NotificationCommand::SetHandshake(handshake) => {
2263 self.set_notif_protocol_handshake(set_id.into(), handshake);
2264 },
2265 NotificationCommand::OpenSubstream(_peer) |
2266 NotificationCommand::CloseSubstream(_peer) => {
2267 todo!("substream control not implemented");
2268 },
2269 },
2270 Poll::Ready(None) => {
2271 error!(target: LOG_TARGET, "Protocol command streams have been shut down");
2272 break
2273 },
2274 Poll::Pending => break,
2275 }
2276 }
2277
2278 while let Poll::Ready(Some((result, index))) =
2279 self.pending_inbound_validations.poll_next_unpin(cx)
2280 {
2281 match result {
2282 Ok(ValidationResult::Accept) => {
2283 self.protocol_report_accept(index);
2284 },
2285 Ok(ValidationResult::Reject) => {
2286 self.protocol_report_reject(index);
2287 },
2288 Err(_) => {
2289 error!(target: LOG_TARGET, "Protocol has shut down");
2290 break
2291 },
2292 }
2293 }
2294
2295 while let Poll::Ready(Some((delay_id, peer_id, set_id))) =
2296 Pin::new(&mut self.delays).poll_next(cx)
2297 {
2298 let peer_state = match self.peers.get_mut(&(peer_id, set_id)) {
2299 Some(s) => s,
2300 None => continue,
2303 };
2304
2305 match peer_state {
2306 PeerState::Backoff { timer, .. } if *timer == delay_id => {
2307 trace!(target: "sub-libp2p", "Libp2p <= Clean up ban of {:?} from the state ({:?})", peer_id, set_id);
2308 self.peers.remove(&(peer_id, set_id));
2309 },
2310
2311 PeerState::PendingRequest { timer, .. } if *timer == delay_id => {
2312 trace!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired ({:?})", peer_id, set_id);
2313 self.events.push_back(ToSwarm::Dial { opts: peer_id.into() });
2314 *peer_state = PeerState::Requested;
2315 },
2316
2317 PeerState::DisabledPendingEnable { connections, timer, timer_deadline }
2318 if *timer == delay_id =>
2319 {
2320 if let Some((connec_id, connec_state)) =
2322 connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
2323 {
2324 trace!(target: "sub-libp2p", "Handler({}, {:?}) <= Open({:?}) (ban expired)",
2325 peer_id, *connec_id, set_id);
2326 self.events.push_back(ToSwarm::NotifyHandler {
2327 peer_id,
2328 handler: NotifyHandler::One(*connec_id),
2329 event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
2330 });
2331 *connec_state = ConnectionState::Opening;
2332 *peer_state = PeerState::Enabled { connections: mem::take(connections) };
2333 } else {
2334 *timer_deadline = Instant::now() + Duration::from_secs(5);
2335 let delay = futures_timer::Delay::new(Duration::from_secs(5));
2336 let timer = *timer;
2337 self.delays.push(
2338 async move {
2339 delay.await;
2340 (timer, peer_id, set_id)
2341 }
2342 .boxed(),
2343 );
2344 }
2345 },
2346
2347 _ => {},
2350 }
2351 }
2352
2353 if let Some(event) = self.events.pop_front() {
2354 return Poll::Ready(event)
2355 }
2356
2357 Poll::Pending
2358 }
2359}
2360
2361#[cfg(test)]
2362#[allow(deprecated)]
2363mod tests {
2364 use super::*;
2365 use crate::{
2366 mock::MockPeerStore,
2367 protocol::notifications::handler::tests::*,
2368 protocol_controller::{IncomingIndex, ProtoSetConfig, ProtocolController},
2369 };
2370 use libp2p::core::ConnectedPoint;
2371 use sc_utils::mpsc::tracing_unbounded;
2372 use std::{collections::HashSet, iter};
2373
2374 impl PartialEq for ConnectionState {
2375 fn eq(&self, other: &ConnectionState) -> bool {
2376 match (self, other) {
2377 (ConnectionState::Closed, ConnectionState::Closed) => true,
2378 (ConnectionState::Closing, ConnectionState::Closing) => true,
2379 (ConnectionState::Opening, ConnectionState::Opening) => true,
2380 (ConnectionState::OpeningThenClosing, ConnectionState::OpeningThenClosing) => true,
2381 (ConnectionState::OpenDesiredByRemote, ConnectionState::OpenDesiredByRemote) =>
2382 true,
2383 (ConnectionState::Open(_), ConnectionState::Open(_)) => true,
2384 _ => false,
2385 }
2386 }
2387 }
2388
2389 #[derive(Clone)]
2390 struct MockPollParams {}
2391
2392 impl PollParameters for MockPollParams {
2393 type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
2394
2395 fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
2396 vec![].into_iter()
2397 }
2398 }
2399
2400 fn development_notifs(
2401 ) -> (Notifications, ProtocolController, Box<dyn crate::service::traits::NotificationService>)
2402 {
2403 let (protocol_handle_pair, notif_service) =
2404 crate::protocol::notifications::service::notification_service("/proto/1".into());
2405 let (to_notifications, from_controller) =
2406 tracing_unbounded("test_controller_to_notifications", 10_000);
2407
2408 let (handle, controller) = ProtocolController::new(
2409 SetId::from(0),
2410 ProtoSetConfig {
2411 in_peers: 25,
2412 out_peers: 25,
2413 reserved_nodes: HashSet::new(),
2414 reserved_only: false,
2415 },
2416 to_notifications,
2417 Arc::new(MockPeerStore {}),
2418 );
2419
2420 let (notif_handle, command_stream) = protocol_handle_pair.split();
2421 (
2422 Notifications::new(
2423 vec![handle],
2424 from_controller,
2425 NotificationMetrics::new(None),
2426 iter::once((
2427 ProtocolConfig {
2428 name: "/foo".into(),
2429 fallback_names: Vec::new(),
2430 handshake: vec![1, 2, 3, 4],
2431 max_notification_size: u64::MAX,
2432 },
2433 notif_handle,
2434 command_stream,
2435 )),
2436 ),
2437 controller,
2438 notif_service,
2439 )
2440 }
2441
2442 #[test]
2443 fn update_handshake() {
2444 let (mut notif, _controller, _notif_service) = development_notifs();
2445
2446 let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
2447 assert_eq!(inner, vec![1, 2, 3, 4]);
2448
2449 notif.set_notif_protocol_handshake(0.into(), vec![5, 6, 7, 8]);
2450
2451 let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
2452 assert_eq!(inner, vec![5, 6, 7, 8]);
2453 }
2454
2455 #[test]
2456 #[should_panic]
2457 #[cfg(debug_assertions)]
2458 fn update_unknown_handshake() {
2459 let (mut notif, _controller, _notif_service) = development_notifs();
2460
2461 notif.set_notif_protocol_handshake(1337.into(), vec![5, 6, 7, 8]);
2462 }
2463
2464 #[test]
2465 fn disconnect_backoff_peer() {
2466 let (mut notif, _controller, _notif_service) = development_notifs();
2467
2468 let peer = PeerId::random();
2469 notif.peers.insert(
2470 (peer, 0.into()),
2471 PeerState::Backoff { timer: DelayId(0), timer_deadline: Instant::now() },
2472 );
2473 notif.disconnect_peer(&peer, 0.into());
2474
2475 assert!(std::matches!(
2476 notif.peers.get(&(peer, 0.into())),
2477 Some(PeerState::Backoff { timer: DelayId(0), .. })
2478 ));
2479 }
2480
2481 #[test]
2482 fn disconnect_pending_request() {
2483 let (mut notif, _controller, _notif_service) = development_notifs();
2484 let peer = PeerId::random();
2485
2486 notif.peers.insert(
2487 (peer, 0.into()),
2488 PeerState::PendingRequest { timer: DelayId(0), timer_deadline: Instant::now() },
2489 );
2490 notif.disconnect_peer(&peer, 0.into());
2491
2492 assert!(std::matches!(
2493 notif.peers.get(&(peer, 0.into())),
2494 Some(PeerState::PendingRequest { timer: DelayId(0), .. })
2495 ));
2496 }
2497
2498 #[test]
2499 fn disconnect_requested_peer() {
2500 let (mut notif, _controller, _notif_service) = development_notifs();
2501
2502 let peer = PeerId::random();
2503 notif.peers.insert((peer, 0.into()), PeerState::Requested);
2504 notif.disconnect_peer(&peer, 0.into());
2505
2506 assert!(std::matches!(notif.peers.get(&(peer, 0.into())), Some(PeerState::Requested)));
2507 }
2508
2509 #[test]
2510 fn disconnect_disabled_peer() {
2511 let (mut notif, _controller, _notif_service) = development_notifs();
2512 let peer = PeerId::random();
2513 notif.peers.insert(
2514 (peer, 0.into()),
2515 PeerState::Disabled { backoff_until: None, connections: SmallVec::new() },
2516 );
2517 notif.disconnect_peer(&peer, 0.into());
2518
2519 assert!(std::matches!(
2520 notif.peers.get(&(peer, 0.into())),
2521 Some(PeerState::Disabled { backoff_until: None, .. })
2522 ));
2523 }
2524
2525 #[test]
2526 fn remote_opens_connection_and_substream() {
2527 let (mut notif, _controller, _notif_service) = development_notifs();
2528 let peer = PeerId::random();
2529 let conn = ConnectionId::new_unchecked(0);
2530 let connected = ConnectedPoint::Listener {
2531 local_addr: Multiaddr::empty(),
2532 send_back_addr: Multiaddr::empty(),
2533 };
2534
2535 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2536 libp2p::swarm::behaviour::ConnectionEstablished {
2537 peer_id: peer,
2538 connection_id: conn,
2539 endpoint: &connected,
2540 failed_addresses: &[],
2541 other_established: 0usize,
2542 },
2543 ));
2544
2545 if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
2546 notif.peers.get(&(peer, 0.into()))
2547 {
2548 assert_eq!(connections[0], (conn, ConnectionState::Closed));
2549 } else {
2550 panic!("invalid state");
2551 }
2552
2553 notif.on_connection_handler_event(
2555 peer,
2556 conn,
2557 NotifsHandlerOut::OpenDesiredByRemote {
2558 protocol_index: 0,
2559 handshake: vec![1, 3, 3, 7],
2560 },
2561 );
2562
2563 if let Some(&PeerState::Incoming { ref connections, backoff_until: None, .. }) =
2564 notif.peers.get(&(peer, 0.into()))
2565 {
2566 assert_eq!(connections.len(), 1);
2567 assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2568 } else {
2569 panic!("invalid state");
2570 }
2571
2572 assert!(std::matches!(
2573 notif.incoming.pop(),
2574 Some(IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. }),
2575 ));
2576 }
2577
2578 #[tokio::test]
2579 async fn disconnect_remote_substream_before_handled_by_controller() {
2580 let (mut notif, _controller, _notif_service) = development_notifs();
2581 let peer = PeerId::random();
2582 let conn = ConnectionId::new_unchecked(0);
2583 let connected = ConnectedPoint::Listener {
2584 local_addr: Multiaddr::empty(),
2585 send_back_addr: Multiaddr::empty(),
2586 };
2587
2588 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2589 libp2p::swarm::behaviour::ConnectionEstablished {
2590 peer_id: peer,
2591 connection_id: conn,
2592 endpoint: &connected,
2593 failed_addresses: &[],
2594 other_established: 0usize,
2595 },
2596 ));
2597 notif.on_connection_handler_event(
2598 peer,
2599 conn,
2600 NotifsHandlerOut::OpenDesiredByRemote {
2601 protocol_index: 0,
2602 handshake: vec![1, 3, 3, 7],
2603 },
2604 );
2605 notif.disconnect_peer(&peer, 0.into());
2606
2607 if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
2608 notif.peers.get(&(peer, 0.into()))
2609 {
2610 assert_eq!(connections.len(), 1);
2611 assert_eq!(connections[0], (conn, ConnectionState::Closing));
2612 } else {
2613 panic!("invalid state");
2614 }
2615 }
2616
2617 #[test]
2618 fn peerset_report_connect_backoff() {
2619 let (mut notif, _controller, _notif_service) = development_notifs();
2620 let set_id = SetId::from(0);
2621 let peer = PeerId::random();
2622 let conn = ConnectionId::new_unchecked(0);
2623 let connected = ConnectedPoint::Listener {
2624 local_addr: Multiaddr::empty(),
2625 send_back_addr: Multiaddr::empty(),
2626 };
2627
2628 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2629 libp2p::swarm::behaviour::ConnectionEstablished {
2630 peer_id: peer,
2631 connection_id: conn,
2632 endpoint: &connected,
2633 failed_addresses: &[],
2634 other_established: 0usize,
2635 },
2636 ));
2637 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2638
2639 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2646 notif.peers.get_mut(&(peer, set_id))
2647 {
2648 *backoff_until =
2649 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2650 }
2651
2652 notif.on_swarm_event(FromSwarm::ConnectionClosed(
2653 libp2p::swarm::behaviour::ConnectionClosed {
2654 peer_id: peer,
2655 connection_id: conn,
2656 endpoint: &connected.clone(),
2657 handler: NotifsHandler::new(peer, vec![], None),
2658 remaining_established: 0usize,
2659 },
2660 ));
2661
2662 let timer = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
2663 notif.peers.get(&(peer, set_id))
2664 {
2665 timer_deadline
2666 } else {
2667 panic!("invalid state");
2668 };
2669
2670 notif.peerset_report_connect(peer, set_id);
2672
2673 if let Some(&PeerState::PendingRequest { timer_deadline, .. }) =
2674 notif.peers.get(&(peer, set_id))
2675 {
2676 assert_eq!(timer, timer_deadline);
2677 } else {
2678 panic!("invalid state");
2679 }
2680 }
2681
2682 #[test]
2683 fn peerset_connect_incoming() {
2684 let (mut notif, _controller, _notif_service) = development_notifs();
2685 let peer = PeerId::random();
2686 let conn = ConnectionId::new_unchecked(0);
2687 let set_id = SetId::from(0);
2688 let connected = ConnectedPoint::Listener {
2689 local_addr: Multiaddr::empty(),
2690 send_back_addr: Multiaddr::empty(),
2691 };
2692
2693 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2694 libp2p::swarm::behaviour::ConnectionEstablished {
2695 peer_id: peer,
2696 connection_id: conn,
2697 endpoint: &connected,
2698 failed_addresses: &[],
2699 other_established: 0usize,
2700 },
2701 ));
2702 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2703
2704 notif.on_connection_handler_event(
2706 peer,
2707 conn,
2708 NotifsHandlerOut::OpenDesiredByRemote {
2709 protocol_index: 0,
2710 handshake: vec![1, 3, 3, 7],
2711 },
2712 );
2713
2714 notif.protocol_report_accept(IncomingIndex(0));
2718 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
2719 }
2720
2721 #[test]
2722 fn peerset_disconnect_disable_pending_enable() {
2723 let (mut notif, _controller, _notif_service) = development_notifs();
2724 let set_id = SetId::from(0);
2725 let peer = PeerId::random();
2726 let conn = ConnectionId::new_unchecked(0);
2727 let connected = ConnectedPoint::Listener {
2728 local_addr: Multiaddr::empty(),
2729 send_back_addr: Multiaddr::empty(),
2730 };
2731
2732 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2733 libp2p::swarm::behaviour::ConnectionEstablished {
2734 peer_id: peer,
2735 connection_id: conn,
2736 endpoint: &connected,
2737 failed_addresses: &[],
2738 other_established: 0usize,
2739 },
2740 ));
2741 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2742
2743 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2745 notif.peers.get_mut(&(peer, set_id))
2746 {
2747 *backoff_until =
2748 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2749 }
2750
2751 notif.peerset_report_connect(peer, set_id);
2753 assert!(std::matches!(
2754 notif.peers.get(&(peer, set_id)),
2755 Some(&PeerState::DisabledPendingEnable { .. })
2756 ));
2757
2758 notif.peerset_report_disconnect(peer, set_id);
2759
2760 if let Some(PeerState::Disabled { backoff_until, .. }) = notif.peers.get(&(peer, set_id)) {
2761 assert!(backoff_until.is_some());
2762 assert!(backoff_until.unwrap() > Instant::now());
2763 } else {
2764 panic!("invalid state");
2765 }
2766 }
2767
2768 #[test]
2769 fn peerset_disconnect_enabled() {
2770 let (mut notif, _controller, _notif_service) = development_notifs();
2771 let peer = PeerId::random();
2772 let conn = ConnectionId::new_unchecked(0);
2773 let set_id = SetId::from(0);
2774 let connected = ConnectedPoint::Listener {
2775 local_addr: Multiaddr::empty(),
2776 send_back_addr: Multiaddr::empty(),
2777 };
2778
2779 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2781 libp2p::swarm::behaviour::ConnectionEstablished {
2782 peer_id: peer,
2783 connection_id: conn,
2784 endpoint: &connected,
2785 failed_addresses: &[],
2786 other_established: 0usize,
2787 },
2788 ));
2789 notif.on_connection_handler_event(
2790 peer,
2791 conn,
2792 NotifsHandlerOut::OpenDesiredByRemote {
2793 protocol_index: 0,
2794 handshake: vec![1, 3, 3, 7],
2795 },
2796 );
2797 notif.protocol_report_accept(IncomingIndex(0));
2800 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
2801
2802 notif.peerset_report_disconnect(peer, set_id);
2804 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2805 }
2806
2807 #[test]
2808 fn peerset_disconnect_requested() {
2809 let (mut notif, _controller, _notif_service) = development_notifs();
2810 let peer = PeerId::random();
2811 let set_id = SetId::from(0);
2812
2813 notif.peerset_report_connect(peer, set_id);
2815 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
2816
2817 notif.peerset_report_disconnect(peer, set_id);
2819 assert!(notif.peers.get(&(peer, set_id)).is_none());
2820 }
2821
2822 #[test]
2823 fn peerset_disconnect_pending_request() {
2824 let (mut notif, _controller, _notif_service) = development_notifs();
2825 let set_id = SetId::from(0);
2826 let peer = PeerId::random();
2827 let conn = ConnectionId::new_unchecked(0);
2828 let connected = ConnectedPoint::Listener {
2829 local_addr: Multiaddr::empty(),
2830 send_back_addr: Multiaddr::empty(),
2831 };
2832
2833 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2834 libp2p::swarm::behaviour::ConnectionEstablished {
2835 peer_id: peer,
2836 connection_id: conn,
2837 endpoint: &connected,
2838 failed_addresses: &[],
2839 other_established: 0usize,
2840 },
2841 ));
2842 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2843
2844 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
2846 notif.peers.get_mut(&(peer, set_id))
2847 {
2848 *backoff_until =
2849 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
2850 }
2851
2852 notif.on_swarm_event(FromSwarm::ConnectionClosed(
2853 libp2p::swarm::behaviour::ConnectionClosed {
2854 peer_id: peer,
2855 connection_id: conn,
2856 endpoint: &connected.clone(),
2857 handler: NotifsHandler::new(peer, vec![], None),
2858 remaining_established: 0usize,
2859 },
2860 ));
2861 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
2862
2863 notif.peerset_report_connect(peer, set_id);
2865 assert!(std::matches!(
2866 notif.peers.get(&(peer, set_id)),
2867 Some(&PeerState::PendingRequest { .. })
2868 ));
2869
2870 notif.peerset_report_disconnect(peer, set_id);
2872 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
2873 }
2874
2875 #[test]
2876 fn peerset_accept_peer_not_alive() {
2877 let (mut notif, _controller, _notif_service) = development_notifs();
2878 let peer = PeerId::random();
2879 let conn = ConnectionId::new_unchecked(0);
2880 let set_id = SetId::from(0);
2881 let connected = ConnectedPoint::Listener {
2882 local_addr: Multiaddr::empty(),
2883 send_back_addr: Multiaddr::empty(),
2884 };
2885
2886 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2887 libp2p::swarm::behaviour::ConnectionEstablished {
2888 peer_id: peer,
2889 connection_id: conn,
2890 endpoint: &connected,
2891 failed_addresses: &[],
2892 other_established: 0usize,
2893 },
2894 ));
2895 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2896
2897 notif.on_connection_handler_event(
2899 peer,
2900 conn,
2901 NotifsHandlerOut::OpenDesiredByRemote {
2902 protocol_index: 0,
2903 handshake: vec![1, 3, 3, 7],
2904 },
2905 );
2906 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
2907
2908 assert!(std::matches!(
2909 notif.incoming[0],
2910 IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
2911 ));
2912
2913 notif.disconnect_peer(&peer, set_id);
2914 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2915 assert!(std::matches!(
2916 notif.incoming[0],
2917 IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
2918 ));
2919
2920 notif.protocol_report_accept(IncomingIndex(0));
2921 assert_eq!(notif.incoming.len(), 0);
2922 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Disabled { .. })));
2923 }
2924
2925 #[test]
2926 fn secondary_connection_peer_state_incoming() {
2927 let (mut notif, _controller, _notif_service) = development_notifs();
2928 let peer = PeerId::random();
2929 let conn = ConnectionId::new_unchecked(0);
2930 let conn2 = ConnectionId::new_unchecked(1);
2931 let set_id = SetId::from(0);
2932 let connected = ConnectedPoint::Listener {
2933 local_addr: Multiaddr::empty(),
2934 send_back_addr: Multiaddr::empty(),
2935 };
2936
2937 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2938 libp2p::swarm::behaviour::ConnectionEstablished {
2939 peer_id: peer,
2940 connection_id: conn,
2941 endpoint: &connected,
2942 failed_addresses: &[],
2943 other_established: 0usize,
2944 },
2945 ));
2946 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
2947
2948 notif.on_connection_handler_event(
2949 peer,
2950 conn,
2951 NotifsHandlerOut::OpenDesiredByRemote {
2952 protocol_index: 0,
2953 handshake: vec![1, 3, 3, 7],
2954 },
2955 );
2956 if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
2957 assert_eq!(connections.len(), 1);
2958 assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2959 } else {
2960 panic!("invalid state");
2961 }
2962
2963 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2965 libp2p::swarm::behaviour::ConnectionEstablished {
2966 peer_id: peer,
2967 connection_id: conn2,
2968 endpoint: &connected,
2969 failed_addresses: &[],
2970 other_established: 0usize,
2971 },
2972 ));
2973
2974 if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
2975 assert_eq!(connections.len(), 2);
2976 assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
2977 assert_eq!(connections[1], (conn2, ConnectionState::Closed));
2978 } else {
2979 panic!("invalid state");
2980 }
2981 }
2982
2983 #[test]
2984 fn close_connection_for_disabled_peer() {
2985 let (mut notif, _controller, _notif_service) = development_notifs();
2986 let peer = PeerId::random();
2987 let conn = ConnectionId::new_unchecked(0);
2988 let set_id = SetId::from(0);
2989 let connected = ConnectedPoint::Listener {
2990 local_addr: Multiaddr::empty(),
2991 send_back_addr: Multiaddr::empty(),
2992 };
2993
2994 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
2995 libp2p::swarm::behaviour::ConnectionEstablished {
2996 peer_id: peer,
2997 connection_id: conn,
2998 endpoint: &connected,
2999 failed_addresses: &[],
3000 other_established: 0usize,
3001 },
3002 ));
3003 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3004
3005 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3006 libp2p::swarm::behaviour::ConnectionClosed {
3007 peer_id: peer,
3008 connection_id: conn,
3009 endpoint: &connected.clone(),
3010 handler: NotifsHandler::new(peer, vec![], None),
3011 remaining_established: 0usize,
3012 },
3013 ));
3014 assert!(notif.peers.get(&(peer, set_id)).is_none());
3015 }
3016
3017 #[test]
3018 fn close_connection_for_incoming_peer_one_connection() {
3019 let (mut notif, _controller, _notif_service) = development_notifs();
3020 let peer = PeerId::random();
3021 let conn = ConnectionId::new_unchecked(0);
3022 let set_id = SetId::from(0);
3023 let connected = ConnectedPoint::Listener {
3024 local_addr: Multiaddr::empty(),
3025 send_back_addr: Multiaddr::empty(),
3026 };
3027
3028 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3029 libp2p::swarm::behaviour::ConnectionEstablished {
3030 peer_id: peer,
3031 connection_id: conn,
3032 endpoint: &connected,
3033 failed_addresses: &[],
3034 other_established: 0usize,
3035 },
3036 ));
3037 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3038
3039 notif.on_connection_handler_event(
3040 peer,
3041 conn,
3042 NotifsHandlerOut::OpenDesiredByRemote {
3043 protocol_index: 0,
3044 handshake: vec![1, 3, 3, 7],
3045 },
3046 );
3047 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3048
3049 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3050 libp2p::swarm::behaviour::ConnectionClosed {
3051 peer_id: peer,
3052 connection_id: conn,
3053 endpoint: &connected.clone(),
3054 handler: NotifsHandler::new(peer, vec![], None),
3055 remaining_established: 0usize,
3056 },
3057 ));
3058 assert!(notif.peers.get(&(peer, set_id)).is_none());
3059 assert!(std::matches!(
3060 notif.incoming[0],
3061 IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
3062 ));
3063 }
3064
3065 #[test]
3066 fn close_connection_for_incoming_peer_two_connections() {
3067 let (mut notif, _controller, _notif_service) = development_notifs();
3068 let peer = PeerId::random();
3069 let conn = ConnectionId::new_unchecked(0);
3070 let conn1 = ConnectionId::new_unchecked(1);
3071 let set_id = SetId::from(0);
3072 let connected = ConnectedPoint::Listener {
3073 local_addr: Multiaddr::empty(),
3074 send_back_addr: Multiaddr::empty(),
3075 };
3076 let mut conns = SmallVec::<
3077 [(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER],
3078 >::from(vec![(conn, ConnectionState::Closed)]);
3079
3080 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3081 libp2p::swarm::behaviour::ConnectionEstablished {
3082 peer_id: peer,
3083 connection_id: conn,
3084 endpoint: &connected,
3085 failed_addresses: &[],
3086 other_established: 0usize,
3087 },
3088 ));
3089 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3090
3091 notif.on_connection_handler_event(
3092 peer,
3093 conn,
3094 NotifsHandlerOut::OpenDesiredByRemote {
3095 protocol_index: 0,
3096 handshake: vec![1, 3, 3, 7],
3097 },
3098 );
3099 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3100
3101 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3102 libp2p::swarm::behaviour::ConnectionEstablished {
3103 peer_id: peer,
3104 connection_id: conn1,
3105 endpoint: &connected,
3106 failed_addresses: &[],
3107 other_established: 0usize,
3108 },
3109 ));
3110 conns.push((conn1, ConnectionState::Closed));
3111
3112 if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id))
3113 {
3114 assert_eq!(connections.len(), 2);
3115 assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
3116 assert_eq!(connections[1], (conn1, ConnectionState::Closed));
3117 }
3118
3119 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3120 libp2p::swarm::behaviour::ConnectionClosed {
3121 peer_id: peer,
3122 connection_id: conn,
3123 endpoint: &connected.clone(),
3124 handler: NotifsHandler::new(peer, vec![], None),
3125 remaining_established: 0usize,
3126 },
3127 ));
3128
3129 if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3130 assert_eq!(connections.len(), 1);
3131 assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3132 } else {
3133 panic!("invalid state");
3134 }
3135 }
3136
3137 #[test]
3138 fn connection_and_substream_open() {
3139 let (mut notif, _controller, _notif_service) = development_notifs();
3140 let peer = PeerId::random();
3141 let conn = ConnectionId::new_unchecked(0);
3142 let set_id = SetId::from(0);
3143 let connected = ConnectedPoint::Listener {
3144 local_addr: Multiaddr::empty(),
3145 send_back_addr: Multiaddr::empty(),
3146 };
3147 let mut conn_yielder = ConnectionYielder::new();
3148
3149 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3151 libp2p::swarm::behaviour::ConnectionEstablished {
3152 peer_id: peer,
3153 connection_id: conn,
3154 endpoint: &connected,
3155 failed_addresses: &[],
3156 other_established: 0usize,
3157 },
3158 ));
3159 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3160
3161 notif.on_connection_handler_event(
3162 peer,
3163 conn,
3164 NotifsHandlerOut::OpenDesiredByRemote {
3165 protocol_index: 0,
3166 handshake: vec![1, 3, 3, 7],
3167 },
3168 );
3169 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3170
3171 notif.protocol_report_accept(IncomingIndex(0));
3174 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3175
3176 let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
3178
3179 notif.on_connection_handler_event(peer, conn, event);
3180 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3181
3182 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3183 assert_eq!(connections.len(), 1);
3184 assert_eq!(connections[0].0, conn);
3185 assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3186 }
3187
3188 assert!(std::matches!(
3189 notif.events[notif.events.len() - 1],
3190 ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolOpen { .. })
3191 ));
3192 }
3193
3194 #[test]
3195 fn connection_closed_sink_replaced() {
3196 let (mut notif, _controller, _notif_service) = development_notifs();
3197 let peer = PeerId::random();
3198 let conn1 = ConnectionId::new_unchecked(0);
3199 let conn2 = ConnectionId::new_unchecked(1);
3200 let set_id = SetId::from(0);
3201 let connected = ConnectedPoint::Listener {
3202 local_addr: Multiaddr::empty(),
3203 send_back_addr: Multiaddr::empty(),
3204 };
3205 let mut conn_yielder = ConnectionYielder::new();
3206
3207 for conn_id in vec![conn1, conn2] {
3209 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3210 libp2p::swarm::behaviour::ConnectionEstablished {
3211 peer_id: peer,
3212 connection_id: conn_id,
3213 endpoint: &connected,
3214 failed_addresses: &[],
3215 other_established: 0usize,
3216 },
3217 ));
3218 }
3219
3220 if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3221 assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3222 assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3223 } else {
3224 panic!("invalid state");
3225 }
3226
3227 notif.peerset_report_connect(peer, set_id);
3229 notif.on_connection_handler_event(
3230 peer,
3231 conn2,
3232 NotifsHandlerOut::OpenDesiredByRemote {
3233 protocol_index: 0,
3234 handshake: vec![1, 3, 3, 7],
3235 },
3236 );
3237
3238 if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3239 assert_eq!(connections[0], (conn1, ConnectionState::Opening));
3240 assert_eq!(connections[1], (conn2, ConnectionState::Opening));
3241 } else {
3242 panic!("invalid state");
3243 }
3244
3245 for conn in vec![conn1, conn2].iter() {
3247 notif.on_connection_handler_event(
3248 peer,
3249 *conn,
3250 conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3251 );
3252 }
3253
3254 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3255 assert_eq!(connections[0].0, conn1);
3256 assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3257 assert_eq!(connections[1].0, conn2);
3258 assert!(std::matches!(connections[1].1, ConnectionState::Open(_)));
3259 } else {
3260 panic!("invalid state");
3261 }
3262
3263 assert_eq!(notif.open_peers().collect::<Vec<_>>(), vec![&peer],);
3265
3266 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3268 libp2p::swarm::behaviour::ConnectionClosed {
3269 peer_id: peer,
3270 connection_id: conn1,
3271 endpoint: &connected.clone(),
3272 handler: NotifsHandler::new(peer, vec![], None),
3273 remaining_established: 0usize,
3274 },
3275 ));
3276
3277 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3278 assert_eq!(connections.len(), 1);
3279 assert_eq!(connections[0].0, conn2);
3280 assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3281 } else {
3282 panic!("invalid state");
3283 }
3284
3285 assert!(std::matches!(
3286 notif.events[notif.events.len() - 1],
3287 ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolReplaced { .. })
3288 ));
3289 }
3290
3291 #[test]
3292 fn dial_failure_for_requested_peer() {
3293 let (mut notif, _controller, _notif_service) = development_notifs();
3294 let peer = PeerId::random();
3295 let set_id = SetId::from(0);
3296
3297 notif.peerset_report_connect(peer, set_id);
3299 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
3300
3301 notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
3302 peer_id: Some(peer),
3303 error: &libp2p::swarm::DialError::Aborted,
3304 connection_id: ConnectionId::new_unchecked(1337),
3305 }));
3306
3307 if let Some(PeerState::Backoff { timer_deadline, .. }) = notif.peers.get(&(peer, set_id)) {
3308 assert!(timer_deadline > &Instant::now());
3309 } else {
3310 panic!("invalid state");
3311 }
3312 }
3313
3314 #[tokio::test]
3315 async fn write_notification() {
3316 let (mut notif, _controller, _notif_service) = development_notifs();
3317 let peer = PeerId::random();
3318 let conn = ConnectionId::new_unchecked(0);
3319 let set_id = SetId::from(0);
3320 let connected = ConnectedPoint::Listener {
3321 local_addr: Multiaddr::empty(),
3322 send_back_addr: Multiaddr::empty(),
3323 };
3324 let mut conn_yielder = ConnectionYielder::new();
3325
3326 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3327 libp2p::swarm::behaviour::ConnectionEstablished {
3328 peer_id: peer,
3329 connection_id: conn,
3330 endpoint: &connected,
3331 failed_addresses: &[],
3332 other_established: 0usize,
3333 },
3334 ));
3335 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3336
3337 notif.peerset_report_connect(peer, set_id);
3338 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
3339
3340 notif.on_connection_handler_event(
3341 peer,
3342 conn,
3343 conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3344 );
3345
3346 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3347 assert_eq!(connections[0].0, conn);
3348 assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3349 } else {
3350 panic!("invalid state");
3351 }
3352
3353 notif
3354 .peers
3355 .get(&(peer, set_id))
3356 .unwrap()
3357 .get_open()
3358 .unwrap()
3359 .send_sync_notification(vec![1, 3, 3, 7]);
3360 assert_eq!(conn_yielder.get_next_event(peer, set_id.into()).await, Some(vec![1, 3, 3, 7]));
3361 }
3362
3363 #[test]
3364 fn peerset_report_connect_backoff_expired() {
3365 let (mut notif, _controller, _notif_service) = development_notifs();
3366 let set_id = SetId::from(0);
3367 let peer = PeerId::random();
3368 let conn = ConnectionId::new_unchecked(0);
3369 let connected = ConnectedPoint::Listener {
3370 local_addr: Multiaddr::empty(),
3371 send_back_addr: Multiaddr::empty(),
3372 };
3373 let backoff_duration = Duration::from_millis(100);
3374
3375 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3376 libp2p::swarm::behaviour::ConnectionEstablished {
3377 peer_id: peer,
3378 connection_id: conn,
3379 endpoint: &connected,
3380 failed_addresses: &[],
3381 other_established: 0usize,
3382 },
3383 ));
3384 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3385
3386 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3388 notif.peers.get_mut(&(peer, set_id))
3389 {
3390 *backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap());
3391 }
3392
3393 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3394 libp2p::swarm::behaviour::ConnectionClosed {
3395 peer_id: peer,
3396 connection_id: conn,
3397 endpoint: &connected.clone(),
3398 handler: NotifsHandler::new(peer, vec![], None),
3399 remaining_established: 0usize,
3400 },
3401 ));
3402
3403 std::thread::sleep(backoff_duration * 2);
3405
3406 notif.peerset_report_connect(peer, set_id);
3408 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested { .. })))
3409 }
3410
3411 #[test]
3412 fn peerset_report_disconnect_disabled() {
3413 let (mut notif, _controller, _notif_service) = development_notifs();
3414 let peer = PeerId::random();
3415 let set_id = SetId::from(0);
3416 let conn = ConnectionId::new_unchecked(0);
3417 let connected = ConnectedPoint::Listener {
3418 local_addr: Multiaddr::empty(),
3419 send_back_addr: Multiaddr::empty(),
3420 };
3421
3422 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3423 libp2p::swarm::behaviour::ConnectionEstablished {
3424 peer_id: peer,
3425 connection_id: conn,
3426 endpoint: &connected,
3427 failed_addresses: &[],
3428 other_established: 0usize,
3429 },
3430 ));
3431 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3432
3433 notif.peerset_report_disconnect(peer, set_id);
3434 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3435 }
3436
3437 #[test]
3438 fn peerset_report_disconnect_backoff() {
3439 let (mut notif, _controller, _notif_service) = development_notifs();
3440 let set_id = SetId::from(0);
3441 let peer = PeerId::random();
3442 let conn = ConnectionId::new_unchecked(0);
3443 let connected = ConnectedPoint::Listener {
3444 local_addr: Multiaddr::empty(),
3445 send_back_addr: Multiaddr::empty(),
3446 };
3447 let backoff_duration = Duration::from_secs(2);
3448
3449 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3450 libp2p::swarm::behaviour::ConnectionEstablished {
3451 peer_id: peer,
3452 connection_id: conn,
3453 endpoint: &connected,
3454 failed_addresses: &[],
3455 other_established: 0usize,
3456 },
3457 ));
3458 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3459
3460 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3462 notif.peers.get_mut(&(peer, set_id))
3463 {
3464 *backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap());
3465 }
3466
3467 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3468 libp2p::swarm::behaviour::ConnectionClosed {
3469 peer_id: peer,
3470 connection_id: conn,
3471 endpoint: &connected.clone(),
3472 handler: NotifsHandler::new(peer, vec![], None),
3473 remaining_established: 0usize,
3474 },
3475 ));
3476
3477 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3478
3479 notif.peerset_report_disconnect(peer, set_id);
3480 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3481 }
3482
3483 #[test]
3484 fn peer_is_backed_off_if_both_connections_get_closed_while_peer_is_disabled_with_back_off() {
3485 let (mut notif, _controller, _notif_service) = development_notifs();
3486 let set_id = SetId::from(0);
3487 let peer = PeerId::random();
3488 let conn1 = ConnectionId::new_unchecked(0);
3489 let conn2 = ConnectionId::new_unchecked(1);
3490 let connected = ConnectedPoint::Listener {
3491 local_addr: Multiaddr::empty(),
3492 send_back_addr: Multiaddr::empty(),
3493 };
3494
3495 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3496 libp2p::swarm::behaviour::ConnectionEstablished {
3497 peer_id: peer,
3498 connection_id: conn1,
3499 endpoint: &connected,
3500 failed_addresses: &[],
3501 other_established: 0usize,
3502 },
3503 ));
3504 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3505 libp2p::swarm::behaviour::ConnectionEstablished {
3506 peer_id: peer,
3507 connection_id: conn2,
3508 endpoint: &connected,
3509 failed_addresses: &[],
3510 other_established: 0usize,
3511 },
3512 ));
3513 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3514
3515 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3517 notif.peers.get_mut(&(peer, set_id))
3518 {
3519 *backoff_until =
3520 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
3521 }
3522
3523 notif.peerset_report_connect(peer, set_id);
3525 assert!(std::matches!(
3526 notif.peers.get(&(peer, set_id)),
3527 Some(&PeerState::DisabledPendingEnable { .. })
3528 ));
3529
3530 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3531 libp2p::swarm::behaviour::ConnectionClosed {
3532 peer_id: peer,
3533 connection_id: conn1,
3534 endpoint: &connected.clone(),
3535 handler: NotifsHandler::new(peer, vec![], None),
3536 remaining_established: 0usize,
3537 },
3538 ));
3539 assert!(std::matches!(
3540 notif.peers.get(&(peer, set_id)),
3541 Some(&PeerState::DisabledPendingEnable { .. })
3542 ));
3543
3544 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3545 libp2p::swarm::behaviour::ConnectionClosed {
3546 peer_id: peer,
3547 connection_id: conn2,
3548 endpoint: &connected.clone(),
3549 handler: NotifsHandler::new(peer, vec![], None),
3550 remaining_established: 0usize,
3551 },
3552 ));
3553 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3554 }
3555
3556 #[test]
3557 fn inject_connection_closed_incoming_with_backoff() {
3558 let (mut notif, _controller, _notif_service) = development_notifs();
3559 let peer = PeerId::random();
3560 let set_id = SetId::from(0);
3561 let conn = ConnectionId::new_unchecked(0);
3562 let connected = ConnectedPoint::Listener {
3563 local_addr: Multiaddr::empty(),
3564 send_back_addr: Multiaddr::empty(),
3565 };
3566
3567 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3568 libp2p::swarm::behaviour::ConnectionEstablished {
3569 peer_id: peer,
3570 connection_id: conn,
3571 endpoint: &connected,
3572 failed_addresses: &[],
3573 other_established: 0usize,
3574 },
3575 ));
3576 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3577
3578 notif.on_connection_handler_event(
3580 peer,
3581 conn,
3582 NotifsHandlerOut::OpenDesiredByRemote {
3583 protocol_index: 0,
3584 handshake: vec![1, 3, 3, 7],
3585 },
3586 );
3587
3588 if let Some(&mut PeerState::Incoming { ref mut backoff_until, .. }) =
3590 notif.peers.get_mut(&(peer, 0.into()))
3591 {
3592 *backoff_until =
3593 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
3594 } else {
3595 panic!("invalid state");
3596 }
3597
3598 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3599 libp2p::swarm::behaviour::ConnectionClosed {
3600 peer_id: peer,
3601 connection_id: conn,
3602 endpoint: &connected.clone(),
3603 handler: NotifsHandler::new(peer, vec![], None),
3604 remaining_established: 0usize,
3605 },
3606 ));
3607 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3608 }
3609
3610 #[test]
3611 fn two_connections_inactive_connection_gets_closed_peer_state_is_still_incoming() {
3612 let (mut notif, _controller, _notif_service) = development_notifs();
3613 let peer = PeerId::random();
3614 let conn1 = ConnectionId::new_unchecked(0);
3615 let conn2 = ConnectionId::new_unchecked(1);
3616 let set_id = SetId::from(0);
3617 let connected = ConnectedPoint::Listener {
3618 local_addr: Multiaddr::empty(),
3619 send_back_addr: Multiaddr::empty(),
3620 };
3621
3622 for conn_id in vec![conn1, conn2] {
3624 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3625 libp2p::swarm::behaviour::ConnectionEstablished {
3626 peer_id: peer,
3627 connection_id: conn_id,
3628 endpoint: &connected,
3629 failed_addresses: &[],
3630 other_established: 0usize,
3631 },
3632 ));
3633 }
3634
3635 if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3636 assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3637 assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3638 } else {
3639 panic!("invalid state");
3640 }
3641
3642 notif.on_connection_handler_event(
3644 peer,
3645 conn1,
3646 NotifsHandlerOut::OpenDesiredByRemote {
3647 protocol_index: 0,
3648 handshake: vec![1, 3, 3, 7],
3649 },
3650 );
3651 assert!(std::matches!(
3652 notif.peers.get_mut(&(peer, 0.into())),
3653 Some(&mut PeerState::Incoming { .. })
3654 ));
3655
3656 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3657 libp2p::swarm::behaviour::ConnectionClosed {
3658 peer_id: peer,
3659 connection_id: conn2,
3660 endpoint: &connected.clone(),
3661 handler: NotifsHandler::new(peer, vec![], None),
3662 remaining_established: 0usize,
3663 },
3664 ));
3665 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3666 }
3667
3668 #[test]
3669 fn two_connections_active_connection_gets_closed_peer_state_is_disabled() {
3670 let (mut notif, _controller, _notif_service) = development_notifs();
3671 let peer = PeerId::random();
3672 let conn1 = ConnectionId::new_unchecked(0);
3673 let conn2 = ConnectionId::new_unchecked(1);
3674 let set_id = SetId::from(0);
3675 let connected = ConnectedPoint::Listener {
3676 local_addr: Multiaddr::empty(),
3677 send_back_addr: Multiaddr::empty(),
3678 };
3679
3680 for conn_id in vec![conn1, conn2] {
3682 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3683 libp2p::swarm::behaviour::ConnectionEstablished {
3684 peer_id: peer,
3685 connection_id: conn_id,
3686 endpoint: &ConnectedPoint::Listener {
3687 local_addr: Multiaddr::empty(),
3688 send_back_addr: Multiaddr::empty(),
3689 },
3690 failed_addresses: &[],
3691 other_established: 0usize,
3692 },
3693 ));
3694 }
3695
3696 if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3697 assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3698 assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3699 } else {
3700 panic!("invalid state");
3701 }
3702
3703 notif.on_connection_handler_event(
3705 peer,
3706 conn1,
3707 NotifsHandlerOut::OpenDesiredByRemote {
3708 protocol_index: 0,
3709 handshake: vec![1, 3, 3, 7],
3710 },
3711 );
3712 assert!(std::matches!(
3713 notif.peers.get_mut(&(peer, 0.into())),
3714 Some(PeerState::Incoming { .. })
3715 ));
3716
3717 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3718 libp2p::swarm::behaviour::ConnectionClosed {
3719 peer_id: peer,
3720 connection_id: conn1,
3721 endpoint: &connected.clone(),
3722 handler: NotifsHandler::new(peer, vec![], None),
3723 remaining_established: 0usize,
3724 },
3725 ));
3726 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3727 }
3728
3729 #[test]
3730 fn inject_connection_closed_for_active_connection() {
3731 let (mut notif, _controller, _notif_service) = development_notifs();
3732 let peer = PeerId::random();
3733 let conn1 = ConnectionId::new_unchecked(0);
3734 let conn2 = ConnectionId::new_unchecked(1);
3735 let set_id = SetId::from(0);
3736 let connected = ConnectedPoint::Listener {
3737 local_addr: Multiaddr::empty(),
3738 send_back_addr: Multiaddr::empty(),
3739 };
3740 let mut conn_yielder = ConnectionYielder::new();
3741
3742 for conn_id in vec![conn1, conn2] {
3744 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3745 libp2p::swarm::behaviour::ConnectionEstablished {
3746 peer_id: peer,
3747 connection_id: conn_id,
3748 endpoint: &connected,
3749 failed_addresses: &[],
3750 other_established: 0usize,
3751 },
3752 ));
3753 }
3754
3755 if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3756 assert_eq!(connections[0], (conn1, ConnectionState::Closed));
3757 assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3758 } else {
3759 panic!("invalid state");
3760 }
3761
3762 notif.peerset_report_connect(peer, set_id);
3764
3765 if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
3766 assert_eq!(connections[0], (conn1, ConnectionState::Opening));
3767 assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3768 } else {
3769 panic!("invalid state");
3770 }
3771
3772 notif.on_connection_handler_event(
3773 peer,
3774 conn1,
3775 conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
3776 );
3777
3778 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
3779 assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
3780 assert_eq!(connections[0].0, conn1);
3781 assert_eq!(connections[1], (conn2, ConnectionState::Closed));
3782 } else {
3783 panic!("invalid state");
3784 }
3785
3786 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3787 libp2p::swarm::behaviour::ConnectionClosed {
3788 peer_id: peer,
3789 connection_id: conn1,
3790 endpoint: &connected.clone(),
3791 handler: NotifsHandler::new(peer, vec![], None),
3792 remaining_established: 0usize,
3793 },
3794 ));
3795 }
3796
3797 #[test]
3798 fn inject_dial_failure_for_pending_request() {
3799 let (mut notif, _controller, _notif_service) = development_notifs();
3800 let set_id = SetId::from(0);
3801 let peer = PeerId::random();
3802 let conn = ConnectionId::new_unchecked(0);
3803 let connected = ConnectedPoint::Listener {
3804 local_addr: Multiaddr::empty(),
3805 send_back_addr: Multiaddr::empty(),
3806 };
3807
3808 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3809 libp2p::swarm::behaviour::ConnectionEstablished {
3810 peer_id: peer,
3811 connection_id: conn,
3812 endpoint: &connected,
3813 failed_addresses: &[],
3814 other_established: 0usize,
3815 },
3816 ));
3817 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3818
3819 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
3821 notif.peers.get_mut(&(peer, set_id))
3822 {
3823 *backoff_until =
3824 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
3825 }
3826
3827 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3828 libp2p::swarm::behaviour::ConnectionClosed {
3829 peer_id: peer,
3830 connection_id: conn,
3831 endpoint: &connected.clone(),
3832 handler: NotifsHandler::new(peer, vec![], None),
3833 remaining_established: 0usize,
3834 },
3835 ));
3836
3837 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
3838
3839 notif.peerset_report_connect(peer, set_id);
3841 assert!(std::matches!(
3842 notif.peers.get(&(peer, set_id)),
3843 Some(&PeerState::PendingRequest { .. })
3844 ));
3845
3846 let now = Instant::now();
3847 notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
3848 peer_id: Some(peer),
3849 error: &libp2p::swarm::DialError::Aborted,
3850 connection_id: ConnectionId::new_unchecked(0),
3851 }));
3852
3853 if let Some(PeerState::PendingRequest { ref timer_deadline, .. }) =
3854 notif.peers.get(&(peer, set_id))
3855 {
3856 assert!(timer_deadline > &(now + std::time::Duration::from_secs(5)));
3857 }
3858 }
3859
3860 #[test]
3861 fn peerstate_incoming_open_desired_by_remote() {
3862 let (mut notif, _controller, _notif_service) = development_notifs();
3863 let peer = PeerId::random();
3864 let set_id = SetId::from(0);
3865 let conn1 = ConnectionId::new_unchecked(0);
3866 let conn2 = ConnectionId::new_unchecked(1);
3867 let connected = ConnectedPoint::Listener {
3868 local_addr: Multiaddr::empty(),
3869 send_back_addr: Multiaddr::empty(),
3870 };
3871
3872 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3873 libp2p::swarm::behaviour::ConnectionEstablished {
3874 peer_id: peer,
3875 connection_id: conn1,
3876 endpoint: &connected,
3877 failed_addresses: &[],
3878 other_established: 0usize,
3879 },
3880 ));
3881 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3882 libp2p::swarm::behaviour::ConnectionEstablished {
3883 peer_id: peer,
3884 connection_id: conn2,
3885 endpoint: &connected,
3886 failed_addresses: &[],
3887 other_established: 0usize,
3888 },
3889 ));
3890 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
3891
3892 notif.on_connection_handler_event(
3894 peer,
3895 conn1,
3896 NotifsHandlerOut::OpenDesiredByRemote {
3897 protocol_index: 0,
3898 handshake: vec![1, 3, 3, 7],
3899 },
3900 );
3901 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
3902
3903 notif.on_connection_handler_event(
3905 peer,
3906 conn2,
3907 NotifsHandlerOut::OpenDesiredByRemote {
3908 protocol_index: 0,
3909 handshake: vec![1, 3, 3, 7],
3910 },
3911 );
3912
3913 if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id))
3914 {
3915 assert_eq!(connections[0], (conn1, ConnectionState::OpenDesiredByRemote));
3916 assert_eq!(connections[1], (conn2, ConnectionState::OpenDesiredByRemote));
3917 }
3918 }
3919
3920 #[tokio::test]
3921 async fn remove_backoff_peer_after_timeout() {
3922 let (mut notif, _controller, _notif_service) = development_notifs();
3923 let peer = PeerId::random();
3924 let set_id = SetId::from(0);
3925 let conn = ConnectionId::new_unchecked(0);
3926 let connected = ConnectedPoint::Listener {
3927 local_addr: Multiaddr::empty(),
3928 send_back_addr: Multiaddr::empty(),
3929 };
3930
3931 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
3932 libp2p::swarm::behaviour::ConnectionEstablished {
3933 peer_id: peer,
3934 connection_id: conn,
3935 endpoint: &connected,
3936 failed_addresses: &[],
3937 other_established: 0usize,
3938 },
3939 ));
3940
3941 if let Some(&mut PeerState::Disabled { ref mut backoff_until, .. }) =
3942 notif.peers.get_mut(&(peer, 0.into()))
3943 {
3944 *backoff_until =
3945 Some(Instant::now().checked_add(std::time::Duration::from_millis(100)).unwrap());
3946 } else {
3947 panic!("invalid state");
3948 }
3949
3950 notif.on_swarm_event(FromSwarm::ConnectionClosed(
3951 libp2p::swarm::behaviour::ConnectionClosed {
3952 peer_id: peer,
3953 connection_id: conn,
3954 endpoint: &connected.clone(),
3955 handler: NotifsHandler::new(peer, vec![], None),
3956 remaining_established: 0usize,
3957 },
3958 ));
3959
3960 let until = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
3961 notif.peers.get(&(peer, set_id))
3962 {
3963 timer_deadline
3964 } else {
3965 panic!("invalid state");
3966 };
3967
3968 if until > Instant::now() {
3969 std::thread::sleep(until - Instant::now());
3970 }
3971
3972 assert!(notif.peers.get(&(peer, set_id)).is_some());
3973
3974 if tokio::time::timeout(Duration::from_secs(5), async {
3975 let mut params = MockPollParams {};
3976
3977 loop {
3978 futures::future::poll_fn(|cx| {
3979 let _ = notif.poll(cx, &mut params);
3980 Poll::Ready(())
3981 })
3982 .await;
3983
3984 if notif.peers.get(&(peer, set_id)).is_none() {
3985 break
3986 }
3987 }
3988 })
3989 .await
3990 .is_err()
3991 {
3992 panic!("backoff peer was not removed in time");
3993 }
3994
3995 assert!(notif.peers.get(&(peer, set_id)).is_none());
3996 }
3997
3998 #[tokio::test]
3999 async fn reschedule_disabled_pending_enable_when_connection_not_closed() {
4000 let (mut notif, _controller, _notif_service) = development_notifs();
4001 let peer = PeerId::random();
4002 let conn = ConnectionId::new_unchecked(0);
4003 let set_id = SetId::from(0);
4004 let mut conn_yielder = ConnectionYielder::new();
4005
4006 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4008 libp2p::swarm::behaviour::ConnectionEstablished {
4009 peer_id: peer,
4010 connection_id: conn,
4011 endpoint: &ConnectedPoint::Listener {
4012 local_addr: Multiaddr::empty(),
4013 send_back_addr: Multiaddr::empty(),
4014 },
4015 failed_addresses: &[],
4016 other_established: 0usize,
4017 },
4018 ));
4019 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4020
4021 notif.on_connection_handler_event(
4023 peer,
4024 conn,
4025 NotifsHandlerOut::OpenDesiredByRemote {
4026 protocol_index: 0,
4027 handshake: vec![1, 3, 3, 7],
4028 },
4029 );
4030 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4031
4032 notif.protocol_report_accept(IncomingIndex(0));
4035 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4036
4037 let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4038
4039 notif.on_connection_handler_event(peer, conn, event);
4040 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4041
4042 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
4043 assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
4044 assert_eq!(connections[0].0, conn);
4045 } else {
4046 panic!("invalid state");
4047 }
4048
4049 notif.peerset_report_disconnect(peer, set_id);
4050
4051 if let Some(PeerState::Disabled { ref connections, ref mut backoff_until }) =
4052 notif.peers.get_mut(&(peer, set_id))
4053 {
4054 assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4055 assert_eq!(connections[0].0, conn);
4056
4057 *backoff_until =
4058 Some(Instant::now().checked_add(std::time::Duration::from_secs(2)).unwrap());
4059 } else {
4060 panic!("invalid state");
4061 }
4062
4063 notif.peerset_report_connect(peer, set_id);
4064
4065 let prev_instant =
4066 if let Some(PeerState::DisabledPendingEnable {
4067 ref connections, timer_deadline, ..
4068 }) = notif.peers.get(&(peer, set_id))
4069 {
4070 assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4071 assert_eq!(connections[0].0, conn);
4072
4073 *timer_deadline
4074 } else {
4075 panic!("invalid state");
4076 };
4077
4078 if tokio::time::timeout(Duration::from_secs(5), async {
4083 let mut params = MockPollParams {};
4084
4085 loop {
4086 futures::future::poll_fn(|cx| {
4087 let _ = notif.poll(cx, &mut params);
4088 Poll::Ready(())
4089 })
4090 .await;
4091
4092 if let Some(PeerState::DisabledPendingEnable {
4093 timer_deadline, connections, ..
4094 }) = notif.peers.get(&(peer, set_id))
4095 {
4096 assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
4097
4098 if timer_deadline != &prev_instant {
4099 break
4100 }
4101 } else {
4102 panic!("invalid state");
4103 }
4104 }
4105 })
4106 .await
4107 .is_err()
4108 {
4109 panic!("backoff peer was not removed in time");
4110 }
4111 }
4112
4113 #[test]
4114 #[should_panic]
4115 #[cfg(debug_assertions)]
4116 fn peerset_report_connect_with_enabled_peer() {
4117 let (mut notif, _controller, _notif_service) = development_notifs();
4118 let peer = PeerId::random();
4119 let conn = ConnectionId::new_unchecked(0);
4120 let set_id = SetId::from(0);
4121 let connected = ConnectedPoint::Listener {
4122 local_addr: Multiaddr::empty(),
4123 send_back_addr: Multiaddr::empty(),
4124 };
4125 let mut conn_yielder = ConnectionYielder::new();
4126
4127 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4129 libp2p::swarm::behaviour::ConnectionEstablished {
4130 peer_id: peer,
4131 connection_id: conn,
4132 endpoint: &connected,
4133 failed_addresses: &[],
4134 other_established: 0usize,
4135 },
4136 ));
4137 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4138
4139 notif.on_connection_handler_event(
4140 peer,
4141 conn,
4142 NotifsHandlerOut::OpenDesiredByRemote {
4143 protocol_index: 0,
4144 handshake: vec![1, 3, 3, 7],
4145 },
4146 );
4147 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4148
4149 notif.peerset_report_connect(peer, set_id);
4150 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4151
4152 let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4153
4154 notif.on_connection_handler_event(peer, conn, event);
4155 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4156
4157 if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
4158 assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
4159 assert_eq!(connections[0].0, conn);
4160 } else {
4161 panic!("invalid state");
4162 }
4163
4164 notif.peerset_report_connect(peer, set_id);
4165 }
4166
4167 #[test]
4168 #[cfg(debug_assertions)]
4169 fn peerset_report_connect_with_disabled_pending_enable_peer() {
4170 let (mut notif, _controller, _notif_service) = development_notifs();
4171 let set_id = SetId::from(0);
4172 let peer = PeerId::random();
4173 let conn = ConnectionId::new_unchecked(0);
4174 let connected = ConnectedPoint::Listener {
4175 local_addr: Multiaddr::empty(),
4176 send_back_addr: Multiaddr::empty(),
4177 };
4178
4179 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4180 libp2p::swarm::behaviour::ConnectionEstablished {
4181 peer_id: peer,
4182 connection_id: conn,
4183 endpoint: &connected,
4184 failed_addresses: &[],
4185 other_established: 0usize,
4186 },
4187 ));
4188 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4189
4190 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
4192 notif.peers.get_mut(&(peer, set_id))
4193 {
4194 *backoff_until =
4195 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
4196 }
4197
4198 notif.peerset_report_connect(peer, set_id);
4200 assert!(std::matches!(
4201 notif.peers.get(&(peer, set_id)),
4202 Some(&PeerState::DisabledPendingEnable { .. })
4203 ));
4204
4205 notif.peerset_report_connect(peer, set_id);
4207 assert!(std::matches!(
4208 notif.peers.get(&(peer, set_id)),
4209 Some(&PeerState::DisabledPendingEnable { .. })
4210 ));
4211 }
4212
4213 #[test]
4214 #[cfg(debug_assertions)]
4215 fn peerset_report_connect_with_requested_peer() {
4216 let (mut notif, _controller, _notif_service) = development_notifs();
4217 let peer = PeerId::random();
4218 let set_id = SetId::from(0);
4219
4220 notif.peerset_report_connect(peer, set_id);
4222 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
4223
4224 notif.peerset_report_connect(peer, set_id);
4226 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
4227 }
4228
4229 #[test]
4230 #[cfg(debug_assertions)]
4231 fn peerset_report_connect_with_pending_requested() {
4232 let (mut notif, _controller, _notif_service) = development_notifs();
4233 let set_id = SetId::from(0);
4234 let peer = PeerId::random();
4235 let conn = ConnectionId::new_unchecked(0);
4236 let connected = ConnectedPoint::Listener {
4237 local_addr: Multiaddr::empty(),
4238 send_back_addr: Multiaddr::empty(),
4239 };
4240
4241 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4242 libp2p::swarm::behaviour::ConnectionEstablished {
4243 peer_id: peer,
4244 connection_id: conn,
4245 endpoint: &connected,
4246 failed_addresses: &[],
4247 other_established: 0usize,
4248 },
4249 ));
4250 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4251
4252 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
4254 notif.peers.get_mut(&(peer, set_id))
4255 {
4256 *backoff_until =
4257 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
4258 }
4259
4260 notif.on_swarm_event(FromSwarm::ConnectionClosed(
4261 libp2p::swarm::behaviour::ConnectionClosed {
4262 peer_id: peer,
4263 connection_id: conn,
4264 endpoint: &connected.clone(),
4265 handler: NotifsHandler::new(peer, vec![], None),
4266 remaining_established: 0usize,
4267 },
4268 ));
4269 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
4270
4271 notif.peerset_report_connect(peer, set_id);
4273 assert!(std::matches!(
4274 notif.peers.get(&(peer, set_id)),
4275 Some(&PeerState::PendingRequest { .. })
4276 ));
4277
4278 notif.peerset_report_connect(peer, set_id);
4280 assert!(std::matches!(
4281 notif.peers.get(&(peer, set_id)),
4282 Some(&PeerState::PendingRequest { .. })
4283 ));
4284 }
4285
4286 #[test]
4287 #[cfg(debug_assertions)]
4288 fn peerset_report_connect_with_incoming_peer() {
4289 let (mut notif, _controller, _notif_service) = development_notifs();
4290 let peer = PeerId::random();
4291 let set_id = SetId::from(0);
4292 let conn = ConnectionId::new_unchecked(0);
4293 let connected = ConnectedPoint::Listener {
4294 local_addr: Multiaddr::empty(),
4295 send_back_addr: Multiaddr::empty(),
4296 };
4297
4298 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4299 libp2p::swarm::behaviour::ConnectionEstablished {
4300 peer_id: peer,
4301 connection_id: conn,
4302 endpoint: &connected,
4303 failed_addresses: &[],
4304 other_established: 0usize,
4305 },
4306 ));
4307 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4308
4309 notif.on_connection_handler_event(
4311 peer,
4312 conn,
4313 NotifsHandlerOut::OpenDesiredByRemote {
4314 protocol_index: 0,
4315 handshake: vec![1, 3, 3, 7],
4316 },
4317 );
4318 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4319
4320 notif.peerset_report_connect(peer, set_id);
4321 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4322 }
4323
4324 #[test]
4325 #[cfg(debug_assertions)]
4326 fn peerset_report_disconnect_with_incoming_peer() {
4327 let (mut notif, _controller, _notif_service) = development_notifs();
4328 let peer = PeerId::random();
4329 let set_id = SetId::from(0);
4330 let conn = ConnectionId::new_unchecked(0);
4331 let connected = ConnectedPoint::Listener {
4332 local_addr: Multiaddr::empty(),
4333 send_back_addr: Multiaddr::empty(),
4334 };
4335
4336 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4337 libp2p::swarm::behaviour::ConnectionEstablished {
4338 peer_id: peer,
4339 connection_id: conn,
4340 endpoint: &connected,
4341 failed_addresses: &[],
4342 other_established: 0usize,
4343 },
4344 ));
4345 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4346
4347 notif.on_connection_handler_event(
4349 peer,
4350 conn,
4351 NotifsHandlerOut::OpenDesiredByRemote {
4352 protocol_index: 0,
4353 handshake: vec![1, 3, 3, 7],
4354 },
4355 );
4356 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4357
4358 notif.peerset_report_disconnect(peer, set_id);
4359 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4360 }
4361
4362 #[test]
4363 #[cfg(debug_assertions)]
4364 fn peerset_report_disconnect_with_incoming_peer_protocol_accepts() {
4365 let (mut notif, _controller, _notif_service) = development_notifs();
4366 let peer = PeerId::random();
4367 let set_id = SetId::from(0);
4368 let conn = ConnectionId::new_unchecked(0);
4369 let connected = ConnectedPoint::Listener {
4370 local_addr: Multiaddr::empty(),
4371 send_back_addr: Multiaddr::empty(),
4372 };
4373
4374 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4375 libp2p::swarm::behaviour::ConnectionEstablished {
4376 peer_id: peer,
4377 connection_id: conn,
4378 endpoint: &connected,
4379 failed_addresses: &[],
4380 other_established: 0usize,
4381 },
4382 ));
4383 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4384
4385 notif.on_connection_handler_event(
4387 peer,
4388 conn,
4389 NotifsHandlerOut::OpenDesiredByRemote {
4390 protocol_index: 0,
4391 handshake: vec![1, 3, 3, 7],
4392 },
4393 );
4394 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4395
4396 notif.peerset_report_disconnect(peer, set_id);
4399
4400 let incoming_index = match notif.peers.get(&(peer, set_id)) {
4401 Some(&PeerState::Incoming { peerset_rejected, incoming_index, .. }) => {
4402 assert!(peerset_rejected);
4403 incoming_index
4404 },
4405 state => panic!("invalid state: {state:?}"),
4406 };
4407
4408 notif.protocol_report_accept(incoming_index);
4411
4412 match notif.peers.get(&(peer, set_id)) {
4413 Some(&PeerState::Disabled { .. }) => {},
4414 state => panic!("invalid state: {state:?}"),
4415 };
4416 }
4417
4418 #[test]
4419 #[cfg(debug_assertions)]
4420 fn peer_disconnected_protocol_accepts() {
4421 let (mut notif, _controller, _notif_service) = development_notifs();
4422 let peer = PeerId::random();
4423 let set_id = SetId::from(0);
4424 let conn = ConnectionId::new_unchecked(0);
4425 let connected = ConnectedPoint::Listener {
4426 local_addr: Multiaddr::empty(),
4427 send_back_addr: Multiaddr::empty(),
4428 };
4429
4430 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4431 libp2p::swarm::behaviour::ConnectionEstablished {
4432 peer_id: peer,
4433 connection_id: conn,
4434 endpoint: &connected,
4435 failed_addresses: &[],
4436 other_established: 0usize,
4437 },
4438 ));
4439 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4440
4441 notif.on_connection_handler_event(
4443 peer,
4444 conn,
4445 NotifsHandlerOut::OpenDesiredByRemote {
4446 protocol_index: 0,
4447 handshake: vec![1, 3, 3, 7],
4448 },
4449 );
4450 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4451
4452 assert!(notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4453 notif.disconnect_peer(&peer, set_id);
4454
4455 notif.protocol_report_accept(IncomingIndex(0));
4458
4459 match notif.peers.get(&(peer, set_id)) {
4460 Some(&PeerState::Disabled { .. }) => {},
4461 state => panic!("invalid state: {state:?}"),
4462 };
4463
4464 assert!(!notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4465 }
4466
4467 #[test]
4468 #[cfg(debug_assertions)]
4469 fn connection_closed_protocol_accepts() {
4470 let (mut notif, _controller, _notif_service) = development_notifs();
4471 let peer = PeerId::random();
4472 let set_id = SetId::from(0);
4473 let conn = ConnectionId::new_unchecked(0);
4474 let connected = ConnectedPoint::Listener {
4475 local_addr: Multiaddr::empty(),
4476 send_back_addr: Multiaddr::empty(),
4477 };
4478
4479 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4480 libp2p::swarm::behaviour::ConnectionEstablished {
4481 peer_id: peer,
4482 connection_id: conn,
4483 endpoint: &connected,
4484 failed_addresses: &[],
4485 other_established: 0usize,
4486 },
4487 ));
4488 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4489
4490 notif.on_connection_handler_event(
4492 peer,
4493 conn,
4494 NotifsHandlerOut::OpenDesiredByRemote {
4495 protocol_index: 0,
4496 handshake: vec![1, 3, 3, 7],
4497 },
4498 );
4499 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4500
4501 notif.on_swarm_event(FromSwarm::ConnectionClosed(
4502 libp2p::swarm::behaviour::ConnectionClosed {
4503 peer_id: peer,
4504 connection_id: ConnectionId::new_unchecked(0),
4505 endpoint: &connected.clone(),
4506 handler: NotifsHandler::new(peer, vec![], None),
4507 remaining_established: 0usize,
4508 },
4509 ));
4510
4511 notif.protocol_report_accept(IncomingIndex(0));
4513
4514 match notif.peers.get(&(peer, set_id)) {
4515 None => {},
4516 state => panic!("invalid state: {state:?}"),
4517 };
4518 }
4519
4520 #[test]
4521 #[cfg(debug_assertions)]
4522 fn peer_disconnected_protocol_reject() {
4523 let (mut notif, _controller, _notif_service) = development_notifs();
4524 let peer = PeerId::random();
4525 let set_id = SetId::from(0);
4526 let conn = ConnectionId::new_unchecked(0);
4527 let connected = ConnectedPoint::Listener {
4528 local_addr: Multiaddr::empty(),
4529 send_back_addr: Multiaddr::empty(),
4530 };
4531
4532 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4533 libp2p::swarm::behaviour::ConnectionEstablished {
4534 peer_id: peer,
4535 connection_id: conn,
4536 endpoint: &connected,
4537 failed_addresses: &[],
4538 other_established: 0usize,
4539 },
4540 ));
4541 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4542
4543 notif.on_connection_handler_event(
4545 peer,
4546 conn,
4547 NotifsHandlerOut::OpenDesiredByRemote {
4548 protocol_index: 0,
4549 handshake: vec![1, 3, 3, 7],
4550 },
4551 );
4552 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4553
4554 assert!(notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4555 notif.disconnect_peer(&peer, set_id);
4556
4557 notif.protocol_report_reject(IncomingIndex(0));
4560
4561 match notif.peers.get(&(peer, set_id)) {
4562 Some(&PeerState::Disabled { .. }) => {},
4563 state => panic!("invalid state: {state:?}"),
4564 };
4565
4566 assert!(!notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
4567 }
4568
4569 #[test]
4570 #[cfg(debug_assertions)]
4571 fn connection_closed_protocol_rejects() {
4572 let (mut notif, _controller, _notif_service) = development_notifs();
4573 let peer = PeerId::random();
4574 let set_id = SetId::from(0);
4575 let conn = ConnectionId::new_unchecked(0);
4576 let connected = ConnectedPoint::Listener {
4577 local_addr: Multiaddr::empty(),
4578 send_back_addr: Multiaddr::empty(),
4579 };
4580
4581 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4582 libp2p::swarm::behaviour::ConnectionEstablished {
4583 peer_id: peer,
4584 connection_id: conn,
4585 endpoint: &connected,
4586 failed_addresses: &[],
4587 other_established: 0usize,
4588 },
4589 ));
4590 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4591
4592 notif.on_connection_handler_event(
4594 peer,
4595 conn,
4596 NotifsHandlerOut::OpenDesiredByRemote {
4597 protocol_index: 0,
4598 handshake: vec![1, 3, 3, 7],
4599 },
4600 );
4601 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4602
4603 notif.on_swarm_event(FromSwarm::ConnectionClosed(
4604 libp2p::swarm::behaviour::ConnectionClosed {
4605 peer_id: peer,
4606 connection_id: ConnectionId::new_unchecked(0),
4607 endpoint: &connected.clone(),
4608 handler: NotifsHandler::new(peer, vec![], None),
4609 remaining_established: 0usize,
4610 },
4611 ));
4612
4613 notif.protocol_report_reject(IncomingIndex(0));
4615
4616 match notif.peers.get(&(peer, set_id)) {
4617 None => {},
4618 state => panic!("invalid state: {state:?}"),
4619 };
4620 }
4621
4622 #[test]
4623 #[should_panic]
4624 #[cfg(debug_assertions)]
4625 fn protocol_report_accept_not_incoming_peer() {
4626 let (mut notif, _controller, _notif_service) = development_notifs();
4627 let peer = PeerId::random();
4628 let conn = ConnectionId::new_unchecked(0);
4629 let set_id = SetId::from(0);
4630 let connected = ConnectedPoint::Listener {
4631 local_addr: Multiaddr::empty(),
4632 send_back_addr: Multiaddr::empty(),
4633 };
4634 let mut conn_yielder = ConnectionYielder::new();
4635
4636 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4637 libp2p::swarm::behaviour::ConnectionEstablished {
4638 peer_id: peer,
4639 connection_id: conn,
4640 endpoint: &connected,
4641 failed_addresses: &[],
4642 other_established: 0usize,
4643 },
4644 ));
4645 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4646
4647 notif.on_connection_handler_event(
4649 peer,
4650 conn,
4651 NotifsHandlerOut::OpenDesiredByRemote {
4652 protocol_index: 0,
4653 handshake: vec![1, 3, 3, 7],
4654 },
4655 );
4656 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4657
4658 assert!(std::matches!(
4659 notif.incoming[0],
4660 IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
4661 ));
4662
4663 notif.peerset_report_connect(peer, set_id);
4664 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4665
4666 let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
4667 notif.on_connection_handler_event(peer, conn, event);
4668
4669 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4670 notif.incoming[0].alive = true;
4671 notif.protocol_report_accept(IncomingIndex(0));
4672 }
4673
4674 #[test]
4675 #[should_panic]
4676 #[cfg(debug_assertions)]
4677 fn inject_connection_closed_non_existent_peer() {
4678 let (mut notif, _controller, _notif_service) = development_notifs();
4679 let peer = PeerId::random();
4680 let endpoint = ConnectedPoint::Listener {
4681 local_addr: Multiaddr::empty(),
4682 send_back_addr: Multiaddr::empty(),
4683 };
4684
4685 notif.on_swarm_event(FromSwarm::ConnectionClosed(
4686 libp2p::swarm::behaviour::ConnectionClosed {
4687 peer_id: peer,
4688 connection_id: ConnectionId::new_unchecked(0),
4689 endpoint: &endpoint.clone(),
4690 handler: NotifsHandler::new(peer, vec![], None),
4691 remaining_established: 0usize,
4692 },
4693 ));
4694 }
4695
4696 #[test]
4697 fn disconnect_non_existent_peer() {
4698 let (mut notif, _controller, _notif_service) = development_notifs();
4699 let peer = PeerId::random();
4700 let set_id = SetId::from(0);
4701
4702 notif.peerset_report_disconnect(peer, set_id);
4703
4704 assert!(notif.peers.is_empty());
4705 assert!(notif.incoming.is_empty());
4706 }
4707
4708 #[test]
4709 fn accept_non_existent_connection() {
4710 let (mut notif, _controller, _notif_service) = development_notifs();
4711
4712 notif.protocol_report_accept(0.into());
4713
4714 assert!(notif.peers.is_empty());
4715 assert!(notif.incoming.is_empty());
4716 }
4717
4718 #[test]
4719 fn reject_non_existent_connection() {
4720 let (mut notif, _controller, _notif_service) = development_notifs();
4721
4722 notif.protocol_report_reject(0.into());
4723
4724 assert!(notif.peers.is_empty());
4725 assert!(notif.incoming.is_empty());
4726 }
4727
4728 #[test]
4729 fn reject_non_active_connection() {
4730 let (mut notif, _controller, _notif_service) = development_notifs();
4731 let peer = PeerId::random();
4732 let conn = ConnectionId::new_unchecked(0);
4733 let set_id = SetId::from(0);
4734 let connected = ConnectedPoint::Listener {
4735 local_addr: Multiaddr::empty(),
4736 send_back_addr: Multiaddr::empty(),
4737 };
4738
4739 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4740 libp2p::swarm::behaviour::ConnectionEstablished {
4741 peer_id: peer,
4742 connection_id: conn,
4743 endpoint: &connected,
4744 failed_addresses: &[],
4745 other_established: 0usize,
4746 },
4747 ));
4748 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4749
4750 notif.on_connection_handler_event(
4752 peer,
4753 conn,
4754 NotifsHandlerOut::OpenDesiredByRemote {
4755 protocol_index: 0,
4756 handshake: vec![1, 3, 3, 7],
4757 },
4758 );
4759 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4760
4761 notif.incoming[0].alive = false;
4762 notif.protocol_report_reject(0.into());
4763
4764 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4765 }
4766
4767 #[test]
4768 #[should_panic]
4769 #[cfg(debug_assertions)]
4770 fn inject_non_existent_connection_closed_for_incoming_peer() {
4771 let (mut notif, _controller, _notif_service) = development_notifs();
4772 let peer = PeerId::random();
4773 let conn = ConnectionId::new_unchecked(0);
4774 let set_id = SetId::from(0);
4775 let connected = ConnectedPoint::Listener {
4776 local_addr: Multiaddr::empty(),
4777 send_back_addr: Multiaddr::empty(),
4778 };
4779
4780 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4781 libp2p::swarm::behaviour::ConnectionEstablished {
4782 peer_id: peer,
4783 connection_id: conn,
4784 endpoint: &connected,
4785 failed_addresses: &[],
4786 other_established: 0usize,
4787 },
4788 ));
4789 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4790
4791 notif.on_connection_handler_event(
4793 peer,
4794 conn,
4795 NotifsHandlerOut::OpenDesiredByRemote {
4796 protocol_index: 0,
4797 handshake: vec![1, 3, 3, 7],
4798 },
4799 );
4800 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4801
4802 notif.on_swarm_event(FromSwarm::ConnectionClosed(
4803 libp2p::swarm::behaviour::ConnectionClosed {
4804 peer_id: peer,
4805 connection_id: ConnectionId::new_unchecked(1337),
4806 endpoint: &connected.clone(),
4807 handler: NotifsHandler::new(peer, vec![], None),
4808 remaining_established: 0usize,
4809 },
4810 ));
4811 }
4812
4813 #[test]
4814 #[should_panic]
4815 #[cfg(debug_assertions)]
4816 fn inject_non_existent_connection_closed_for_disabled_peer() {
4817 let (mut notif, _controller, _notif_service) = development_notifs();
4818 let set_id = SetId::from(0);
4819 let peer = PeerId::random();
4820 let conn = ConnectionId::new_unchecked(0);
4821 let connected = ConnectedPoint::Listener {
4822 local_addr: Multiaddr::empty(),
4823 send_back_addr: Multiaddr::empty(),
4824 };
4825
4826 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4827 libp2p::swarm::behaviour::ConnectionEstablished {
4828 peer_id: peer,
4829 connection_id: conn,
4830 endpoint: &connected,
4831 failed_addresses: &[],
4832 other_established: 0usize,
4833 },
4834 ));
4835 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4836
4837 notif.on_swarm_event(FromSwarm::ConnectionClosed(
4838 libp2p::swarm::behaviour::ConnectionClosed {
4839 peer_id: peer,
4840 connection_id: ConnectionId::new_unchecked(1337),
4841 endpoint: &connected.clone(),
4842 handler: NotifsHandler::new(peer, vec![], None),
4843 remaining_established: 0usize,
4844 },
4845 ));
4846 }
4847
4848 #[test]
4849 #[should_panic]
4850 #[cfg(debug_assertions)]
4851 fn inject_non_existent_connection_closed_for_disabled_pending_enable() {
4852 let (mut notif, _controller, _notif_service) = development_notifs();
4853 let set_id = SetId::from(0);
4854 let peer = PeerId::random();
4855 let conn = ConnectionId::new_unchecked(0);
4856 let connected = ConnectedPoint::Listener {
4857 local_addr: Multiaddr::empty(),
4858 send_back_addr: Multiaddr::empty(),
4859 };
4860
4861 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4862 libp2p::swarm::behaviour::ConnectionEstablished {
4863 peer_id: peer,
4864 connection_id: conn,
4865 endpoint: &connected,
4866 failed_addresses: &[],
4867 other_established: 0usize,
4868 },
4869 ));
4870 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4871
4872 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
4874 notif.peers.get_mut(&(peer, set_id))
4875 {
4876 *backoff_until =
4877 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
4878 }
4879
4880 notif.peerset_report_connect(peer, set_id);
4882
4883 assert!(std::matches!(
4884 notif.peers.get(&(peer, set_id)),
4885 Some(&PeerState::DisabledPendingEnable { .. })
4886 ));
4887
4888 notif.on_swarm_event(FromSwarm::ConnectionClosed(
4889 libp2p::swarm::behaviour::ConnectionClosed {
4890 peer_id: peer,
4891 connection_id: ConnectionId::new_unchecked(1337),
4892 endpoint: &connected.clone(),
4893 handler: NotifsHandler::new(peer, vec![], None),
4894 remaining_established: 0usize,
4895 },
4896 ));
4897 }
4898
4899 #[test]
4900 #[should_panic]
4901 #[cfg(debug_assertions)]
4902 fn inject_connection_closed_for_incoming_peer_state_mismatch() {
4903 let (mut notif, _controller, _notif_service) = development_notifs();
4904 let peer = PeerId::random();
4905 let conn = ConnectionId::new_unchecked(0);
4906 let set_id = SetId::from(0);
4907 let connected = ConnectedPoint::Listener {
4908 local_addr: Multiaddr::empty(),
4909 send_back_addr: Multiaddr::empty(),
4910 };
4911
4912 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4913 libp2p::swarm::behaviour::ConnectionEstablished {
4914 peer_id: peer,
4915 connection_id: conn,
4916 endpoint: &connected,
4917 failed_addresses: &[],
4918 other_established: 0usize,
4919 },
4920 ));
4921 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4922
4923 notif.on_connection_handler_event(
4925 peer,
4926 conn,
4927 NotifsHandlerOut::OpenDesiredByRemote {
4928 protocol_index: 0,
4929 handshake: vec![1, 3, 3, 7],
4930 },
4931 );
4932 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4933 notif.incoming[0].alive = false;
4934
4935 notif.on_swarm_event(FromSwarm::ConnectionClosed(
4936 libp2p::swarm::behaviour::ConnectionClosed {
4937 peer_id: peer,
4938 connection_id: conn,
4939 endpoint: &connected.clone(),
4940 handler: NotifsHandler::new(peer, vec![], None),
4941 remaining_established: 0usize,
4942 },
4943 ));
4944 }
4945
4946 #[test]
4947 #[should_panic]
4948 #[cfg(debug_assertions)]
4949 fn inject_connection_closed_for_enabled_state_mismatch() {
4950 let (mut notif, _controller, _notif_service) = development_notifs();
4951 let peer = PeerId::random();
4952 let conn = ConnectionId::new_unchecked(0);
4953 let set_id = SetId::from(0);
4954 let connected = ConnectedPoint::Listener {
4955 local_addr: Multiaddr::empty(),
4956 send_back_addr: Multiaddr::empty(),
4957 };
4958
4959 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
4960 libp2p::swarm::behaviour::ConnectionEstablished {
4961 peer_id: peer,
4962 connection_id: conn,
4963 endpoint: &connected,
4964 failed_addresses: &[],
4965 other_established: 0usize,
4966 },
4967 ));
4968 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
4969
4970 notif.on_connection_handler_event(
4972 peer,
4973 conn,
4974 NotifsHandlerOut::OpenDesiredByRemote {
4975 protocol_index: 0,
4976 handshake: vec![1, 3, 3, 7],
4977 },
4978 );
4979 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
4980
4981 notif.peerset_report_connect(peer, set_id);
4983 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
4984
4985 notif.on_swarm_event(FromSwarm::ConnectionClosed(
4986 libp2p::swarm::behaviour::ConnectionClosed {
4987 peer_id: peer,
4988 connection_id: ConnectionId::new_unchecked(1337),
4989 endpoint: &connected.clone(),
4990 handler: NotifsHandler::new(peer, vec![], None),
4991 remaining_established: 0usize,
4992 },
4993 ));
4994 }
4995
4996 #[test]
4997 #[should_panic]
4998 #[cfg(debug_assertions)]
4999 fn inject_connection_closed_for_backoff_peer() {
5000 let (mut notif, _controller, _notif_service) = development_notifs();
5001 let set_id = SetId::from(0);
5002 let peer = PeerId::random();
5003 let conn = ConnectionId::new_unchecked(0);
5004 let connected = ConnectedPoint::Listener {
5005 local_addr: Multiaddr::empty(),
5006 send_back_addr: Multiaddr::empty(),
5007 };
5008
5009 notif.on_swarm_event(FromSwarm::ConnectionEstablished(
5010 libp2p::swarm::behaviour::ConnectionEstablished {
5011 peer_id: peer,
5012 connection_id: conn,
5013 endpoint: &connected,
5014 failed_addresses: &[],
5015 other_established: 0usize,
5016 },
5017 ));
5018 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
5019
5020 if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
5022 notif.peers.get_mut(&(peer, set_id))
5023 {
5024 *backoff_until =
5025 Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
5026 }
5027
5028 notif.on_swarm_event(FromSwarm::ConnectionClosed(
5029 libp2p::swarm::behaviour::ConnectionClosed {
5030 peer_id: peer,
5031 connection_id: conn,
5032 endpoint: &connected.clone(),
5033 handler: NotifsHandler::new(peer, vec![], None),
5034 remaining_established: 0usize,
5035 },
5036 ));
5037 assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
5038
5039 notif.on_swarm_event(FromSwarm::ConnectionClosed(
5040 libp2p::swarm::behaviour::ConnectionClosed {
5041 peer_id: peer,
5042 connection_id: conn,
5043 endpoint: &connected.clone(),
5044 handler: NotifsHandler::new(peer, vec![], None),
5045 remaining_established: 0usize,
5046 },
5047 ));
5048 }
5049
5050 #[test]
5051 #[should_panic]
5052 #[cfg(debug_assertions)]
5053 fn open_result_ok_non_existent_peer() {
5054 let (mut notif, _controller, _notif_service) = development_notifs();
5055 let conn = ConnectionId::new_unchecked(0);
5056 let mut conn_yielder = ConnectionYielder::new();
5057
5058 notif.on_connection_handler_event(
5059 PeerId::random(),
5060 conn,
5061 conn_yielder.open_substream(PeerId::random(), 0, vec![1, 2, 3, 4]),
5062 );
5063 }
5064}