1use crate::{
42 peer_store::{PeerStoreProvider, ProtocolHandle},
43 service::traits::{self, ValidationResult},
44 ProtocolName, ReputationChange as Reputation,
45};
46
47use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
48use futures_timer::Delay;
49use litep2p::protocol::notification::NotificationError;
50
51use sc_network_types::PeerId;
52use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
53
54use std::{
55 collections::{HashMap, HashSet},
56 future::Future,
57 pin::Pin,
58 sync::{
59 atomic::{AtomicUsize, Ordering},
60 Arc,
61 },
62 task::{Context, Poll},
63 time::Duration,
64};
65
/// Logging target used by every log statement in this file.
const LOG_TARGET: &str = "sub-libp2p::peerset";

/// Back-off applied to a peer after its substream has been reported closed.
///
/// While the timer is running the peer stays in [`PeerState::Backoff`].
const DEFAULT_BACKOFF: Duration = Duration::from_secs(5);

/// Back-off applied to a peer after a substream to it failed to open.
const OPEN_FAILURE_BACKOFF: Duration = Duration::from_secs(5);

/// Interval of the slot allocation timer.
///
/// Each time the timer fires, [`Peerset`] tries to open outbound substreams to disconnected
/// reserved peers and to fill free outbound slots with regular peers.
const SLOT_ALLOCATION_FREQUENCY: Duration = Duration::from_secs(1);

/// Reputation change reported to the peer store once the back-off started by a closed substream
/// has expired.
const DISCONNECT_ADJUSTMENT: Reputation = Reputation::new(-256, "Peer disconnected");

/// Reputation change reported to the peer store once the back-off started by a substream open
/// failure has expired.
const OPEN_FAILURE_ADJUSTMENT: Reputation = Reputation::new(-1024, "Open failure");
89
/// Marker telling whether a peer is treated as a reserved peer.
///
/// Slot counters (`num_in`/`num_out`) are only adjusted for [`Reserved::No`] peers.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Reserved {
    /// The peer is a reserved peer.
    Yes,
    /// The peer is a regular (non-reserved) peer.
    No,
}
98
99impl From<bool> for Reserved {
100 fn from(value: bool) -> Reserved {
101 match value {
102 true => Reserved::Yes,
103 false => Reserved::No,
104 }
105 }
106}
107
108impl From<Reserved> for bool {
109 fn from(value: Reserved) -> bool {
110 std::matches!(value, Reserved::Yes)
111 }
112}
113
/// Substream direction as tracked by [`Peerset`], together with the reserved status of the peer.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
    /// Substream counted as inbound.
    Inbound(Reserved),

    /// Substream counted as outbound.
    Outbound(Reserved),
}
122
123impl Direction {
124 fn set_reserved(&mut self, new_reserved: Reserved) {
125 match self {
126 Direction::Inbound(ref mut reserved) | Direction::Outbound(ref mut reserved) =>
127 *reserved = new_reserved,
128 }
129 }
130}
131
132impl From<Direction> for traits::Direction {
133 fn from(direction: Direction) -> traits::Direction {
134 match direction {
135 Direction::Inbound(_) => traits::Direction::Inbound,
136 Direction::Outbound(_) => traits::Direction::Outbound,
137 }
138 }
139}
140
/// Result returned by [`Peerset::report_substream_opened()`].
#[derive(PartialEq, Eq, Debug)]
pub enum OpenResult {
    /// The opened substream is accepted.
    Accept {
        /// Direction under which [`Peerset`] tracks the substream. This is the direction stored
        /// in the peer's state, not the one passed in by the caller.
        direction: traits::Direction,
    },

    /// The opened substream is rejected and should not be used.
    Reject,
}
153
/// Commands received by [`Peerset`] over its command channel.
#[derive(Debug)]
pub enum PeersetCommand {
    /// Replace the current set of reserved peers.
    SetReservedPeers {
        /// New set of reserved peers.
        peers: HashSet<PeerId>,
    },

    /// Add peers to the set of reserved peers.
    AddReservedPeers {
        /// Peers to add.
        peers: HashSet<PeerId>,
    },

    /// Remove peers from the set of reserved peers.
    RemoveReservedPeers {
        /// Peers to remove.
        peers: HashSet<PeerId>,
    },

    /// Enable or disable reserved-only mode.
    SetReservedOnly {
        /// `true` if only reserved peers should be accepted.
        reserved_only: bool,
    },

    /// Disconnect a peer. Requests for reserved peers are ignored.
    DisconnectPeer {
        /// Peer to disconnect.
        peer: PeerId,
    },

    /// Query the current reserved peers.
    GetReservedPeers {
        /// Channel over which the reserved peers are sent back.
        tx: oneshot::Sender<Vec<PeerId>>,
    },
}
197
/// Commands emitted by [`Peerset`] through its [`Stream`] implementation.
#[derive(Debug)]
pub enum PeersetNotificationCommand {
    /// Open substreams to the given peers.
    OpenSubstream {
        /// Peers to open substreams to.
        peers: Vec<PeerId>,
    },

    /// Close substreams to the given peers.
    CloseSubstream {
        /// Peers whose substreams should be closed.
        peers: Vec<PeerId>,
    },
}
213
/// State of a peer as tracked by [`Peerset`].
#[derive(Debug, PartialEq, Eq)]
pub enum PeerState {
    /// No substream is open or being opened to the peer.
    Disconnected,

    /// The peer is backed off after a closed substream or an open failure.
    ///
    /// When the back-off timer expires the peer is moved to [`PeerState::Disconnected`].
    Backoff,

    /// A substream to the peer is being opened.
    Opening {
        /// Direction of the substream being opened.
        direction: Direction,
    },

    /// A substream to the peer is open.
    Connected {
        /// Direction of the open substream.
        direction: Direction,
    },

    /// The substream was being opened when it was decided that the peer should be disconnected.
    ///
    /// If the substream still opens, `report_substream_opened()` rejects it and moves the peer to
    /// [`PeerState::Closing`].
    Canceled {
        /// Direction the substream had while it was opening.
        direction: Direction,
    },

    /// The substream is being closed; the peer stays here until `report_substream_closed()` is
    /// called for it.
    Closing {
        /// Direction the substream had while it was connected.
        direction: Direction,
    },
}
316
/// Per-protocol peer set.
///
/// Tracks the state of each known peer, accounts inbound/outbound slots for regular peers and
/// emits [`PeersetNotificationCommand`]s telling the caller which substreams to open or close.
#[derive(Debug)]
pub struct Peerset {
    /// Protocol name, used in log messages.
    protocol: ProtocolName,

    /// Receiver for [`PeersetCommand`]s.
    cmd_rx: TracingUnboundedReceiver<PeersetCommand>,

    /// Maximum number of outbound slots for regular peers.
    max_out: usize,

    /// Number of outbound slots currently in use by regular peers.
    num_out: usize,

    /// Maximum number of inbound slots for regular peers.
    max_in: usize,

    /// Number of inbound slots currently in use by regular peers.
    num_in: usize,

    /// Whether only reserved peers are accepted.
    reserved_only: bool,

    /// Current set of reserved peers.
    reserved_peers: HashSet<PeerId>,

    /// Handle to the peer store, used for ban checks, reputation reports and outbound candidates.
    peerstore_handle: Arc<dyn PeerStoreProvider>,

    /// Known peers and their states.
    peers: HashMap<PeerId, PeerState>,

    /// Shared counter of opened substreams; incremented when a substream is reported opened and
    /// decremented when it is reported closed.
    connected_peers: Arc<AtomicUsize>,

    /// Running back-off timers. Each resolves to the peer and the reputation change to report.
    pending_backoffs: FuturesUnordered<BoxFuture<'static, (PeerId, Reputation)>>,

    /// Timer for the next slot allocation round.
    next_slot_allocation: Delay,
}
365
/// Decrement the slot counter `$slot` by one.
///
/// If the counter is already zero it is left untouched, a warning naming the protocol, the peer
/// and the direction is logged, and a debug assertion fires.
///
/// Arguments, in order: slot counter, protocol name, peer and direction.
macro_rules! decrement_or_warn {
    ($slot:expr, $protocol:expr, $peer:expr, $direction:expr) => {{
        if let Some(remaining) = $slot.checked_sub(1) {
            $slot = remaining;
        } else {
            log::warn!(
                target: LOG_TARGET,
                "{}: state mismatch, {:?} is not counted as part of {:?} slots",
                $protocol, $peer, $direction
            );
            debug_assert!(false);
        }
    }};
}
383
/// Handle registered to the peer store which forwards disconnect requests to [`Peerset`].
#[derive(Debug)]
struct PeersetHandle {
    /// Sender side of the [`Peerset`] command channel.
    tx: TracingUnboundedSender<PeersetCommand>,
}
390
391impl ProtocolHandle for PeersetHandle {
392 fn disconnect_peer(&self, peer: PeerId) {
394 let _ = self.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
395 }
396}
397
398impl Peerset {
399 pub fn new(
401 protocol: ProtocolName,
402 max_out: usize,
403 max_in: usize,
404 reserved_only: bool,
405 reserved_peers: HashSet<PeerId>,
406 connected_peers: Arc<AtomicUsize>,
407 peerstore_handle: Arc<dyn PeerStoreProvider>,
408 ) -> (Self, TracingUnboundedSender<PeersetCommand>) {
409 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc-peerset-protocol", 100_000);
410 let peers = reserved_peers
411 .iter()
412 .map(|peer| (*peer, PeerState::Disconnected))
413 .collect::<HashMap<_, _>>();
414
415 peerstore_handle.register_protocol(Arc::new(PeersetHandle { tx: cmd_tx.clone() }));
418
419 log::debug!(
420 target: LOG_TARGET,
421 "{}: creating new peerset with max_outbound {} and max_inbound {} and reserved_only {}",
422 protocol,
423 max_out,
424 max_in,
425 reserved_only,
426 );
427
428 (
429 Self {
430 protocol,
431 max_out,
432 num_out: 0usize,
433 max_in,
434 num_in: 0usize,
435 reserved_peers,
436 cmd_rx,
437 peerstore_handle,
438 reserved_only,
439 peers,
440 connected_peers,
441 pending_backoffs: FuturesUnordered::new(),
442 next_slot_allocation: Delay::new(SLOT_ALLOCATION_FREQUENCY),
443 },
444 cmd_tx,
445 )
446 }
447
448 pub fn report_substream_opened(
456 &mut self,
457 peer: PeerId,
458 direction: traits::Direction,
459 ) -> OpenResult {
460 log::trace!(
461 target: LOG_TARGET,
462 "{}: substream opened to {peer:?}, direction {direction:?}, reserved peer {}",
463 self.protocol,
464 self.reserved_peers.contains(&peer),
465 );
466
467 let Some(state) = self.peers.get_mut(&peer) else {
468 log::warn!(target: LOG_TARGET, "{}: substream opened for unknown peer {peer:?}", self.protocol);
469 debug_assert!(false);
470 return OpenResult::Reject
471 };
472
473 match state {
474 PeerState::Opening { direction: substream_direction } => {
475 let real_direction: traits::Direction = (*substream_direction).into();
476
477 *state = PeerState::Connected { direction: *substream_direction };
478 self.connected_peers.fetch_add(1usize, Ordering::Relaxed);
479
480 return OpenResult::Accept { direction: real_direction }
481 },
482 PeerState::Canceled { direction: substream_direction } => {
486 log::trace!(
487 target: LOG_TARGET,
488 "{}: substream to {peer:?} is canceled, issue disconnection request",
489 self.protocol,
490 );
491
492 self.connected_peers.fetch_add(1usize, Ordering::Relaxed);
493 *state = PeerState::Closing { direction: *substream_direction };
494
495 return OpenResult::Reject
496 },
497 PeerState::Disconnected => {
500 log::debug!(
501 target: LOG_TARGET,
502 "{}: substream opened for a peer that was previously rejected {peer:?}",
503 self.protocol,
504 );
505 return OpenResult::Reject
506 },
507 state => {
508 log::error!(
509 target: LOG_TARGET,
510 "{}: substream opened for a peer in invalid state {peer:?}: {state:?}",
511 self.protocol,
512 );
513
514 debug_assert!(false);
515 return OpenResult::Reject;
516 },
517 }
518 }
519
520 pub fn report_substream_closed(&mut self, peer: PeerId) {
527 log::trace!(target: LOG_TARGET, "{}: substream closed to {peer:?}", self.protocol);
528
529 let Some(state) = self.peers.get_mut(&peer) else {
530 log::warn!(target: LOG_TARGET, "{}: substream closed for unknown peer {peer:?}", self.protocol);
531 debug_assert!(false);
532 return
533 };
534
535 match &state {
536 PeerState::Connected { direction: Direction::Inbound(Reserved::No) } |
539 PeerState::Closing { direction: Direction::Inbound(Reserved::No) } => {
540 log::trace!(
541 target: LOG_TARGET,
542 "{}: inbound substream closed to non-reserved peer {peer:?}: {state:?}",
543 self.protocol,
544 );
545
546 decrement_or_warn!(
547 self.num_in,
548 peer,
549 self.protocol,
550 Direction::Inbound(Reserved::No)
551 );
552 },
553 PeerState::Connected { direction: Direction::Outbound(Reserved::No) } |
556 PeerState::Closing { direction: Direction::Outbound(Reserved::No) } => {
557 log::trace!(
558 target: LOG_TARGET,
559 "{}: outbound substream closed to non-reserved peer {peer:?} {state:?}",
560 self.protocol,
561 );
562
563 decrement_or_warn!(
564 self.num_out,
565 peer,
566 self.protocol,
567 Direction::Outbound(Reserved::No)
568 );
569 },
570 PeerState::Closing { .. } | PeerState::Connected { .. } => {
572 log::debug!(target: LOG_TARGET, "{}: reserved peer {peer:?} disconnected", self.protocol);
573 },
574 PeerState::Disconnected => {
577 log::debug!(
578 target: LOG_TARGET,
579 "{}: substream closed for a peer that was previously rejected {peer:?}",
580 self.protocol,
581 );
582 },
583 state => {
584 log::warn!(target: LOG_TARGET, "{}: invalid state for disconnected peer {peer:?}: {state:?}", self.protocol);
585 debug_assert!(false);
586 },
587 }
588
589 if !matches!(state, PeerState::Disconnected) {
591 self.connected_peers.fetch_sub(1usize, Ordering::Relaxed);
592 }
593
594 *state = PeerState::Backoff;
595 self.pending_backoffs.push(Box::pin(async move {
596 Delay::new(DEFAULT_BACKOFF).await;
597 (peer, DISCONNECT_ADJUSTMENT)
598 }));
599 }
600
601 pub fn report_inbound_substream(&mut self, peer: PeerId) -> ValidationResult {
603 log::trace!(target: LOG_TARGET, "{}: inbound substream from {peer:?}", self.protocol);
604
605 if self.peerstore_handle.is_banned(&peer) {
606 log::debug!(
607 target: LOG_TARGET,
608 "{}: rejecting banned peer {peer:?}",
609 self.protocol,
610 );
611
612 return ValidationResult::Reject;
613 }
614
615 let state = self.peers.entry(peer).or_insert(PeerState::Disconnected);
616 let is_reserved_peer = self.reserved_peers.contains(&peer);
617
618 let should_reject = self.reserved_only && !is_reserved_peer;
620
621 match state {
622 PeerState::Disconnected if should_reject => {
624 log::trace!(
625 target: LOG_TARGET,
626 "{}: rejecting non-reserved peer {peer:?} in reserved-only mode (prev state: {state:?})",
627 self.protocol,
628 );
629
630 return ValidationResult::Reject
631 },
632 PeerState::Disconnected => {},
634 PeerState::Backoff => {
637 if !is_reserved_peer && self.num_in == self.max_in {
638 log::trace!(
639 target: LOG_TARGET,
640 "{}: ({peer:?}) is backed-off and cannot accept, reject inbound substream",
641 self.protocol,
642 );
643
644 return ValidationResult::Reject
645 }
646
647 if should_reject {
651 return ValidationResult::Reject
652 }
653 },
654
655 PeerState::Opening { direction: Direction::Outbound(reserved) } => {
669 if should_reject {
670 log::trace!(
671 target: LOG_TARGET,
672 "{}: rejecting inbound substream from {peer:?} ({reserved:?}) in reserved-only mode that was marked outbound",
673 self.protocol,
674 );
675
676 *state = PeerState::Canceled { direction: Direction::Outbound(*reserved) };
677 return ValidationResult::Reject
678 }
679
680 log::trace!(
681 target: LOG_TARGET,
682 "{}: inbound substream received for {peer:?} ({reserved:?}) that was marked outbound",
683 self.protocol,
684 );
685
686 return ValidationResult::Accept;
687 },
688 PeerState::Canceled { direction } => {
689 log::trace!(
690 target: LOG_TARGET,
691 "{}: {peer:?} is canceled, rejecting substream should_reject={should_reject}",
692 self.protocol,
693 );
694
695 *state = PeerState::Canceled { direction: *direction };
696 return ValidationResult::Reject
697 },
698 state => {
699 log::warn!(
700 target: LOG_TARGET,
701 "{}: invalid state ({state:?}) for inbound substream, peer {peer:?}",
702 self.protocol
703 );
704 debug_assert!(false);
705 return ValidationResult::Reject
706 },
707 }
708
709 if is_reserved_peer {
710 log::trace!(
711 target: LOG_TARGET,
712 "{}: {peer:?} accepting peer as reserved peer",
713 self.protocol,
714 );
715
716 *state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
717 return ValidationResult::Accept
718 }
719
720 if self.num_in < self.max_in {
721 log::trace!(
722 target: LOG_TARGET,
723 "{}: {peer:?} accepting peer as regular peer",
724 self.protocol,
725 );
726
727 self.num_in += 1;
728
729 *state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
730 return ValidationResult::Accept
731 }
732
733 log::trace!(
734 target: LOG_TARGET,
735 "{}: reject {peer:?}, not a reserved peer and no free inbound slots",
736 self.protocol,
737 );
738
739 *state = PeerState::Disconnected;
740 return ValidationResult::Reject
741 }
742
743 pub fn report_substream_open_failure(&mut self, peer: PeerId, error: NotificationError) {
745 log::trace!(
746 target: LOG_TARGET,
747 "{}: failed to open substream to {peer:?}: {error:?}",
748 self.protocol,
749 );
750
751 match self.peers.get(&peer) {
752 Some(PeerState::Opening { direction: Direction::Outbound(Reserved::No) }) => {
753 decrement_or_warn!(
754 self.num_out,
755 self.protocol,
756 peer,
757 Direction::Outbound(Reserved::No)
758 );
759 },
760 Some(PeerState::Opening { direction: Direction::Inbound(Reserved::No) }) => {
761 decrement_or_warn!(
762 self.num_in,
763 self.protocol,
764 peer,
765 Direction::Inbound(Reserved::No)
766 );
767 },
768 Some(PeerState::Canceled { direction }) => match direction {
769 Direction::Inbound(Reserved::No) => {
770 decrement_or_warn!(
771 self.num_in,
772 self.protocol,
773 peer,
774 Direction::Inbound(Reserved::No)
775 );
776 },
777 Direction::Outbound(Reserved::No) => {
778 decrement_or_warn!(
779 self.num_out,
780 self.protocol,
781 peer,
782 Direction::Outbound(Reserved::No)
783 );
784 },
785 _ => {},
786 },
787 Some(PeerState::Opening { direction: Direction::Inbound(Reserved::Yes) }) |
789 Some(PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }) => {
790 log::debug!(
791 target: LOG_TARGET,
792 "{}: substream open failure for reserved peer {peer:?}",
793 self.protocol,
794 );
795 },
796 state => {
797 log::debug!(
798 target: LOG_TARGET,
799 "{}: substream open failure for a unknown state: {state:?}",
800 self.protocol,
801 );
802
803 return;
804 },
805 }
806
807 self.peers.insert(peer, PeerState::Backoff);
808 self.pending_backoffs.push(Box::pin(async move {
809 Delay::new(OPEN_FAILURE_BACKOFF).await;
810 (peer, OPEN_FAILURE_ADJUSTMENT)
811 }));
812 }
813
814 pub fn report_substream_rejected(&mut self, peer: PeerId) {
816 log::trace!(target: LOG_TARGET, "{}: {peer:?} rejected by the protocol", self.protocol);
817
818 match self.peers.remove(&peer) {
819 Some(PeerState::Opening { direction }) => match direction {
820 Direction::Inbound(Reserved::Yes) | Direction::Outbound(Reserved::Yes) => {
821 log::warn!(
822 target: LOG_TARGET,
823 "{}: reserved peer {peer:?} rejected by the protocol",
824 self.protocol,
825 );
826 self.peers.insert(peer, PeerState::Disconnected);
827 },
828 Direction::Inbound(Reserved::No) => {
829 decrement_or_warn!(
830 self.num_in,
831 peer,
832 self.protocol,
833 Direction::Inbound(Reserved::No)
834 );
835 self.peers.insert(peer, PeerState::Disconnected);
836 },
837 Direction::Outbound(Reserved::No) => {
838 decrement_or_warn!(
839 self.num_out,
840 peer,
841 self.protocol,
842 Direction::Outbound(Reserved::No)
843 );
844 self.peers.insert(peer, PeerState::Disconnected);
845 },
846 },
847 Some(state @ PeerState::Canceled { .. }) => {
848 log::debug!(
849 target: LOG_TARGET,
850 "{}: substream to {peer:?} rejected by protocol but already canceled",
851 self.protocol,
852 );
853
854 self.peers.insert(peer, state);
855 },
856 Some(state) => {
857 log::debug!(
858 target: LOG_TARGET,
859 "{}: {peer:?} rejected by the protocol but not opening anymore: {state:?}",
860 self.protocol,
861 );
862
863 self.peers.insert(peer, state);
864 },
865 None => {},
866 }
867 }
868
869 fn calculate_slot_adjustment<'a>(
874 &'a mut self,
875 peers: impl Iterator<Item = &'a PeerId>,
876 ) -> (usize, usize) {
877 peers.fold((0, 0), |(mut inbound, mut outbound), peer| {
878 match self.peers.get_mut(peer) {
879 Some(PeerState::Disconnected | PeerState::Backoff) => {},
880 Some(
881 PeerState::Opening { ref mut direction } |
882 PeerState::Connected { ref mut direction } |
883 PeerState::Canceled { ref mut direction } |
884 PeerState::Closing { ref mut direction },
885 ) => {
886 *direction = match direction {
887 Direction::Inbound(Reserved::No) => {
888 inbound += 1;
889 Direction::Inbound(Reserved::Yes)
890 },
891 Direction::Outbound(Reserved::No) => {
892 outbound += 1;
893 Direction::Outbound(Reserved::Yes)
894 },
895 ref direction => **direction,
896 };
897 },
898 None => {
899 self.peers.insert(*peer, PeerState::Disconnected);
900 },
901 }
902
903 (inbound, outbound)
904 })
905 }
906
907 fn should_disconnect(&self, direction: Direction) -> bool {
912 match direction {
913 Direction::Inbound(_) => self.num_in >= self.max_in,
914 Direction::Outbound(_) => self.num_out >= self.max_out,
915 }
916 }
917
918 fn increment_slot(&mut self, direction: Direction) {
920 match direction {
921 Direction::Inbound(Reserved::No) => self.num_in += 1,
922 Direction::Outbound(Reserved::No) => self.num_out += 1,
923 _ => {},
924 }
925 }
926
/// Number of inbound slots currently in use (test-only accessor).
#[cfg(test)]
pub fn num_in(&self) -> usize {
    self.num_in
}
932
/// Number of outbound slots currently in use (test-only accessor).
#[cfg(test)]
pub fn num_out(&self) -> usize {
    self.num_out
}
938
/// Shared reference to the tracked peers and their states (test-only accessor).
#[cfg(test)]
pub fn peers(&self) -> &HashMap<PeerId, PeerState> {
    &self.peers
}
944
/// Mutable reference to the tracked peers and their states (test-only accessor).
#[cfg(test)]
pub fn peers_mut(&mut self) -> &mut HashMap<PeerId, PeerState> {
    &mut self.peers
}
950
/// Shared reference to the current reserved peers (test-only accessor).
#[cfg(test)]
pub fn reserved_peers(&self) -> &HashSet<PeerId> {
    &self.reserved_peers
}
956}
957
958impl Stream for Peerset {
959 type Item = PeersetNotificationCommand;
960
961 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
962 while let Poll::Ready(Some((peer, reputation))) = self.pending_backoffs.poll_next_unpin(cx)
963 {
964 log::trace!(target: LOG_TARGET, "{}: backoff expired for {peer:?}", self.protocol);
965
966 if std::matches!(self.peers.get(&peer), None | Some(PeerState::Backoff)) {
967 self.peers.insert(peer, PeerState::Disconnected);
968 }
969
970 self.peerstore_handle.report_peer(peer, reputation);
971 }
972
973 if let Poll::Ready(Some(action)) = Pin::new(&mut self.cmd_rx).poll_next(cx) {
974 log::trace!(target: LOG_TARGET, "{}: received command {action:?}", self.protocol);
975
976 match action {
977 PeersetCommand::DisconnectPeer { peer } if !self.reserved_peers.contains(&peer) =>
978 match self.peers.remove(&peer) {
979 Some(PeerState::Connected { direction }) => {
980 log::trace!(
981 target: LOG_TARGET,
982 "{}: close connection to {peer:?}, direction {direction:?}",
983 self.protocol,
984 );
985
986 self.peers.insert(peer, PeerState::Closing { direction });
987 return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
988 peers: vec![peer],
989 }))
990 },
991 Some(PeerState::Backoff) => {
992 log::trace!(
993 target: LOG_TARGET,
994 "{}: cannot disconnect {peer:?}, already backed-off",
995 self.protocol,
996 );
997
998 self.peers.insert(peer, PeerState::Backoff);
999 },
1000 Some(PeerState::Opening { direction }) => {
1006 log::trace!(
1007 target: LOG_TARGET,
1008 "{}: canceling substream to disconnect peer {peer:?}",
1009 self.protocol,
1010 );
1011
1012 self.peers.insert(peer, PeerState::Canceled { direction });
1013 },
1014 Some(state @ PeerState::Closing { .. }) => {
1018 log::trace!(
1019 target: LOG_TARGET,
1020 "{}: cannot disconnect {peer:?}, already closing ({state:?})",
1021 self.protocol,
1022 );
1023
1024 self.peers.insert(peer, state);
1025 },
1026 Some(state @ PeerState::Disconnected) => {
1032 self.peers.insert(peer, state);
1033 },
1034 Some(state @ PeerState::Canceled { .. }) => {
1037 log::debug!(
1038 target: LOG_TARGET,
1039 "{}: cannot disconnect {peer:?}, already canceled ({state:?})",
1040 self.protocol,
1041 );
1042
1043 self.peers.insert(peer, state);
1044 },
1045 None => {
1053 log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
1054 },
1055 },
1056 PeersetCommand::DisconnectPeer { peer } => {
1057 log::debug!(
1058 target: LOG_TARGET,
1059 "{}: ignoring disconnection request for reserved peer {peer}",
1060 self.protocol,
1061 );
1062 },
1063 PeersetCommand::SetReservedPeers { peers } => {
1069 log::debug!(target: LOG_TARGET, "{}: set reserved peers {peers:?}", self.protocol);
1070
1071 let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
1082 self.num_out -= out_peers;
1083 self.num_in -= in_peers;
1084
1085 let reserved_peers_maybe_remove =
1087 self.reserved_peers.difference(&peers).cloned().collect::<Vec<_>>();
1088
1089 self.reserved_peers = peers;
1090
1091 let peers_to_remove = reserved_peers_maybe_remove
1092 .into_iter()
1093 .filter(|peer| {
1094 match self.peers.remove(&peer) {
1095 Some(PeerState::Connected { mut direction }) => {
1096 let disconnect =
1102 self.reserved_only || self.should_disconnect(direction);
1103
1104 if disconnect {
1105 log::trace!(
1106 target: LOG_TARGET,
1107 "{}: close connection to previously reserved {peer:?}, direction {direction:?}",
1108 self.protocol,
1109 );
1110
1111 self.peers.insert(*peer, PeerState::Closing { direction });
1112 true
1113 } else {
1114 log::trace!(
1115 target: LOG_TARGET,
1116 "{}: {peer:?} is no longer reserved, move to regular peers, direction {direction:?}",
1117 self.protocol,
1118 );
1119
1120 direction.set_reserved(Reserved::No);
1123 self.increment_slot(direction);
1124
1125 self.peers
1126 .insert(*peer, PeerState::Connected { direction });
1127 false
1128 }
1129 },
1130 Some(PeerState::Opening { direction }) => {
1133 log::trace!(
1134 target: LOG_TARGET,
1135 "{}: cancel substream to {peer:?}, direction {direction:?}",
1136 self.protocol,
1137 );
1138
1139 self.peers.insert(*peer, PeerState::Canceled { direction });
1140 false
1141 },
1142 Some(state) => {
1143 self.peers.insert(*peer, state);
1144 false
1145 },
1146 None => {
1147 log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
1148 debug_assert!(false);
1149 false
1150 },
1151 }
1152 })
1153 .collect();
1154
1155 log::trace!(
1156 target: LOG_TARGET,
1157 "{}: close substreams to {peers_to_remove:?}",
1158 self.protocol,
1159 );
1160
1161 return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
1162 peers: peers_to_remove,
1163 }))
1164 },
1165 PeersetCommand::AddReservedPeers { peers } => {
1166 log::debug!(target: LOG_TARGET, "{}: add reserved peers {peers:?}", self.protocol);
1167
1168 let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
1175 self.num_out -= out_peers;
1176 self.num_in -= in_peers;
1177
1178 let peers = peers
1179 .iter()
1180 .filter_map(|peer| {
1181 if !self.reserved_peers.insert(*peer) {
1182 log::warn!(
1183 target: LOG_TARGET,
1184 "{}: {peer:?} is already a reserved peer",
1185 self.protocol,
1186 );
1187 return None
1188 }
1189
1190 std::matches!(
1191 self.peers.get_mut(peer),
1192 None | Some(PeerState::Disconnected)
1193 )
1194 .then(|| {
1195 self.peers.insert(
1196 *peer,
1197 PeerState::Opening {
1198 direction: Direction::Outbound(Reserved::Yes),
1199 },
1200 );
1201 *peer
1202 })
1203 })
1204 .collect();
1205
1206 log::debug!(target: LOG_TARGET, "{}: start connecting to {peers:?}", self.protocol);
1207
1208 return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream { peers }))
1209 },
1210 PeersetCommand::RemoveReservedPeers { peers } => {
1211 log::debug!(target: LOG_TARGET, "{}: remove reserved peers {peers:?}", self.protocol);
1212
1213 let peers_to_remove = peers
1214 .iter()
1215 .filter_map(|peer| {
1216 if !self.reserved_peers.remove(peer) {
1217 log::debug!(
1218 target: LOG_TARGET,
1219 "{}: {peer} is not a reserved peer",
1220 self.protocol,
1221 );
1222 return None
1223 }
1224
1225 match self.peers.remove(peer)? {
1226 PeerState::Backoff => {
1231 log::trace!(
1232 target: LOG_TARGET,
1233 "{}: cannot disconnect removed reserved peer {peer:?}, already backed-off",
1234 self.protocol,
1235 );
1236
1237 self.peers.insert(*peer, PeerState::Backoff);
1238 None
1239 },
1240
1241 PeerState::Canceled { direction } => {
1252 log::trace!(
1253 target: LOG_TARGET,
1254 "{}: cannot disconnect removed reserved peer {peer:?}, already canceled",
1255 self.protocol,
1256 );
1257
1258 self.peers.insert(*peer, PeerState::Canceled { direction });
1259 None
1260 },
1261
1262 PeerState::Disconnected => {
1269 log::trace!(
1270 target: LOG_TARGET,
1271 "{}: cannot disconnect removed reserved peer {peer:?}, already disconnected",
1272 self.protocol,
1273 );
1274
1275 self.peers.insert(*peer, PeerState::Disconnected);
1276 None
1277 },
1278
1279 PeerState::Closing { direction } => {
1291 log::trace!(
1292 target: LOG_TARGET,
1293 "{}: cannot disconnect removed reserved peer {peer:?}, already closing",
1294 self.protocol,
1295 );
1296
1297 self.peers.insert(*peer, PeerState::Closing { direction });
1298 None
1299 },
1300 PeerState::Connected { mut direction } => {
1309 let disconnect = self.should_disconnect(direction);
1310
1311 if disconnect {
1312 log::trace!(
1313 target: LOG_TARGET,
1314 "{}: close connection to removed reserved {peer:?}, direction {direction:?}",
1315 self.protocol,
1316 );
1317
1318 self.peers.insert(*peer, PeerState::Closing { direction });
1319 Some(*peer)
1320 } else {
1321 log::trace!(
1322 target: LOG_TARGET,
1323 "{}: {peer:?} converted to regular peer {peer:?} direction {direction:?}",
1324 self.protocol,
1325 );
1326
1327 direction.set_reserved(Reserved::No);
1330 self.increment_slot(direction);
1331
1332 self.peers
1333 .insert(*peer, PeerState::Connected { direction });
1334
1335 None
1336 }
1337 },
1338
1339 PeerState::Opening { mut direction } => {
1340 let disconnect = self.should_disconnect(direction);
1341
1342 if disconnect {
1343 log::trace!(
1344 target: LOG_TARGET,
1345 "{}: cancel substream to disconnect removed reserved peer {peer:?}, direction {direction:?}",
1346 self.protocol,
1347 );
1348
1349 self.peers.insert(
1350 *peer,
1351 PeerState::Canceled {
1352 direction
1353 },
1354 );
1355 } else {
1356 log::trace!(
1357 target: LOG_TARGET,
1358 "{}: {peer:?} converted to regular peer {peer:?} direction {direction:?}",
1359 self.protocol,
1360 );
1361
1362 direction.set_reserved(Reserved::No);
1365 self.increment_slot(direction);
1366
1367 self.peers
1368 .insert(*peer, PeerState::Opening { direction });
1369 }
1370
1371 None
1372 },
1373 }
1374 })
1375 .collect();
1376
1377 log::debug!(
1378 target: LOG_TARGET,
1379 "{}: close substreams to {peers_to_remove:?}",
1380 self.protocol,
1381 );
1382
1383 return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
1384 peers: peers_to_remove,
1385 }))
1386 },
1387 PeersetCommand::SetReservedOnly { reserved_only } => {
1388 log::debug!(target: LOG_TARGET, "{}: set reserved only mode to {reserved_only}", self.protocol);
1389
1390 self.reserved_only = reserved_only;
1392
1393 if reserved_only {
1394 let peers_to_remove = self
1395 .peers
1396 .iter()
1397 .filter_map(|(peer, state)| {
1398 (!self.reserved_peers.contains(peer) &&
1399 std::matches!(state, PeerState::Connected { .. }))
1400 .then_some(*peer)
1401 })
1402 .collect::<Vec<_>>();
1403
1404 self.peers.iter_mut().for_each(|(_, state)| match state {
1410 PeerState::Connected { direction } => {
1411 *state = PeerState::Closing { direction: *direction };
1412 },
1413 PeerState::Opening { direction } => {
1416 *state = PeerState::Canceled { direction: *direction };
1417 },
1418 _ => {},
1419 });
1420
1421 return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
1422 peers: peers_to_remove,
1423 }))
1424 }
1425 },
1426 PeersetCommand::GetReservedPeers { tx } => {
1427 let _ = tx.send(self.reserved_peers.iter().cloned().collect());
1428 },
1429 }
1430 }
1431
1432 if let Poll::Ready(()) = Pin::new(&mut self.next_slot_allocation).poll(cx) {
1438 let mut connect_to = self
1439 .peers
1440 .iter()
1441 .filter_map(|(peer, state)| {
1442 (self.reserved_peers.contains(peer) &&
1443 std::matches!(state, PeerState::Disconnected) &&
1444 !self.peerstore_handle.is_banned(peer))
1445 .then_some(*peer)
1446 })
1447 .collect::<Vec<_>>();
1448
1449 connect_to.iter().for_each(|peer| {
1450 self.peers.insert(
1451 *peer,
1452 PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) },
1453 );
1454 });
1455
1456 if self.num_out < self.max_out && !self.reserved_only {
1459 let ignore: HashSet<PeerId> = self
1464 .peers
1465 .iter()
1466 .filter_map(|(peer, state)| {
1467 (!std::matches!(state, PeerState::Disconnected)).then_some(*peer)
1468 })
1469 .chain(self.reserved_peers.iter().cloned())
1470 .collect();
1471
1472 let peers: Vec<_> =
1473 self.peerstore_handle.outgoing_candidates(self.max_out - self.num_out, ignore);
1474
1475 if peers.len() > 0 {
1476 peers.iter().for_each(|peer| {
1477 self.peers.insert(
1478 *peer,
1479 PeerState::Opening { direction: Direction::Outbound(Reserved::No) },
1480 );
1481 });
1482
1483 self.num_out += peers.len();
1484 connect_to.extend(peers);
1485 }
1486 }
1487
1488 self.next_slot_allocation = Delay::new(SLOT_ALLOCATION_FREQUENCY);
1491
1492 if !connect_to.is_empty() {
1493 log::trace!(
1494 target: LOG_TARGET,
1495 "{}: start connecting to peers {connect_to:?}",
1496 self.protocol,
1497 );
1498
1499 return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream {
1500 peers: connect_to,
1501 }))
1502 }
1503 }
1504
1505 Poll::Pending
1506 }
1507}