1use crate::{
42 peer_store::{PeerStoreProvider, ProtocolHandle},
43 service::traits::{self, ValidationResult},
44 ProtocolName, ReputationChange as Reputation,
45};
46
47use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
48use futures_timer::Delay;
49use litep2p::protocol::notification::NotificationError;
50
51use sc_network_types::PeerId;
52use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
53
54use std::{
55 collections::{HashMap, HashSet},
56 future::Future,
57 pin::Pin,
58 sync::{
59 atomic::{AtomicUsize, Ordering},
60 Arc,
61 },
62 task::{Context, Poll},
63 time::Duration,
64};
65
/// Log target used by every log statement in this file.
const LOG_TARGET: &str = "sub-libp2p::peerset";

/// Time a peer stays in [`PeerState::Backoff`] after its substream was closed.
const DEFAULT_BACKOFF: Duration = Duration::from_secs(5);

/// Time a peer stays in [`PeerState::Backoff`] after a substream failed to open.
const OPEN_FAILURE_BACKOFF: Duration = Duration::from_secs(5);

/// Interval of the timer that drives outbound slot allocation in `poll_next`.
const SLOT_ALLOCATION_FREQUENCY: Duration = Duration::from_secs(1);

/// Reputation change reported to the peer store once the back-off that follows
/// a closed substream has expired.
const DISCONNECT_ADJUSTMENT: Reputation = Reputation::new(-256, "Peer disconnected");

/// Reputation change reported to the peer store once the back-off that follows
/// a substream open failure has expired.
const OPEN_FAILURE_ADJUSTMENT: Reputation = Reputation::new(-1024, "Open failure");
89
/// Marks whether a peer is a reserved peer.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Reserved {
    /// The peer is a reserved peer.
    Yes,
    /// The peer is a regular (non-reserved) peer.
    No,
}

impl From<bool> for Reserved {
    /// Maps `true` to [`Reserved::Yes`] and `false` to [`Reserved::No`].
    fn from(value: bool) -> Reserved {
        if value {
            Reserved::Yes
        } else {
            Reserved::No
        }
    }
}

impl From<Reserved> for bool {
    /// Returns `true` only for [`Reserved::Yes`].
    fn from(value: Reserved) -> bool {
        value == Reserved::Yes
    }
}
111
/// Direction of a substream as tracked by the peerset, together with a marker
/// telling whether the remote peer is handled as a reserved peer.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
    /// Substream was opened by the remote peer.
    Inbound(Reserved),

    /// Substream was opened by the local node.
    Outbound(Reserved),
}
120
121impl From<Direction> for traits::Direction {
122 fn from(direction: Direction) -> traits::Direction {
123 match direction {
124 Direction::Inbound(_) => traits::Direction::Inbound,
125 Direction::Outbound(_) => traits::Direction::Outbound,
126 }
127 }
128}
129
130#[derive(PartialEq, Eq)]
132pub enum OpenResult {
133 Accept {
135 direction: traits::Direction,
137 },
138
139 Reject,
141}
142
/// Commands received by [`Peerset`] over its command channel and handled in
/// `poll_next`.
#[derive(Debug)]
pub enum PeersetCommand {
    /// Replace the set of reserved peers.
    SetReservedPeers {
        /// The new reserved peer set.
        peers: HashSet<PeerId>,
    },

    /// Add peers to the set of reserved peers.
    AddReservedPeers {
        /// Peers to add.
        peers: HashSet<PeerId>,
    },

    /// Remove peers from the set of reserved peers.
    RemoveReservedPeers {
        /// Peers to remove.
        peers: HashSet<PeerId>,
    },

    /// Enable or disable reserved-only mode.
    SetReservedOnly {
        /// New value of the reserved-only flag.
        reserved_only: bool,
    },

    /// Request disconnection of a peer. The request is ignored for reserved
    /// peers.
    DisconnectPeer {
        /// Peer to disconnect.
        peer: PeerId,
    },

    /// Query the current set of reserved peers.
    GetReservedPeers {
        /// Channel over which the reserved peers are sent back.
        tx: oneshot::Sender<Vec<PeerId>>,
    },
}
186
/// Events yielded by the [`Peerset`] stream, instructing the consumer to open
/// or close substreams.
#[derive(Debug)]
pub enum PeersetNotificationCommand {
    /// Open substreams to the listed peers.
    OpenSubstream {
        /// Peers to open substreams to.
        peers: Vec<PeerId>,
    },

    /// Close substreams to the listed peers.
    CloseSubstream {
        /// Peers whose substreams should be closed.
        peers: Vec<PeerId>,
    },
}
202
/// Per-peer state tracked by [`Peerset`].
#[derive(Debug, PartialEq, Eq)]
pub enum PeerState {
    /// No substream is open and none is being opened.
    Disconnected,

    /// A substream was closed or failed to open; the peer stays here until its
    /// back-off timer expires, after which it returns to `Disconnected`.
    Backoff,

    /// A substream is being opened.
    Opening {
        /// Direction of the substream being opened.
        direction: Direction,
    },

    /// A substream is open.
    Connected {
        /// Direction of the open substream.
        direction: Direction,
    },

    /// Disconnection was requested while the substream was still opening. If
    /// the substream opens after all, it is rejected and the peer moves to
    /// `Closing`.
    Canceled {
        /// Direction of the canceled substream.
        direction: Direction,
    },

    /// Closing of the substream was requested; the peer stays here until the
    /// close is reported via `report_substream_closed`.
    Closing {
        /// Direction of the substream being closed.
        direction: Direction,
    },
}
305
/// Per-protocol peer set: tracks peer states and slot usage and decides which
/// substreams to open, accept or close.
///
/// It is driven through the `report_*` methods and through [`PeersetCommand`]s,
/// and yields [`PeersetNotificationCommand`]s when polled as a `Stream`.
#[derive(Debug)]
pub struct Peerset {
    /// Protocol name, used as the prefix of every log line.
    protocol: ProtocolName,

    /// Receiving side of the command channel.
    cmd_rx: TracingUnboundedReceiver<PeersetCommand>,

    /// Maximum number of outbound slots for non-reserved peers.
    max_out: usize,

    /// Number of outbound slots currently used by non-reserved peers.
    num_out: usize,

    /// Maximum number of inbound slots for non-reserved peers.
    max_in: usize,

    /// Number of inbound slots currently used by non-reserved peers.
    num_in: usize,

    /// Whether reserved-only mode is enabled.
    reserved_only: bool,

    /// Current set of reserved peers.
    reserved_peers: HashSet<PeerId>,

    /// Handle to the peer store (ban checks, reputation reports, outbound
    /// candidates).
    peerstore_handle: Arc<dyn PeerStoreProvider>,

    /// State of every known peer.
    peers: HashMap<PeerId, PeerState>,

    /// Shared counter incremented when a substream is reported open and
    /// decremented when it is reported closed.
    connected_peers: Arc<AtomicUsize>,

    /// Back-off timers; each resolves to the peer and the reputation change to
    /// report for it.
    pending_backoffs: FuturesUnordered<BoxFuture<'static, (PeerId, Reputation)>>,

    /// Timer that triggers the next round of slot allocation.
    next_slot_allocation: Delay,
}
354
/// Decrement the slot counter `$slot` by one.
///
/// If the counter is already zero it is left untouched, a warning naming the
/// protocol, peer and direction is logged, and a debug assertion fires.
///
/// Argument order: `(slot, protocol, peer, direction)`.
macro_rules! decrement_or_warn {
    ($slot:expr, $protocol:expr, $peer:expr, $direction:expr) => {{
        if let Some(remaining) = $slot.checked_sub(1) {
            $slot = remaining;
        } else {
            log::warn!(
                target: LOG_TARGET,
                "{}: state mismatch, {:?} is not counted as part of {:?} slots",
                $protocol, $peer, $direction
            );
            debug_assert!(false);
        }
    }};
}
372
/// Handle registered with the peer store so that it can ask this peerset to
/// disconnect a peer.
#[derive(Debug)]
struct PeersetHandle {
    /// Sending side of the peerset command channel.
    tx: TracingUnboundedSender<PeersetCommand>,
}
379
380impl ProtocolHandle for PeersetHandle {
381 fn disconnect_peer(&self, peer: PeerId) {
383 let _ = self.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
384 }
385}
386
387impl Peerset {
388 pub fn new(
390 protocol: ProtocolName,
391 max_out: usize,
392 max_in: usize,
393 reserved_only: bool,
394 reserved_peers: HashSet<PeerId>,
395 connected_peers: Arc<AtomicUsize>,
396 peerstore_handle: Arc<dyn PeerStoreProvider>,
397 ) -> (Self, TracingUnboundedSender<PeersetCommand>) {
398 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc-peerset-protocol", 100_000);
399 let peers = reserved_peers
400 .iter()
401 .map(|peer| (*peer, PeerState::Disconnected))
402 .collect::<HashMap<_, _>>();
403
404 peerstore_handle.register_protocol(Arc::new(PeersetHandle { tx: cmd_tx.clone() }));
407
408 (
409 Self {
410 protocol,
411 max_out,
412 num_out: 0usize,
413 max_in,
414 num_in: 0usize,
415 reserved_peers,
416 cmd_rx,
417 peerstore_handle,
418 reserved_only,
419 peers,
420 connected_peers,
421 pending_backoffs: FuturesUnordered::new(),
422 next_slot_allocation: Delay::new(SLOT_ALLOCATION_FREQUENCY),
423 },
424 cmd_tx,
425 )
426 }
427
428 pub fn report_substream_opened(
436 &mut self,
437 peer: PeerId,
438 direction: traits::Direction,
439 ) -> OpenResult {
440 log::trace!(
441 target: LOG_TARGET,
442 "{}: substream opened to {peer:?}, direction {direction:?}, reserved peer {}",
443 self.protocol,
444 self.reserved_peers.contains(&peer),
445 );
446
447 let Some(state) = self.peers.get_mut(&peer) else {
448 log::warn!(target: LOG_TARGET, "{}: substream opened for unknown peer {peer:?}", self.protocol);
449 debug_assert!(false);
450 return OpenResult::Reject
451 };
452
453 match state {
454 PeerState::Opening { direction: substream_direction } => {
455 let real_direction: traits::Direction = (*substream_direction).into();
456
457 *state = PeerState::Connected { direction: *substream_direction };
458 self.connected_peers.fetch_add(1usize, Ordering::Relaxed);
459
460 return OpenResult::Accept { direction: real_direction }
461 },
462 PeerState::Canceled { direction: substream_direction } => {
466 log::trace!(
467 target: LOG_TARGET,
468 "{}: substream to {peer:?} is canceled, issue disconnection request",
469 self.protocol,
470 );
471
472 self.connected_peers.fetch_add(1usize, Ordering::Relaxed);
473 *state = PeerState::Closing { direction: *substream_direction };
474
475 return OpenResult::Reject
476 },
477 state => {
478 panic!("{}: invalid state for open substream {peer:?} {state:?}", self.protocol);
479 },
480 }
481 }
482
483 pub fn report_substream_closed(&mut self, peer: PeerId) {
490 log::trace!(target: LOG_TARGET, "{}: substream closed to {peer:?}", self.protocol);
491
492 let Some(state) = self.peers.get_mut(&peer) else {
493 log::warn!(target: LOG_TARGET, "{}: substream closed for unknown peer {peer:?}", self.protocol);
494 debug_assert!(false);
495 return
496 };
497
498 match &state {
499 PeerState::Connected { direction: Direction::Inbound(Reserved::No) } |
502 PeerState::Closing { direction: Direction::Inbound(Reserved::No) } => {
503 log::trace!(
504 target: LOG_TARGET,
505 "{}: inbound substream closed to non-reserved peer {peer:?}: {state:?}",
506 self.protocol,
507 );
508
509 decrement_or_warn!(
510 self.num_in,
511 peer,
512 self.protocol,
513 Direction::Inbound(Reserved::No)
514 );
515 },
516 PeerState::Connected { direction: Direction::Outbound(Reserved::No) } |
519 PeerState::Closing { direction: Direction::Outbound(Reserved::No) } => {
520 log::trace!(
521 target: LOG_TARGET,
522 "{}: outbound substream closed to non-reserved peer {peer:?} {state:?}",
523 self.protocol,
524 );
525
526 decrement_or_warn!(
527 self.num_out,
528 peer,
529 self.protocol,
530 Direction::Outbound(Reserved::No)
531 );
532 },
533 PeerState::Closing { .. } | PeerState::Connected { .. } => {
535 log::debug!(target: LOG_TARGET, "{}: reserved peer {peer:?} disconnected", self.protocol);
536 },
537 state => {
538 log::warn!(target: LOG_TARGET, "{}: invalid state for disconnected peer {peer:?}: {state:?}", self.protocol);
539 debug_assert!(false);
540 },
541 }
542 *state = PeerState::Backoff;
543
544 self.connected_peers.fetch_sub(1usize, Ordering::Relaxed);
545 self.pending_backoffs.push(Box::pin(async move {
546 Delay::new(DEFAULT_BACKOFF).await;
547 (peer, DISCONNECT_ADJUSTMENT)
548 }));
549 }
550
551 pub fn report_inbound_substream(&mut self, peer: PeerId) -> ValidationResult {
553 log::trace!(target: LOG_TARGET, "{}: inbound substream from {peer:?}", self.protocol);
554
555 if self.peerstore_handle.is_banned(&peer) {
556 log::debug!(
557 target: LOG_TARGET,
558 "{}: rejecting banned peer {peer:?}",
559 self.protocol,
560 );
561
562 return ValidationResult::Reject;
563 }
564
565 let state = self.peers.entry(peer).or_insert(PeerState::Disconnected);
566 let is_reserved_peer = self.reserved_peers.contains(&peer);
567
568 match state {
569 PeerState::Disconnected => {},
571 PeerState::Backoff =>
574 if !is_reserved_peer && self.num_in == self.max_in {
575 log::trace!(
576 target: LOG_TARGET,
577 "{}: ({peer:?}) is backed-off and cannot accept, reject inbound substream",
578 self.protocol,
579 );
580
581 return ValidationResult::Reject
582 },
583 PeerState::Opening { direction: Direction::Outbound(reserved) } => {
597 log::trace!(
598 target: LOG_TARGET,
599 "{}: inbound substream received for {peer:?} ({reserved:?}) that was marked outbound",
600 self.protocol,
601 );
602
603 return ValidationResult::Accept;
604 },
605 PeerState::Canceled { direction } => {
606 log::trace!(
607 target: LOG_TARGET,
608 "{}: {peer:?} is canceled, rejecting substream",
609 self.protocol,
610 );
611
612 *state = PeerState::Canceled { direction: *direction };
613 return ValidationResult::Reject
614 },
615 state => {
616 log::warn!(
617 target: LOG_TARGET,
618 "{}: invalid state ({state:?}) for inbound substream, peer {peer:?}",
619 self.protocol
620 );
621 debug_assert!(false);
622 return ValidationResult::Reject
623 },
624 }
625
626 if is_reserved_peer {
627 log::trace!(
628 target: LOG_TARGET,
629 "{}: {peer:?} accepting peer as reserved peer",
630 self.protocol,
631 );
632
633 *state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
634 return ValidationResult::Accept
635 }
636
637 if self.num_in < self.max_in {
638 log::trace!(
639 target: LOG_TARGET,
640 "{}: {peer:?} accepting peer as regular peer",
641 self.protocol,
642 );
643
644 self.num_in += 1;
645
646 *state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
647 return ValidationResult::Accept
648 }
649
650 log::trace!(
651 target: LOG_TARGET,
652 "{}: reject {peer:?}, not a reserved peer and no free inbound slots",
653 self.protocol,
654 );
655
656 *state = PeerState::Disconnected;
657 return ValidationResult::Reject
658 }
659
660 pub fn report_substream_open_failure(&mut self, peer: PeerId, error: NotificationError) {
662 log::trace!(
663 target: LOG_TARGET,
664 "{}: failed to open substream to {peer:?}: {error:?}",
665 self.protocol,
666 );
667
668 match self.peers.get(&peer) {
669 Some(PeerState::Opening { direction: Direction::Outbound(Reserved::No) }) => {
670 decrement_or_warn!(
671 self.num_out,
672 self.protocol,
673 peer,
674 Direction::Outbound(Reserved::No)
675 );
676 },
677 Some(PeerState::Opening { direction: Direction::Inbound(Reserved::No) }) => {
678 decrement_or_warn!(
679 self.num_in,
680 self.protocol,
681 peer,
682 Direction::Inbound(Reserved::No)
683 );
684 },
685 Some(PeerState::Canceled { direction }) => match direction {
686 Direction::Inbound(Reserved::No) => {
687 decrement_or_warn!(
688 self.num_in,
689 self.protocol,
690 peer,
691 Direction::Inbound(Reserved::No)
692 );
693 },
694 Direction::Outbound(Reserved::No) => {
695 decrement_or_warn!(
696 self.num_out,
697 self.protocol,
698 peer,
699 Direction::Outbound(Reserved::No)
700 );
701 },
702 _ => {},
703 },
704 Some(PeerState::Opening { direction: Direction::Inbound(Reserved::Yes) }) |
706 Some(PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }) => {
707 log::debug!(
708 target: LOG_TARGET,
709 "{}: substream open failure for reserved peer {peer:?}",
710 self.protocol,
711 );
712 },
713 state => {
714 log::debug!(
715 target: LOG_TARGET,
716 "{}: substream open failure for a unknown state: {state:?}",
717 self.protocol,
718 );
719
720 return;
721 },
722 }
723
724 self.peers.insert(peer, PeerState::Backoff);
725 self.pending_backoffs.push(Box::pin(async move {
726 Delay::new(OPEN_FAILURE_BACKOFF).await;
727 (peer, OPEN_FAILURE_ADJUSTMENT)
728 }));
729 }
730
731 pub fn report_substream_rejected(&mut self, peer: PeerId) {
733 log::trace!(target: LOG_TARGET, "{}: {peer:?} rejected by the protocol", self.protocol);
734
735 match self.peers.remove(&peer) {
736 Some(PeerState::Opening { direction }) => match direction {
737 Direction::Inbound(Reserved::Yes) | Direction::Outbound(Reserved::Yes) => {
738 log::warn!(
739 target: LOG_TARGET,
740 "{}: reserved peer {peer:?} rejected by the protocol",
741 self.protocol,
742 );
743 self.peers.insert(peer, PeerState::Disconnected);
744 },
745 Direction::Inbound(Reserved::No) => {
746 decrement_or_warn!(
747 self.num_in,
748 peer,
749 self.protocol,
750 Direction::Inbound(Reserved::No)
751 );
752 self.peers.insert(peer, PeerState::Disconnected);
753 },
754 Direction::Outbound(Reserved::No) => {
755 decrement_or_warn!(
756 self.num_out,
757 peer,
758 self.protocol,
759 Direction::Outbound(Reserved::No)
760 );
761 self.peers.insert(peer, PeerState::Disconnected);
762 },
763 },
764 Some(state @ PeerState::Canceled { .. }) => {
765 log::debug!(
766 target: LOG_TARGET,
767 "{}: substream to {peer:?} rejected by protocol but already canceled",
768 self.protocol,
769 );
770
771 self.peers.insert(peer, state);
772 },
773 Some(state) => {
774 log::debug!(
775 target: LOG_TARGET,
776 "{}: {peer:?} rejected by the protocol but not opening anymore: {state:?}",
777 self.protocol,
778 );
779
780 self.peers.insert(peer, state);
781 },
782 None => {},
783 }
784 }
785
786 fn calculate_slot_adjustment<'a>(
789 &'a mut self,
790 peers: impl Iterator<Item = &'a PeerId>,
791 ) -> (usize, usize) {
792 peers.fold((0, 0), |(mut inbound, mut outbound), peer| {
793 match self.peers.get_mut(peer) {
794 Some(PeerState::Disconnected | PeerState::Backoff) => {},
795 Some(
796 PeerState::Opening { ref mut direction } |
797 PeerState::Connected { ref mut direction } |
798 PeerState::Canceled { ref mut direction } |
799 PeerState::Closing { ref mut direction },
800 ) => {
801 *direction = match direction {
802 Direction::Inbound(Reserved::No) => {
803 inbound += 1;
804 Direction::Inbound(Reserved::Yes)
805 },
806 Direction::Outbound(Reserved::No) => {
807 outbound += 1;
808 Direction::Outbound(Reserved::Yes)
809 },
810 ref direction => **direction,
811 };
812 },
813 None => {
814 self.peers.insert(*peer, PeerState::Disconnected);
815 },
816 }
817
818 (inbound, outbound)
819 })
820 }
821
    /// Number of inbound slots in use (test builds only).
    #[cfg(test)]
    pub fn num_in(&self) -> usize {
        self.num_in
    }

    /// Number of outbound slots in use (test builds only).
    #[cfg(test)]
    pub fn num_out(&self) -> usize {
        self.num_out
    }

    /// All tracked peers and their states (test builds only).
    #[cfg(test)]
    pub fn peers(&self) -> &HashMap<PeerId, PeerState> {
        &self.peers
    }

    /// Current set of reserved peers (test builds only).
    #[cfg(test)]
    pub fn reserved_peers(&self) -> &HashSet<PeerId> {
        &self.reserved_peers
    }
845}
846
847impl Stream for Peerset {
848 type Item = PeersetNotificationCommand;
849
850 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
851 while let Poll::Ready(Some((peer, reputation))) = self.pending_backoffs.poll_next_unpin(cx)
852 {
853 log::trace!(target: LOG_TARGET, "{}: backoff expired for {peer:?}", self.protocol);
854
855 if std::matches!(self.peers.get(&peer), None | Some(PeerState::Backoff)) {
856 self.peers.insert(peer, PeerState::Disconnected);
857 }
858
859 self.peerstore_handle.report_peer(peer, reputation);
860 }
861
862 if let Poll::Ready(Some(action)) = Pin::new(&mut self.cmd_rx).poll_next(cx) {
863 match action {
864 PeersetCommand::DisconnectPeer { peer } if !self.reserved_peers.contains(&peer) =>
865 match self.peers.remove(&peer) {
866 Some(PeerState::Connected { direction }) => {
867 log::trace!(
868 target: LOG_TARGET,
869 "{}: close connection to {peer:?}, direction {direction:?}",
870 self.protocol,
871 );
872
873 self.peers.insert(peer, PeerState::Closing { direction });
874 return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
875 peers: vec![peer],
876 }))
877 },
878 Some(PeerState::Backoff) => {
879 log::trace!(
880 target: LOG_TARGET,
881 "{}: cannot disconnect {peer:?}, already backed-off",
882 self.protocol,
883 );
884
885 self.peers.insert(peer, PeerState::Backoff);
886 },
887 Some(PeerState::Opening { direction }) => {
893 log::trace!(
894 target: LOG_TARGET,
895 "{}: canceling substream to disconnect peer {peer:?}",
896 self.protocol,
897 );
898
899 self.peers.insert(peer, PeerState::Canceled { direction });
900 },
901 Some(state @ PeerState::Closing { .. }) => {
905 log::trace!(
906 target: LOG_TARGET,
907 "{}: cannot disconnect {peer:?}, already closing ({state:?})",
908 self.protocol,
909 );
910
911 self.peers.insert(peer, state);
912 },
913 Some(state @ PeerState::Disconnected) => {
919 self.peers.insert(peer, state);
920 },
921 Some(state @ PeerState::Canceled { .. }) => {
924 log::debug!(
925 target: LOG_TARGET,
926 "{}: cannot disconnect {peer:?}, already canceled ({state:?})",
927 self.protocol,
928 );
929
930 self.peers.insert(peer, state);
931 },
932 None => {
940 log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
941 },
942 },
943 PeersetCommand::DisconnectPeer { peer } => {
944 log::debug!(
945 target: LOG_TARGET,
946 "{}: ignoring disconnection request for reserved peer {peer}",
947 self.protocol,
948 );
949 },
950 PeersetCommand::SetReservedPeers { peers } => {
955 log::debug!(target: LOG_TARGET, "{}: set reserved peers {peers:?}", self.protocol);
956
957 let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
964 self.num_out -= out_peers;
965 self.num_in -= in_peers;
966
967 peers.iter().for_each(|peer| {
969 if !self.peers.contains_key(peer) {
970 self.peers.insert(*peer, PeerState::Disconnected);
971 }
972 });
973
974 let peers_to_remove = self
976 .peers
977 .iter()
978 .filter_map(|(peer, _)| (!peers.contains(peer)).then_some(*peer))
979 .collect::<HashSet<_>>();
980
981 self.reserved_peers = peers;
982
983 let peers = peers_to_remove
984 .into_iter()
985 .filter(|peer| {
986 match self.peers.remove(&peer) {
987 Some(PeerState::Connected { direction }) => {
988 log::trace!(
989 target: LOG_TARGET,
990 "{}: close connection to {peer:?}, direction {direction:?}",
991 self.protocol,
992 );
993
994 self.peers.insert(*peer, PeerState::Closing { direction });
995 true
996 },
997 Some(PeerState::Opening { direction }) => {
1000 log::trace!(
1001 target: LOG_TARGET,
1002 "{}: cancel substream to {peer:?}, direction {direction:?}",
1003 self.protocol,
1004 );
1005
1006 self.peers.insert(*peer, PeerState::Canceled { direction });
1007 false
1008 },
1009 Some(state) => {
1010 self.peers.insert(*peer, state);
1011 false
1012 },
1013 None => {
1014 log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
1015 debug_assert!(false);
1016 false
1017 },
1018 }
1019 })
1020 .collect();
1021
1022 log::trace!(
1023 target: LOG_TARGET,
1024 "{}: close substreams to {peers:?}",
1025 self.protocol,
1026 );
1027
1028 return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream { peers }))
1029 },
1030 PeersetCommand::AddReservedPeers { peers } => {
1031 log::debug!(target: LOG_TARGET, "{}: add reserved peers {peers:?}", self.protocol);
1032
1033 let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
1040 self.num_out -= out_peers;
1041 self.num_in -= in_peers;
1042
1043 let peers = peers
1044 .iter()
1045 .filter_map(|peer| {
1046 if !self.reserved_peers.insert(*peer) {
1047 log::warn!(
1048 target: LOG_TARGET,
1049 "{}: {peer:?} is already a reserved peer",
1050 self.protocol,
1051 );
1052 return None
1053 }
1054
1055 std::matches!(
1056 self.peers.get_mut(peer),
1057 None | Some(PeerState::Disconnected)
1058 )
1059 .then(|| {
1060 self.peers.insert(
1061 *peer,
1062 PeerState::Opening {
1063 direction: Direction::Outbound(Reserved::Yes),
1064 },
1065 );
1066 *peer
1067 })
1068 })
1069 .collect();
1070
1071 log::debug!(target: LOG_TARGET, "{}: start connecting to {peers:?}", self.protocol);
1072
1073 return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream { peers }))
1074 },
1075 PeersetCommand::RemoveReservedPeers { peers } => {
1076 log::debug!(target: LOG_TARGET, "{}: remove reserved peers {peers:?}", self.protocol);
1077
1078 let peers_to_remove = peers
1079 .iter()
1080 .filter_map(|peer| {
1081 if !self.reserved_peers.remove(peer) {
1082 log::debug!(
1083 target: LOG_TARGET,
1084 "{}: {peer} is not a reserved peer",
1085 self.protocol,
1086 );
1087 return None
1088 }
1089
1090 match self.peers.remove(peer)? {
1091 PeerState::Backoff => {
1096 log::trace!(
1097 target: LOG_TARGET,
1098 "{}: cannot disconnect removed reserved peer {peer:?}, already backed-off",
1099 self.protocol,
1100 );
1101
1102 self.peers.insert(*peer, PeerState::Backoff);
1103 None
1104 },
1105 PeerState::Canceled { direction } => {
1116 log::trace!(
1117 target: LOG_TARGET,
1118 "{}: cannot disconnect removed reserved peer {peer:?}, already canceled",
1119 self.protocol,
1120 );
1121
1122 self.peers.insert(*peer, PeerState::Canceled { direction });
1123 None
1124 },
1125 PeerState::Disconnected => {
1132 log::trace!(
1133 target: LOG_TARGET,
1134 "{}: cannot disconnect removed reserved peer {peer:?}, already disconnected",
1135 self.protocol,
1136 );
1137
1138 self.peers.insert(*peer, PeerState::Disconnected);
1139 None
1140 },
1141 PeerState::Closing { direction } => {
1153 log::trace!(
1154 target: LOG_TARGET,
1155 "{}: cannot disconnect removed reserved peer {peer:?}, already closing",
1156 self.protocol,
1157 );
1158
1159 self.peers.insert(*peer, PeerState::Closing { direction });
1160 None
1161 },
1162 PeerState::Connected { direction } => match direction {
1171 Direction::Inbound(_) => match self.num_in < self.max_in {
1172 true => {
1173 log::trace!(
1174 target: LOG_TARGET,
1175 "{}: {peer:?} converted to regular inbound peer (inbound open)",
1176 self.protocol,
1177 );
1178
1179 self.num_in += 1;
1180 self.peers.insert(
1181 *peer,
1182 PeerState::Connected {
1183 direction: Direction::Inbound(Reserved::No),
1184 },
1185 );
1186
1187 None
1188 },
1189 false => {
1190 self.peers.insert(
1191 *peer,
1192 PeerState::Closing {
1193 direction: Direction::Inbound(Reserved::Yes),
1194 },
1195 );
1196
1197 Some(*peer)
1198 },
1199 },
1200 Direction::Outbound(_) => match self.num_out < self.max_out {
1201 true => {
1202 log::trace!(
1203 target: LOG_TARGET,
1204 "{}: {peer:?} converted to regular outbound peer (outbound open)",
1205 self.protocol,
1206 );
1207
1208 self.num_out += 1;
1209 self.peers.insert(
1210 *peer,
1211 PeerState::Connected {
1212 direction: Direction::Outbound(Reserved::No),
1213 },
1214 );
1215
1216 None
1217 },
1218 false => {
1219 self.peers.insert(
1220 *peer,
1221 PeerState::Closing {
1222 direction: Direction::Outbound(Reserved::Yes),
1223 },
1224 );
1225
1226 Some(*peer)
1227 },
1228 },
1229 },
1230 PeerState::Opening { direction } => match direction {
1231 Direction::Inbound(_) => match self.num_in < self.max_in {
1232 true => {
1233 log::trace!(
1234 target: LOG_TARGET,
1235 "{}: {peer:?} converted to regular inbound peer (inbound opening)",
1236 self.protocol,
1237 );
1238
1239 self.num_in += 1;
1240 self.peers.insert(
1241 *peer,
1242 PeerState::Opening {
1243 direction: Direction::Inbound(Reserved::No),
1244 },
1245 );
1246
1247 None
1248 },
1249 false => {
1250 self.peers.insert(
1251 *peer,
1252 PeerState::Canceled {
1253 direction: Direction::Inbound(Reserved::Yes),
1254 },
1255 );
1256
1257 None
1258 },
1259 },
1260 Direction::Outbound(_) => match self.num_out < self.max_out {
1261 true => {
1262 log::trace!(
1263 target: LOG_TARGET,
1264 "{}: {peer:?} converted to regular outbound peer (outbound opening)",
1265 self.protocol,
1266 );
1267
1268 self.num_out += 1;
1269 self.peers.insert(
1270 *peer,
1271 PeerState::Opening {
1272 direction: Direction::Outbound(Reserved::No),
1273 },
1274 );
1275
1276 None
1277 },
1278 false => {
1279 self.peers.insert(
1280 *peer,
1281 PeerState::Canceled {
1282 direction: Direction::Outbound(Reserved::Yes),
1283 },
1284 );
1285
1286 None
1287 },
1288 },
1289 },
1290 }
1291 })
1292 .collect();
1293
1294 log::debug!(
1295 target: LOG_TARGET,
1296 "{}: close substreams to {peers_to_remove:?}",
1297 self.protocol,
1298 );
1299
1300 return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
1301 peers: peers_to_remove,
1302 }))
1303 },
1304 PeersetCommand::SetReservedOnly { reserved_only } => {
1305 log::debug!(target: LOG_TARGET, "{}: set reserved only mode to {reserved_only}", self.protocol);
1306
1307 self.reserved_only = reserved_only;
1309
1310 if reserved_only {
1311 let peers_to_remove = self
1312 .peers
1313 .iter()
1314 .filter_map(|(peer, state)| {
1315 (!self.reserved_peers.contains(peer) &&
1316 std::matches!(state, PeerState::Connected { .. }))
1317 .then_some(*peer)
1318 })
1319 .collect::<Vec<_>>();
1320
1321 self.peers.iter_mut().for_each(|(_, state)| match state {
1327 PeerState::Connected { direction } => {
1328 *state = PeerState::Closing { direction: *direction };
1329 },
1330 PeerState::Opening { direction } => {
1333 *state = PeerState::Canceled { direction: *direction };
1334 },
1335 _ => {},
1336 });
1337
1338 return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
1339 peers: peers_to_remove,
1340 }))
1341 }
1342 },
1343 PeersetCommand::GetReservedPeers { tx } => {
1344 let _ = tx.send(self.reserved_peers.iter().cloned().collect());
1345 },
1346 }
1347 }
1348
1349 if let Poll::Ready(()) = Pin::new(&mut self.next_slot_allocation).poll(cx) {
1355 let mut connect_to = self
1356 .peers
1357 .iter()
1358 .filter_map(|(peer, state)| {
1359 (self.reserved_peers.contains(peer) &&
1360 std::matches!(state, PeerState::Disconnected) &&
1361 !self.peerstore_handle.is_banned(peer))
1362 .then_some(*peer)
1363 })
1364 .collect::<Vec<_>>();
1365
1366 connect_to.iter().for_each(|peer| {
1367 self.peers.insert(
1368 *peer,
1369 PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) },
1370 );
1371 });
1372
1373 if self.num_out < self.max_out && !self.reserved_only {
1376 let ignore: HashSet<PeerId> = self
1377 .peers
1378 .iter()
1379 .filter_map(|(peer, state)| {
1380 (!std::matches!(state, PeerState::Disconnected)).then_some(*peer)
1381 })
1382 .collect();
1383
1384 let peers: Vec<_> =
1385 self.peerstore_handle.outgoing_candidates(self.max_out - self.num_out, ignore);
1386
1387 if peers.len() > 0 {
1388 peers.iter().for_each(|peer| {
1389 self.peers.insert(
1390 *peer,
1391 PeerState::Opening { direction: Direction::Outbound(Reserved::No) },
1392 );
1393 });
1394
1395 self.num_out += peers.len();
1396 connect_to.extend(peers);
1397 }
1398 }
1399
1400 self.next_slot_allocation = Delay::new(SLOT_ALLOCATION_FREQUENCY);
1403
1404 if !connect_to.is_empty() {
1405 log::trace!(
1406 target: LOG_TARGET,
1407 "{}: start connecting to peers {connect_to:?}",
1408 self.protocol,
1409 );
1410
1411 return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream {
1412 peers: connect_to,
1413 }))
1414 }
1415 }
1416
1417 Poll::Pending
1418 }
1419}