use crate::{
protocol::notifications::{
handler::{self, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut},
service::{NotificationCommand, ProtocolHandle, ValidationCallResult},
},
protocol_controller::{self, IncomingIndex, Message, SetId},
service::{
metrics::NotificationMetrics,
traits::{Direction, ValidationResult},
},
types::ProtocolName,
};
use bytes::BytesMut;
use fnv::FnvHashMap;
use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
use libp2p::{
core::{transport::PortUse, Endpoint, Multiaddr},
swarm::{
behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
PeerId,
};
use log::{debug, error, trace, warn};
use parking_lot::RwLock;
use rand::distributions::{Distribution as _, Uniform};
use sc_utils::mpsc::TracingUnboundedReceiver;
use smallvec::SmallVec;
use tokio::sync::oneshot::error::RecvError;
use tokio_stream::StreamMap;
use libp2p::swarm::CloseConnection;
use std::{
cmp,
collections::{hash_map::Entry, VecDeque},
mem,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
type PendingInboundValidation =
BoxFuture<'static, (Result<ValidationResult, RecvError>, IncomingIndex)>;
const LOG_TARGET: &str = "sub-libp2p";
pub struct Notifications {
notif_protocols: Vec<handler::ProtocolConfig>,
protocol_handles: Vec<ProtocolHandle>,
command_streams: StreamMap<usize, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>>,
protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
from_protocol_controllers: TracingUnboundedReceiver<Message>,
peers: FnvHashMap<(PeerId, SetId), PeerState>,
delays:
stream::FuturesUnordered<Pin<Box<dyn Future<Output = (DelayId, PeerId, SetId)> + Send>>>,
next_delay_id: DelayId,
incoming: SmallVec<[IncomingPeer; 6]>,
next_incoming_index: IncomingIndex,
events: VecDeque<ToSwarm<NotificationsOut, NotifsHandlerIn>>,
pending_inbound_validations: FuturesUnordered<PendingInboundValidation>,
metrics: NotificationMetrics,
}
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
pub name: ProtocolName,
pub fallback_names: Vec<ProtocolName>,
pub handshake: Vec<u8>,
pub max_notification_size: u64,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DelayId(u64);
#[derive(Debug)]
enum PeerState {
Poisoned,
Backoff {
timer: DelayId,
timer_deadline: Instant,
},
PendingRequest {
timer: DelayId,
timer_deadline: Instant,
},
Requested,
Disabled {
backoff_until: Option<Instant>,
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
},
DisabledPendingEnable {
timer: DelayId,
timer_deadline: Instant,
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
},
Enabled {
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
},
Incoming {
backoff_until: Option<Instant>,
incoming_index: IncomingIndex,
peerset_rejected: bool,
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
},
}
impl PeerState {
fn is_open(&self) -> bool {
self.get_open().is_some()
}
fn get_open(&self) -> Option<&NotificationsSink> {
match self {
Self::Enabled { connections, .. } => connections.iter().find_map(|(_, s)| match s {
ConnectionState::Open(s) => Some(s),
_ => None,
}),
_ => None,
}
}
}
#[derive(Debug)]
enum ConnectionState {
Closed,
Closing,
Opening,
OpeningThenClosing,
OpenDesiredByRemote,
Open(NotificationsSink),
}
#[derive(Debug)]
struct IncomingPeer {
peer_id: PeerId,
set_id: SetId,
alive: bool,
incoming_id: IncomingIndex,
handshake: Vec<u8>,
}
#[derive(Debug)]
pub enum NotificationsOut {
CustomProtocolOpen {
peer_id: PeerId,
set_id: SetId,
direction: Direction,
negotiated_fallback: Option<ProtocolName>,
received_handshake: Vec<u8>,
notifications_sink: NotificationsSink,
},
CustomProtocolReplaced {
peer_id: PeerId,
set_id: SetId,
notifications_sink: NotificationsSink,
},
CustomProtocolClosed {
peer_id: PeerId,
set_id: SetId,
},
Notification {
peer_id: PeerId,
set_id: SetId,
message: BytesMut,
},
}
impl Notifications {
pub(crate) fn new(
protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
from_protocol_controllers: TracingUnboundedReceiver<Message>,
metrics: NotificationMetrics,
notif_protocols: impl Iterator<
Item = (
ProtocolConfig,
ProtocolHandle,
Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>,
),
>,
) -> Self {
let (notif_protocols, protocol_handles): (Vec<_>, Vec<_>) = notif_protocols
.map(|(cfg, protocol_handle, command_stream)| {
(
handler::ProtocolConfig {
name: cfg.name,
fallback_names: cfg.fallback_names,
handshake: Arc::new(RwLock::new(cfg.handshake)),
max_notification_size: cfg.max_notification_size,
},
(protocol_handle, command_stream),
)
})
.unzip();
assert!(!notif_protocols.is_empty());
let (mut protocol_handles, command_streams): (Vec<_>, Vec<_>) = protocol_handles
.into_iter()
.enumerate()
.map(|(set_id, (mut protocol_handle, command_stream))| {
protocol_handle.set_metrics(metrics.clone());
(protocol_handle, (set_id, command_stream))
})
.unzip();
protocol_handles.iter_mut().skip(1).for_each(|handle| {
handle.delegate_to_peerset(true);
});
Self {
notif_protocols,
protocol_handles,
command_streams: StreamMap::from_iter(command_streams.into_iter()),
protocol_controller_handles,
from_protocol_controllers,
peers: FnvHashMap::default(),
delays: Default::default(),
next_delay_id: DelayId(0),
incoming: SmallVec::new(),
next_incoming_index: IncomingIndex(0),
events: VecDeque::new(),
pending_inbound_validations: FuturesUnordered::new(),
metrics,
}
}
pub fn set_notif_protocol_handshake(
&mut self,
set_id: SetId,
handshake_message: impl Into<Vec<u8>>,
) {
if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) {
*p.handshake.write() = handshake_message.into();
} else {
log::error!(target: "sub-libp2p", "Unknown handshake change set: {:?}", set_id);
debug_assert!(false);
}
}
pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
self.peers.iter().filter(|(_, state)| state.is_open()).map(|((id, _), _)| id)
}
pub fn is_open(&self, peer_id: &PeerId, set_id: SetId) -> bool {
self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false)
}
pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: SetId) {
trace!(target: "sub-libp2p", "External API => Disconnect({}, {:?})", peer_id, set_id);
self.disconnect_peer_inner(peer_id, set_id);
}
fn disconnect_peer_inner(&mut self, peer_id: &PeerId, set_id: SetId) {
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
entry
} else {
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
st @ PeerState::Disabled { .. } => *entry.into_mut() = st,
st @ PeerState::Requested => *entry.into_mut() = st,
st @ PeerState::PendingRequest { .. } => *entry.into_mut() = st,
st @ PeerState::Backoff { .. } => *entry.into_mut() = st,
PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);
*entry.into_mut() =
PeerState::Disabled { connections, backoff_until: Some(timer_deadline) }
},
PeerState::Enabled { mut connections } => {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id);
if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id);
let event =
NotificationsOut::CustomProtocolClosed { peer_id: *peer_id, set_id };
self.events.push_back(ToSwarm::GenerateEvent(event));
}
for (connec_id, connec_state) in
connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
{
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Closing;
}
for (connec_id, connec_state) in
connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
{
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::OpeningThenClosing;
}
debug_assert!(!connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::Open(_))));
debug_assert!(!connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::Opening)));
*entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
},
PeerState::Incoming { mut connections, backoff_until, .. } => {
let inc = if let Some(inc) = self
.incoming
.iter_mut()
.find(|i| i.peer_id == entry.key().0 && i.set_id == set_id && i.alive)
{
inc
} else {
error!(
target: "sub-libp2p",
"State mismatch in libp2p: no entry in incoming for incoming peer"
);
return
};
inc.alive = false;
for (connec_id, connec_state) in connections
.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
{
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Closing;
}
debug_assert!(!connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
*entry.into_mut() = PeerState::Disabled { connections, backoff_until }
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id)
},
}
}
fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: SetId) {
let mut occ_entry = match self.peers.entry((peer_id, set_id)) {
Entry::Occupied(entry) => entry,
Entry::Vacant(entry) => {
trace!(
target: "sub-libp2p",
"PSM => Connect({}, {:?}): Starting to connect",
entry.key().0,
set_id,
);
trace!(target: "sub-libp2p", "Libp2p <= Dial {}", entry.key().0);
self.events.push_back(ToSwarm::Dial { opts: entry.key().0.into() });
entry.insert(PeerState::Requested);
return
},
};
let now = Instant::now();
match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
PeerState::Backoff { ref timer, ref timer_deadline } if *timer_deadline > now => {
let peer_id = occ_entry.key().0;
trace!(
target: "sub-libp2p",
"PSM => Connect({}, {:?}): Will start to connect at until {:?}",
peer_id,
set_id,
timer_deadline,
);
*occ_entry.into_mut() =
PeerState::PendingRequest { timer: *timer, timer_deadline: *timer_deadline };
},
PeerState::Backoff { .. } => {
trace!(
target: "sub-libp2p",
"PSM => Connect({}, {:?}): Starting to connect",
occ_entry.key().0,
set_id,
);
trace!(target: "sub-libp2p", "Libp2p <= Dial {:?}", occ_entry.key());
self.events.push_back(ToSwarm::Dial { opts: occ_entry.key().0.into() });
*occ_entry.into_mut() = PeerState::Requested;
},
PeerState::Disabled { connections, backoff_until: Some(ref backoff) }
if *backoff > now =>
{
let peer_id = occ_entry.key().0;
trace!(
target: "sub-libp2p",
"PSM => Connect({}, {:?}): But peer is backed-off until {:?}",
peer_id,
set_id,
backoff,
);
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(*backoff - now);
self.delays.push(
async move {
delay.await;
(delay_id, peer_id, set_id)
}
.boxed(),
);
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
connections,
timer: delay_id,
timer_deadline: *backoff,
};
},
PeerState::Disabled { mut connections, backoff_until } => {
debug_assert!(!connections
.iter()
.any(|(_, s)| { matches!(s, ConnectionState::Open(_)) }));
if let Some((connec_id, connec_state)) =
connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
{
trace!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Enabling connections.",
occ_entry.key().0, set_id);
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, *connec_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
*occ_entry.into_mut() = PeerState::Enabled { connections };
} else {
debug_assert!(connections.iter().any(|(_, s)| {
matches!(s, ConnectionState::OpeningThenClosing | ConnectionState::Closing)
}));
trace!(
target: "sub-libp2p",
"PSM => Connect({}, {:?}): No connection in proper state. Delaying.",
occ_entry.key().0, set_id
);
let timer_deadline = {
let base = now + Duration::from_secs(5);
if let Some(backoff_until) = backoff_until {
cmp::max(base, backoff_until)
} else {
base
}
};
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
debug_assert!(timer_deadline > now);
let delay = futures_timer::Delay::new(timer_deadline - now);
self.delays.push(
async move {
delay.await;
(delay_id, peer_id, set_id)
}
.boxed(),
);
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
connections,
timer: delay_id,
timer_deadline,
};
}
},
st @ PeerState::Incoming { .. } => {
debug!(
target: "sub-libp2p",
"PSM => Connect({}, {:?}): Ignoring obsolete connect, we are awaiting accept/reject.",
occ_entry.key().0, set_id
);
*occ_entry.into_mut() = st;
},
st @ PeerState::Enabled { .. } => {
debug!(target: "sub-libp2p",
"PSM => Connect({}, {:?}): Already connected.",
occ_entry.key().0, set_id);
*occ_entry.into_mut() = st;
},
st @ PeerState::DisabledPendingEnable { .. } => {
debug!(target: "sub-libp2p",
"PSM => Connect({}, {:?}): Already pending enabling.",
occ_entry.key().0, set_id);
*occ_entry.into_mut() = st;
},
st @ PeerState::Requested { .. } | st @ PeerState::PendingRequest { .. } => {
debug!(target: "sub-libp2p",
"PSM => Connect({}, {:?}): Duplicate request.",
occ_entry.key().0, set_id);
*occ_entry.into_mut() = st;
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", occ_entry.key());
debug_assert!(false);
},
}
}
fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: SetId) {
let mut entry = match self.peers.entry((peer_id, set_id)) {
Entry::Occupied(entry) => entry,
Entry::Vacant(entry) => {
trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Already disabled.",
entry.key().0, set_id);
return
},
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Already disabled.",
entry.key().0, set_id);
*entry.into_mut() = st;
},
PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
debug_assert!(!connections.is_empty());
trace!(target: "sub-libp2p",
"PSM => Drop({}, {:?}): Interrupting pending enabling.",
entry.key().0, set_id);
*entry.into_mut() =
PeerState::Disabled { connections, backoff_until: Some(timer_deadline) };
},
PeerState::Enabled { mut connections } => {
trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Disabling connections.",
entry.key().0, set_id);
debug_assert!(connections.iter().any(|(_, s)| matches!(
s,
ConnectionState::Opening | ConnectionState::Open(_)
)));
if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", entry.key().0, set_id);
let event =
NotificationsOut::CustomProtocolClosed { peer_id: entry.key().0, set_id };
self.events.push_back(ToSwarm::GenerateEvent(event));
}
for (connec_id, connec_state) in
connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening))
{
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
entry.key(), *connec_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: entry.key().0,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::OpeningThenClosing;
}
for (connec_id, connec_state) in
connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
{
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
entry.key(), *connec_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: entry.key().0,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Closing;
}
*entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
},
PeerState::Requested => {
trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected.",
entry.key().0, set_id);
entry.remove();
},
PeerState::PendingRequest { timer, timer_deadline } => {
trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected",
entry.key().0, set_id);
*entry.into_mut() = PeerState::Backoff { timer, timer_deadline }
},
PeerState::Incoming { backoff_until, connections, incoming_index, .. } => {
debug!(
target: "sub-libp2p",
"PSM => Drop({}, {:?}): Ignoring obsolete disconnect, we are awaiting accept/reject.",
entry.key().0, set_id,
);
*entry.into_mut() = PeerState::Incoming {
backoff_until,
connections,
incoming_index,
peerset_rejected: true,
};
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", entry.key());
debug_assert!(false);
},
}
}
fn peerset_report_preaccept(&mut self, index: IncomingIndex) {
let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) else {
error!(target: LOG_TARGET, "PSM => Preaccept({:?}): Invalid index", index);
return
};
trace!(
target: LOG_TARGET,
"PSM => Preaccept({:?}): Sent to protocol for validation",
index
);
let incoming = &self.incoming[pos];
match self.protocol_handles[usize::from(incoming.set_id)]
.report_incoming_substream(incoming.peer_id, incoming.handshake.clone())
{
Ok(ValidationCallResult::Delegated) => {
self.protocol_report_accept(index);
},
Ok(ValidationCallResult::WaitForValidation(rx)) => {
self.pending_inbound_validations
.push(Box::pin(async move { (rx.await, index) }));
},
Err(err) => {
debug!(target: LOG_TARGET, "protocol has exited: {err:?} {:?}", incoming.set_id);
self.protocol_report_reject(index);
},
}
}
fn protocol_report_accept(&mut self, index: IncomingIndex) {
let (pos, incoming) =
if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) {
(pos, self.incoming.get(pos))
} else {
error!(target: "sub-libp2p", "PSM => Accept({:?}): Invalid index", index);
return
};
let Some(incoming) = incoming else {
error!(target: "sub-libp2p", "Incoming connection ({:?}) doesn't exist", index);
debug_assert!(false);
return;
};
if !incoming.alive {
trace!(
target: "sub-libp2p",
"PSM => Accept({:?}, {}, {:?}): Obsolete incoming",
index,
incoming.peer_id,
incoming.set_id,
);
match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
Some(PeerState::DisabledPendingEnable { .. }) | Some(PeerState::Enabled { .. }) => {
},
_ => {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})",
incoming.peer_id, incoming.set_id);
self.protocol_controller_handles[usize::from(incoming.set_id)]
.dropped(incoming.peer_id);
},
}
self.incoming.remove(pos);
return
}
let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
Some(s) => s,
None => {
log::debug!(
target: "sub-libp2p",
"Connection to {:?} closed, ({:?} {:?}), ignoring accept",
incoming.peer_id,
incoming.set_id,
index,
);
self.incoming.remove(pos);
return
},
};
match mem::replace(state, PeerState::Poisoned) {
PeerState::Incoming {
mut connections,
incoming_index,
peerset_rejected,
backoff_until,
} => {
if index < incoming_index {
warn!(
target: "sub-libp2p",
"PSM => Accept({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
index, incoming.peer_id, incoming.set_id, incoming_index
);
self.incoming.remove(pos);
return
} else if index > incoming_index {
error!(
target: "sub-libp2p",
"PSM => Accept({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
index, incoming.peer_id, incoming.set_id, incoming_index
);
self.incoming.remove(pos);
debug_assert!(false);
return
}
if peerset_rejected {
trace!(
target: "sub-libp2p",
"Protocol accepted ({:?} {:?} {:?}) but Peerset had request disconnection, rejecting",
index,
incoming.peer_id,
incoming.set_id
);
*state = PeerState::Incoming {
connections,
backoff_until,
peerset_rejected,
incoming_index,
};
return self.report_reject(index).map_or((), |_| ())
}
trace!(
target: "sub-libp2p",
"PSM => Accept({:?}, {}, {:?}): Enabling connections.",
index,
incoming.peer_id,
incoming.set_id
);
debug_assert!(connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
for (connec_id, connec_state) in connections
.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
{
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
incoming.peer_id, *connec_id, incoming.set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: incoming.peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: incoming.set_id.into() },
});
*connec_state = ConnectionState::Opening;
}
self.incoming.remove(pos);
*state = PeerState::Enabled { connections };
},
st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
self.incoming.remove(pos);
*state = st;
},
peer => {
error!(
target: "sub-libp2p",
"State mismatch in libp2p: Expected alive incoming. Got {:?}.",
peer
);
self.incoming.remove(pos);
debug_assert!(false);
},
}
}
fn peerset_report_reject(&mut self, index: IncomingIndex) {
let _ = self.report_reject(index);
}
fn protocol_report_reject(&mut self, index: IncomingIndex) {
if let Some((set_id, peer_id)) = self.report_reject(index) {
self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id)
}
}
fn report_reject(&mut self, index: IncomingIndex) -> Option<(SetId, PeerId)> {
let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index)
{
self.incoming.remove(pos)
} else {
error!(target: "sub-libp2p", "PSM => Reject({:?}): Invalid index", index);
return None
};
if !incoming.alive {
trace!(
target: "sub-libp2p",
"PSM => Reject({:?}, {}, {:?}): Obsolete incoming, ignoring",
index,
incoming.peer_id,
incoming.set_id,
);
return None
}
let state = match self.peers.get_mut(&(incoming.peer_id, incoming.set_id)) {
Some(s) => s,
None => {
log::debug!(
target: "sub-libp2p",
"Connection to {:?} closed, ({:?} {:?}), ignoring accept",
incoming.peer_id,
incoming.set_id,
index,
);
return None
},
};
match mem::replace(state, PeerState::Poisoned) {
PeerState::Incoming { mut connections, backoff_until, incoming_index, .. } => {
if index < incoming_index {
warn!(
target: "sub-libp2p",
"PSM => Reject({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
index, incoming.peer_id, incoming.set_id, incoming_index
);
return None
} else if index > incoming_index {
error!(
target: "sub-libp2p",
"PSM => Reject({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
index, incoming.peer_id, incoming.set_id, incoming_index
);
debug_assert!(false);
return None
}
trace!(target: "sub-libp2p", "PSM => Reject({:?}, {}, {:?}): Rejecting connections.",
index, incoming.peer_id, incoming.set_id);
debug_assert!(connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
for (connec_id, connec_state) in connections
.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
{
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
incoming.peer_id, connec_id, incoming.set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: incoming.peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: incoming.set_id.into() },
});
*connec_state = ConnectionState::Closing;
}
*state = PeerState::Disabled { connections, backoff_until };
Some((incoming.set_id, incoming.peer_id))
},
st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
*state = st;
None
},
peer => {
error!(
target: LOG_TARGET,
"State mismatch in libp2p: Expected alive incoming. Got {peer:?}.",
);
None
},
}
}
}
impl NetworkBehaviour for Notifications {
type ConnectionHandler = NotifsHandler;
type ToSwarm = NotificationsOut;
fn handle_pending_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
Ok(())
}
fn handle_pending_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_maybe_peer: Option<PeerId>,
_addresses: &[Multiaddr],
_effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
Ok(Vec::new())
}
fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
}
fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
peer: PeerId,
_addr: &Multiaddr,
_role_override: Endpoint,
_port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone())))
}
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
endpoint,
connection_id,
..
}) => {
for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
match self.peers.entry((peer_id, set_id)).or_insert(PeerState::Poisoned) {
st @ &mut PeerState::Requested |
st @ &mut PeerState::PendingRequest { .. } => {
trace!(target: "sub-libp2p",
"Libp2p => Connected({}, {:?}, {:?}): Connection was requested by PSM.",
peer_id, set_id, endpoint
);
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, connection_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
let mut connections = SmallVec::new();
connections.push((connection_id, ConnectionState::Opening));
*st = PeerState::Enabled { connections };
},
st @ &mut PeerState::Poisoned | st @ &mut PeerState::Backoff { .. } => {
let backoff_until =
if let PeerState::Backoff { timer_deadline, .. } = st {
Some(*timer_deadline)
} else {
None
};
trace!(target: "sub-libp2p",
"Libp2p => Connected({}, {:?}, {:?}, {:?}): Not requested by PSM, disabling.",
peer_id, set_id, endpoint, connection_id);
let mut connections = SmallVec::new();
connections.push((connection_id, ConnectionState::Closed));
*st = PeerState::Disabled { connections, backoff_until };
},
PeerState::Incoming { connections, .. } |
PeerState::Disabled { connections, .. } |
PeerState::DisabledPendingEnable { connections, .. } |
PeerState::Enabled { connections, .. } => {
trace!(target: "sub-libp2p",
"Libp2p => Connected({}, {:?}, {:?}, {:?}): Secondary connection. Leaving closed.",
peer_id, set_id, endpoint, connection_id);
connections.push((connection_id, ConnectionState::Closed));
},
}
}
},
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => {
for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
let mut entry = if let Entry::Occupied(entry) =
self.peers.entry((peer_id, set_id))
{
entry
} else {
error!(target: "sub-libp2p", "inject_connection_closed: State mismatch in the custom protos handler");
debug_assert!(false);
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Disabled { mut connections, backoff_until } => {
trace!(target: "sub-libp2p", "Libp2p => Disconnected({}, {:?}, {:?}): Disabled.",
peer_id, set_id, connection_id);
if let Some(pos) =
connections.iter().position(|(c, _)| *c == connection_id)
{
connections.remove(pos);
} else {
debug_assert!(false);
error!(target: "sub-libp2p",
"inject_connection_closed: State mismatch in the custom protos handler");
}
if connections.is_empty() {
if let Some(until) = backoff_until {
let now = Instant::now();
if until > now {
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(until - now);
self.delays.push(
async move {
delay.await;
(delay_id, peer_id, set_id)
}
.boxed(),
);
*entry.get_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: until,
};
} else {
entry.remove();
}
} else {
entry.remove();
}
} else {
*entry.get_mut() =
PeerState::Disabled { connections, backoff_until };
}
},
PeerState::DisabledPendingEnable {
mut connections,
timer_deadline,
timer,
} => {
trace!(
target: "sub-libp2p",
"Libp2p => Disconnected({}, {:?}, {:?}): Disabled but pending enable.",
peer_id, set_id, connection_id
);
if let Some(pos) =
connections.iter().position(|(c, _)| *c == connection_id)
{
connections.remove(pos);
} else {
error!(target: "sub-libp2p",
"inject_connection_closed: State mismatch in the custom protos handler");
debug_assert!(false);
}
if connections.is_empty() {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.protocol_controller_handles[usize::from(set_id)]
.dropped(peer_id);
*entry.get_mut() = PeerState::Backoff { timer, timer_deadline };
} else {
*entry.get_mut() = PeerState::DisabledPendingEnable {
connections,
timer_deadline,
timer,
};
}
},
PeerState::Incoming {
mut connections,
backoff_until,
incoming_index,
peerset_rejected,
} => {
trace!(
target: "sub-libp2p",
"Libp2p => Disconnected({}, {:?}, {:?}): OpenDesiredByRemote.",
peer_id, set_id, connection_id
);
debug_assert!(connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
if let Some(pos) =
connections.iter().position(|(c, _)| *c == connection_id)
{
connections.remove(pos);
} else {
error!(target: "sub-libp2p",
"inject_connection_closed: State mismatch in the custom protos handler");
debug_assert!(false);
}
let no_desired_left = !connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote));
if no_desired_left {
if let Some(state) = self
.incoming
.iter_mut()
.find(|i| i.alive && i.set_id == set_id && i.peer_id == peer_id)
{
state.alive = false;
} else {
error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in \
incoming corresponding to an incoming state in peers");
debug_assert!(false);
}
}
if connections.is_empty() {
if let Some(until) = backoff_until {
let now = Instant::now();
if until > now {
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(until - now);
self.delays.push(
async move {
delay.await;
(delay_id, peer_id, set_id)
}
.boxed(),
);
*entry.get_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: until,
};
} else {
entry.remove();
}
} else {
entry.remove();
}
} else if no_desired_left {
*entry.get_mut() =
PeerState::Disabled { connections, backoff_until };
} else {
*entry.get_mut() = PeerState::Incoming {
connections,
backoff_until,
incoming_index,
peerset_rejected,
};
}
},
PeerState::Enabled { mut connections } => {
trace!(
target: "sub-libp2p",
"Libp2p => Disconnected({}, {:?}, {:?}): Enabled.",
peer_id, set_id, connection_id
);
debug_assert!(connections.iter().any(|(_, s)| matches!(
s,
ConnectionState::Opening | ConnectionState::Open(_)
)));
if let Some(pos) =
connections.iter().position(|(c, _)| *c == connection_id)
{
let (_, state) = connections.remove(pos);
if let ConnectionState::Open(_) = state {
if let Some((replacement_pos, replacement_sink)) = connections
.iter()
.enumerate()
.find_map(|(num, (_, s))| match s {
ConnectionState::Open(s) => Some((num, s.clone())),
_ => None,
}) {
if pos <= replacement_pos {
trace!(
target: "sub-libp2p",
"External API <= Sink replaced({}, {:?})",
peer_id, set_id
);
let event = NotificationsOut::CustomProtocolReplaced {
peer_id,
set_id,
notifications_sink: replacement_sink.clone(),
};
self.events.push_back(ToSwarm::GenerateEvent(event));
}
} else {
trace!(
target: "sub-libp2p", "External API <= Closed({}, {:?})",
peer_id, set_id
);
let event = NotificationsOut::CustomProtocolClosed {
peer_id,
set_id,
};
self.events.push_back(ToSwarm::GenerateEvent(event));
}
}
} else {
error!(target: "sub-libp2p",
"inject_connection_closed: State mismatch in the custom protos handler");
debug_assert!(false);
}
if connections.is_empty() {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.protocol_controller_handles[usize::from(set_id)]
.dropped(peer_id);
let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(Duration::from_secs(ban_dur));
self.delays.push(
async move {
delay.await;
(delay_id, peer_id, set_id)
}
.boxed(),
);
*entry.get_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: Instant::now() + Duration::from_secs(ban_dur),
};
} else if !connections.iter().any(|(_, s)| {
matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
}) {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.protocol_controller_handles[usize::from(set_id)]
.dropped(peer_id);
*entry.get_mut() =
PeerState::Disabled { connections, backoff_until: None };
} else {
*entry.get_mut() = PeerState::Enabled { connections };
}
},
PeerState::Requested |
PeerState::PendingRequest { .. } |
PeerState::Backoff { .. } => {
error!(target: "sub-libp2p",
"`inject_connection_closed` called for unknown peer {}",
peer_id);
debug_assert!(false);
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of peer {} is poisoned", peer_id);
debug_assert!(false);
},
}
}
},
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let DialError::Transport(errors) = error {
for (addr, error) in errors.iter() {
trace!(target: "sub-libp2p", "Libp2p => Reach failure for {:?} through {:?}: {:?}", peer_id, addr, error);
}
}
if let Some(peer_id) = peer_id {
trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);
for set_id in (0..self.notif_protocols.len()).map(SetId::from) {
if let Entry::Occupied(mut entry) = self.peers.entry((peer_id, set_id)) {
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
st @ PeerState::Backoff { .. } => {
*entry.into_mut() = st;
},
st @ PeerState::Requested |
st @ PeerState::PendingRequest { .. } => {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.protocol_controller_handles[usize::from(set_id)]
.dropped(peer_id);
let now = Instant::now();
let ban_duration = match st {
PeerState::PendingRequest { timer_deadline, .. }
if timer_deadline > now =>
cmp::max(timer_deadline - now, Duration::from_secs(5)),
_ => Duration::from_secs(5),
};
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(ban_duration);
self.delays.push(
async move {
delay.await;
(delay_id, peer_id, set_id)
}
.boxed(),
);
*entry.into_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: now + ban_duration,
};
},
st @ PeerState::Disabled { .. } |
st @ PeerState::Enabled { .. } |
st @ PeerState::DisabledPendingEnable { .. } |
st @ PeerState::Incoming { .. } => {
*entry.into_mut() = st;
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id);
debug_assert!(false);
},
}
}
}
}
},
FromSwarm::ListenerClosed(_) => {},
FromSwarm::ListenFailure(_) => {},
FromSwarm::ListenerError(_) => {},
FromSwarm::ExternalAddrExpired(_) => {},
FromSwarm::NewListener(_) => {},
FromSwarm::ExpiredListenAddr(_) => {},
FromSwarm::NewExternalAddrCandidate(_) => {},
FromSwarm::ExternalAddrConfirmed(_) => {},
FromSwarm::AddressChange(_) => {},
FromSwarm::NewListenAddr(_) => {},
event => {
warn!(target: "sub-libp2p", "New unknown `FromSwarm` libp2p event: {event:?}");
},
}
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
match event {
NotifsHandlerOut::OpenDesiredByRemote { protocol_index, handshake } => {
let set_id = SetId::from(protocol_index);
trace!(target: "sub-libp2p",
"Handler({:?}, {:?}]) => OpenDesiredByRemote({:?})",
peer_id, connection_id, set_id);
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
{
entry
} else {
error!(
target: "sub-libp2p",
"OpenDesiredByRemote: State mismatch in the custom protos handler"
);
debug_assert!(false);
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Incoming {
mut connections,
backoff_until,
incoming_index,
peerset_rejected,
} => {
debug_assert!(connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
if let Some((_, connec_state)) =
connections.iter_mut().find(|(c, _)| *c == connection_id)
{
if let ConnectionState::Closed = *connec_state {
*connec_state = ConnectionState::OpenDesiredByRemote;
} else {
debug_assert!(matches!(
connec_state,
ConnectionState::OpeningThenClosing | ConnectionState::Closing
));
}
} else {
error!(
target: "sub-libp2p",
"OpenDesiredByRemote: State mismatch in the custom protos handler"
);
debug_assert!(false);
}
*entry.into_mut() = PeerState::Incoming {
connections,
backoff_until,
incoming_index,
peerset_rejected,
};
},
PeerState::Enabled { mut connections } => {
debug_assert!(connections.iter().any(|(_, s)| matches!(
s,
ConnectionState::Opening | ConnectionState::Open(_)
)));
if let Some((_, connec_state)) =
connections.iter_mut().find(|(c, _)| *c == connection_id)
{
if let ConnectionState::Closed = *connec_state {
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
peer_id, connection_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
} else {
debug_assert!(matches!(
connec_state,
ConnectionState::OpenDesiredByRemote |
ConnectionState::Closing | ConnectionState::Opening
));
}
} else {
error!(
target: "sub-libp2p",
"OpenDesiredByRemote: State mismatch in the custom protos handler"
);
debug_assert!(false);
}
*entry.into_mut() = PeerState::Enabled { connections };
},
PeerState::Disabled { mut connections, backoff_until } => {
if let Some((_, connec_state)) =
connections.iter_mut().find(|(c, _)| *c == connection_id)
{
if let ConnectionState::Closed = *connec_state {
*connec_state = ConnectionState::OpenDesiredByRemote;
let incoming_id = self.next_incoming_index;
self.next_incoming_index.0 += 1;
trace!(target: "sub-libp2p", "PSM <= Incoming({}, {:?}, {:?}).",
peer_id, set_id, incoming_id);
self.protocol_controller_handles[usize::from(set_id)]
.incoming_connection(peer_id, incoming_id);
self.incoming.push(IncomingPeer {
peer_id,
set_id,
alive: true,
incoming_id,
handshake,
});
*entry.into_mut() = PeerState::Incoming {
connections,
backoff_until,
peerset_rejected: false,
incoming_index: incoming_id,
};
} else {
debug_assert!(matches!(
connec_state,
ConnectionState::OpeningThenClosing | ConnectionState::Closing
));
*entry.into_mut() =
PeerState::Disabled { connections, backoff_until };
}
} else {
error!(
target: "sub-libp2p",
"OpenDesiredByRemote: State mismatch in the custom protos handler"
);
debug_assert!(false);
}
},
PeerState::DisabledPendingEnable { mut connections, timer, timer_deadline } => {
if let Some((_, connec_state)) =
connections.iter_mut().find(|(c, _)| *c == connection_id)
{
if let ConnectionState::Closed = *connec_state {
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
peer_id, connection_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
*entry.into_mut() = PeerState::Enabled { connections };
} else {
debug_assert!(matches!(
connec_state,
ConnectionState::OpeningThenClosing | ConnectionState::Closing
));
*entry.into_mut() = PeerState::DisabledPendingEnable {
connections,
timer,
timer_deadline,
};
}
} else {
error!(
target: "sub-libp2p",
"OpenDesiredByRemote: State mismatch in the custom protos handler"
);
debug_assert!(false);
}
},
state => {
error!(target: "sub-libp2p",
"OpenDesiredByRemote: Unexpected state in the custom protos handler: {:?}",
state);
debug_assert!(false);
},
};
},
NotifsHandlerOut::CloseDesired { protocol_index } => {
let set_id = SetId::from(protocol_index);
trace!(target: "sub-libp2p",
"Handler({}, {:?}) => CloseDesired({:?})",
peer_id, connection_id, set_id);
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
{
entry
} else {
error!(target: "sub-libp2p", "CloseDesired: State mismatch in the custom protos handler");
debug_assert!(false);
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Enabled { mut connections } => {
debug_assert!(connections.iter().any(|(_, s)| matches!(
s,
ConnectionState::Opening | ConnectionState::Open(_)
)));
let pos = if let Some(pos) =
connections.iter().position(|(c, _)| *c == connection_id)
{
pos
} else {
error!(target: "sub-libp2p",
"CloseDesired: State mismatch in the custom protos handler");
debug_assert!(false);
return
};
if matches!(connections[pos].1, ConnectionState::Closing) {
*entry.into_mut() = PeerState::Enabled { connections };
return
}
debug_assert!(matches!(connections[pos].1, ConnectionState::Open(_)));
connections[pos].1 = ConnectionState::Closing;
trace!(target: "sub-libp2p", "Handler({}, {:?}) <= Close({:?})", peer_id, connection_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
if let Some((replacement_pos, replacement_sink)) =
connections.iter().enumerate().find_map(|(num, (_, s))| match s {
ConnectionState::Open(s) => Some((num, s.clone())),
_ => None,
}) {
if pos <= replacement_pos {
trace!(target: "sub-libp2p", "External API <= Sink replaced({:?}, {:?})", peer_id, set_id);
let event = NotificationsOut::CustomProtocolReplaced {
peer_id,
set_id,
notifications_sink: replacement_sink.clone(),
};
self.events.push_back(ToSwarm::GenerateEvent(event));
}
*entry.into_mut() = PeerState::Enabled { connections };
} else {
if !connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::Opening))
{
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.protocol_controller_handles[usize::from(set_id)]
.dropped(peer_id);
*entry.into_mut() =
PeerState::Disabled { connections, backoff_until: None };
} else {
*entry.into_mut() = PeerState::Enabled { connections };
}
trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id);
let event = NotificationsOut::CustomProtocolClosed { peer_id, set_id };
self.events.push_back(ToSwarm::GenerateEvent(event));
}
},
state @ PeerState::Disabled { .. } |
state @ PeerState::DisabledPendingEnable { .. } => {
*entry.into_mut() = state;
},
state => {
error!(target: "sub-libp2p",
"Unexpected state in the custom protos handler: {:?}",
state);
},
}
},
NotifsHandlerOut::CloseResult { protocol_index } => {
let set_id = SetId::from(protocol_index);
trace!(target: "sub-libp2p",
"Handler({}, {:?}) => CloseResult({:?})",
peer_id, connection_id, set_id);
match self.peers.get_mut(&(peer_id, set_id)) {
Some(PeerState::Incoming { connections, .. }) |
Some(PeerState::DisabledPendingEnable { connections, .. }) |
Some(PeerState::Disabled { connections, .. }) |
Some(PeerState::Enabled { connections, .. }) => {
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
*c == connection_id && matches!(s, ConnectionState::Closing)
}) {
*connec_state = ConnectionState::Closed;
} else {
error!(target: "sub-libp2p",
"CloseResult: State mismatch in the custom protos handler");
debug_assert!(false);
}
},
state => {
error!(target: "sub-libp2p",
"CloseResult: Unexpected state in the custom protos handler: {:?}",
state);
debug_assert!(false);
},
}
},
NotifsHandlerOut::OpenResultOk {
protocol_index,
negotiated_fallback,
received_handshake,
notifications_sink,
inbound,
..
} => {
let set_id = SetId::from(protocol_index);
trace!(target: "sub-libp2p",
"Handler({}, {:?}) => OpenResultOk({:?})",
peer_id, connection_id, set_id);
match self.peers.get_mut(&(peer_id, set_id)) {
Some(PeerState::Enabled { connections, .. }) => {
debug_assert!(connections.iter().any(|(_, s)| matches!(
s,
ConnectionState::Opening | ConnectionState::Open(_)
)));
let any_open =
connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_)));
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
*c == connection_id && matches!(s, ConnectionState::Opening)
}) {
if !any_open {
trace!(target: "sub-libp2p", "External API <= Open({}, {:?})", peer_id, set_id);
let event = NotificationsOut::CustomProtocolOpen {
peer_id,
set_id,
direction: if inbound {
Direction::Inbound
} else {
Direction::Outbound
},
received_handshake: received_handshake.clone(),
negotiated_fallback: negotiated_fallback.clone(),
notifications_sink: notifications_sink.clone(),
};
self.events.push_back(ToSwarm::GenerateEvent(event));
}
*connec_state = ConnectionState::Open(notifications_sink);
} else if let Some((_, connec_state)) =
connections.iter_mut().find(|(c, s)| {
*c == connection_id &&
matches!(s, ConnectionState::OpeningThenClosing)
}) {
*connec_state = ConnectionState::Closing;
} else {
error!(target: "sub-libp2p",
"OpenResultOk State mismatch in the custom protos handler");
debug_assert!(false);
}
},
Some(PeerState::Incoming { connections, .. }) |
Some(PeerState::DisabledPendingEnable { connections, .. }) |
Some(PeerState::Disabled { connections, .. }) => {
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
*c == connection_id && matches!(s, ConnectionState::OpeningThenClosing)
}) {
*connec_state = ConnectionState::Closing;
} else {
error!(target: "sub-libp2p",
"OpenResultOk State mismatch in the custom protos handler");
debug_assert!(false);
}
},
state => {
error!(target: "sub-libp2p",
"OpenResultOk: Unexpected state in the custom protos handler: {:?}",
state);
debug_assert!(false);
},
}
},
NotifsHandlerOut::OpenResultErr { protocol_index } => {
let set_id = SetId::from(protocol_index);
trace!(target: "sub-libp2p",
"Handler({:?}, {:?}) => OpenResultErr({:?})",
peer_id, connection_id, set_id);
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id))
{
entry
} else {
error!(target: "sub-libp2p", "OpenResultErr: State mismatch in the custom protos handler");
debug_assert!(false);
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Enabled { mut connections } => {
debug_assert!(connections.iter().any(|(_, s)| matches!(
s,
ConnectionState::Opening | ConnectionState::Open(_)
)));
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)| {
*c == connection_id && matches!(s, ConnectionState::Opening)
}) {
*connec_state = ConnectionState::Closed;
} else if let Some((_, connec_state)) =
connections.iter_mut().find(|(c, s)| {
*c == connection_id &&
matches!(s, ConnectionState::OpeningThenClosing)
}) {
*connec_state = ConnectionState::Closing;
} else {
error!(target: "sub-libp2p",
"OpenResultErr: State mismatch in the custom protos handler");
debug_assert!(false);
}
if !connections.iter().any(|(_, s)| {
matches!(s, ConnectionState::Opening | ConnectionState::Open(_))
}) {
trace!(target: "sub-libp2p", "PSM <= Dropped({:?}, {:?})", peer_id, set_id);
self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id);
let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
*entry.into_mut() = PeerState::Disabled {
connections,
backoff_until: Some(Instant::now() + Duration::from_secs(ban_dur)),
};
} else {
*entry.into_mut() = PeerState::Enabled { connections };
}
},
mut state @ PeerState::Incoming { .. } |
mut state @ PeerState::DisabledPendingEnable { .. } |
mut state @ PeerState::Disabled { .. } => {
match &mut state {
PeerState::Incoming { connections, .. } |
PeerState::Disabled { connections, .. } |
PeerState::DisabledPendingEnable { connections, .. } => {
if let Some((_, connec_state)) =
connections.iter_mut().find(|(c, s)| {
*c == connection_id &&
matches!(s, ConnectionState::OpeningThenClosing)
}) {
*connec_state = ConnectionState::Closing;
} else {
error!(target: "sub-libp2p",
"OpenResultErr: State mismatch in the custom protos handler");
debug_assert!(false);
}
},
_ => unreachable!(
"Match branches are the same as the one on which we
enter this block; qed"
),
};
*entry.into_mut() = state;
},
state => {
error!(target: "sub-libp2p",
"Unexpected state in the custom protos handler: {:?}",
state);
debug_assert!(false);
},
};
},
NotifsHandlerOut::Notification { protocol_index, message } => {
let set_id = SetId::from(protocol_index);
if self.is_open(&peer_id, set_id) {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Notification({}, {:?}, {} bytes)",
connection_id,
peer_id,
set_id,
message.len()
);
trace!(
target: "sub-libp2p",
"External API <= Message({}, {:?})",
peer_id,
set_id,
);
let event = NotificationsOut::Notification {
peer_id,
set_id,
message: message.clone(),
};
self.events.push_back(ToSwarm::GenerateEvent(event));
} else {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Post-close notification({}, {:?}, {} bytes)",
connection_id,
peer_id,
set_id,
message.len()
);
}
},
NotifsHandlerOut::Close { protocol_index } => {
let set_id = SetId::from(protocol_index);
trace!(target: "sub-libp2p", "Handler({}, {:?}) => SyncNotificationsClogged({:?})", peer_id, connection_id, set_id);
self.events.push_back(ToSwarm::CloseConnection {
peer_id,
connection: CloseConnection::One(connection_id),
});
},
}
}
fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event)
}
loop {
match futures::Stream::poll_next(Pin::new(&mut self.from_protocol_controllers), cx) {
Poll::Ready(Some(Message::Accept(index))) => {
self.peerset_report_preaccept(index);
},
Poll::Ready(Some(Message::Reject(index))) => {
let _ = self.peerset_report_reject(index);
},
Poll::Ready(Some(Message::Connect { peer_id, set_id, .. })) => {
self.peerset_report_connect(peer_id, set_id);
},
Poll::Ready(Some(Message::Drop { peer_id, set_id, .. })) => {
self.peerset_report_disconnect(peer_id, set_id);
},
Poll::Ready(None) => {
error!(
target: "sub-libp2p",
"Protocol controllers receiver stream has returned `None`. Ignore this error if the node is shutting down.",
);
break
},
Poll::Pending => break,
}
}
loop {
match futures::Stream::poll_next(Pin::new(&mut self.command_streams), cx) {
Poll::Ready(Some((set_id, command))) => match command {
NotificationCommand::SetHandshake(handshake) => {
self.set_notif_protocol_handshake(set_id.into(), handshake);
},
NotificationCommand::OpenSubstream(_peer) |
NotificationCommand::CloseSubstream(_peer) => {
todo!("substream control not implemented");
},
},
Poll::Ready(None) => {
error!(target: LOG_TARGET, "Protocol command streams have been shut down");
break
},
Poll::Pending => break,
}
}
while let Poll::Ready(Some((result, index))) =
self.pending_inbound_validations.poll_next_unpin(cx)
{
match result {
Ok(ValidationResult::Accept) => {
self.protocol_report_accept(index);
},
Ok(ValidationResult::Reject) => {
self.protocol_report_reject(index);
},
Err(_) => {
error!(target: LOG_TARGET, "Protocol has shut down");
break
},
}
}
while let Poll::Ready(Some((delay_id, peer_id, set_id))) =
Pin::new(&mut self.delays).poll_next(cx)
{
let peer_state = match self.peers.get_mut(&(peer_id, set_id)) {
Some(s) => s,
None => continue,
};
match peer_state {
PeerState::Backoff { timer, .. } if *timer == delay_id => {
trace!(target: "sub-libp2p", "Libp2p <= Clean up ban of {:?} from the state ({:?})", peer_id, set_id);
self.peers.remove(&(peer_id, set_id));
},
PeerState::PendingRequest { timer, .. } if *timer == delay_id => {
trace!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired ({:?})", peer_id, set_id);
self.events.push_back(ToSwarm::Dial { opts: peer_id.into() });
*peer_state = PeerState::Requested;
},
PeerState::DisabledPendingEnable { connections, timer, timer_deadline }
if *timer == delay_id =>
{
if let Some((connec_id, connec_state)) =
connections.iter_mut().find(|(_, s)| matches!(s, ConnectionState::Closed))
{
trace!(target: "sub-libp2p", "Handler({}, {:?}) <= Open({:?}) (ban expired)",
peer_id, *connec_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
*peer_state = PeerState::Enabled { connections: mem::take(connections) };
} else {
*timer_deadline = Instant::now() + Duration::from_secs(5);
let delay = futures_timer::Delay::new(Duration::from_secs(5));
let timer = *timer;
self.delays.push(
async move {
delay.await;
(timer, peer_id, set_id)
}
.boxed(),
);
}
},
_ => {},
}
}
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event)
}
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
mock::MockPeerStore,
protocol::notifications::handler::tests::*,
protocol_controller::{IncomingIndex, ProtoSetConfig, ProtocolController},
};
use libp2p::core::ConnectedPoint;
use sc_utils::mpsc::tracing_unbounded;
use std::{collections::HashSet, iter};
impl PartialEq for ConnectionState {
fn eq(&self, other: &ConnectionState) -> bool {
match (self, other) {
(ConnectionState::Closed, ConnectionState::Closed) => true,
(ConnectionState::Closing, ConnectionState::Closing) => true,
(ConnectionState::Opening, ConnectionState::Opening) => true,
(ConnectionState::OpeningThenClosing, ConnectionState::OpeningThenClosing) => true,
(ConnectionState::OpenDesiredByRemote, ConnectionState::OpenDesiredByRemote) =>
true,
(ConnectionState::Open(_), ConnectionState::Open(_)) => true,
_ => false,
}
}
}
fn development_notifs(
) -> (Notifications, ProtocolController, Box<dyn crate::service::traits::NotificationService>)
{
let (protocol_handle_pair, notif_service) =
crate::protocol::notifications::service::notification_service("/proto/1".into());
let (to_notifications, from_controller) =
tracing_unbounded("test_controller_to_notifications", 10_000);
let (handle, controller) = ProtocolController::new(
SetId::from(0),
ProtoSetConfig {
in_peers: 25,
out_peers: 25,
reserved_nodes: HashSet::new(),
reserved_only: false,
},
to_notifications,
Arc::new(MockPeerStore {}),
);
let (notif_handle, command_stream) = protocol_handle_pair.split();
(
Notifications::new(
vec![handle],
from_controller,
NotificationMetrics::new(None),
iter::once((
ProtocolConfig {
name: "/foo".into(),
fallback_names: Vec::new(),
handshake: vec![1, 2, 3, 4],
max_notification_size: u64::MAX,
},
notif_handle,
command_stream,
)),
),
controller,
notif_service,
)
}
#[test]
fn update_handshake() {
let (mut notif, _controller, _notif_service) = development_notifs();
let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
assert_eq!(inner, vec![1, 2, 3, 4]);
notif.set_notif_protocol_handshake(0.into(), vec![5, 6, 7, 8]);
let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone();
assert_eq!(inner, vec![5, 6, 7, 8]);
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn update_unknown_handshake() {
let (mut notif, _controller, _notif_service) = development_notifs();
notif.set_notif_protocol_handshake(1337.into(), vec![5, 6, 7, 8]);
}
#[test]
fn disconnect_backoff_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
notif.peers.insert(
(peer, 0.into()),
PeerState::Backoff { timer: DelayId(0), timer_deadline: Instant::now() },
);
notif.disconnect_peer(&peer, 0.into());
assert!(std::matches!(
notif.peers.get(&(peer, 0.into())),
Some(PeerState::Backoff { timer: DelayId(0), .. })
));
}
#[test]
fn disconnect_pending_request() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
notif.peers.insert(
(peer, 0.into()),
PeerState::PendingRequest { timer: DelayId(0), timer_deadline: Instant::now() },
);
notif.disconnect_peer(&peer, 0.into());
assert!(std::matches!(
notif.peers.get(&(peer, 0.into())),
Some(PeerState::PendingRequest { timer: DelayId(0), .. })
));
}
#[test]
fn disconnect_requested_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
notif.peers.insert((peer, 0.into()), PeerState::Requested);
notif.disconnect_peer(&peer, 0.into());
assert!(std::matches!(notif.peers.get(&(peer, 0.into())), Some(PeerState::Requested)));
}
#[test]
fn disconnect_disabled_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
notif.peers.insert(
(peer, 0.into()),
PeerState::Disabled { backoff_until: None, connections: SmallVec::new() },
);
notif.disconnect_peer(&peer, 0.into());
assert!(std::matches!(
notif.peers.get(&(peer, 0.into())),
Some(PeerState::Disabled { backoff_until: None, .. })
));
}
#[test]
fn remote_opens_connection_and_substream() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
notif.peers.get(&(peer, 0.into()))
{
assert_eq!(connections[0], (conn, ConnectionState::Closed));
} else {
panic!("invalid state");
}
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
if let Some(&PeerState::Incoming { ref connections, backoff_until: None, .. }) =
notif.peers.get(&(peer, 0.into()))
{
assert_eq!(connections.len(), 1);
assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
} else {
panic!("invalid state");
}
assert!(std::matches!(
notif.incoming.pop(),
Some(IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. }),
));
}
#[tokio::test]
async fn disconnect_remote_substream_before_handled_by_controller() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
notif.disconnect_peer(&peer, 0.into());
if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) =
notif.peers.get(&(peer, 0.into()))
{
assert_eq!(connections.len(), 1);
assert_eq!(connections[0], (conn, ConnectionState::Closing));
} else {
panic!("invalid state");
}
}
#[test]
fn peerset_report_connect_backoff() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
let timer = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
notif.peers.get(&(peer, set_id))
{
timer_deadline
} else {
panic!("invalid state");
};
notif.peerset_report_connect(peer, set_id);
if let Some(&PeerState::PendingRequest { timer_deadline, .. }) =
notif.peers.get(&(peer, set_id))
{
assert_eq!(timer, timer_deadline);
} else {
panic!("invalid state");
}
}
#[test]
fn peerset_connect_incoming() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
notif.protocol_report_accept(IncomingIndex(0));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
}
#[test]
fn peerset_disconnect_disable_pending_enable() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
}
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::DisabledPendingEnable { .. })
));
notif.peerset_report_disconnect(peer, set_id);
if let Some(PeerState::Disabled { backoff_until, .. }) = notif.peers.get(&(peer, set_id)) {
assert!(backoff_until.is_some());
assert!(backoff_until.unwrap() > Instant::now());
} else {
panic!("invalid state");
}
}
#[test]
fn peerset_disconnect_enabled() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
notif.protocol_report_accept(IncomingIndex(0));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
notif.peerset_report_disconnect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
}
#[test]
fn peerset_disconnect_requested() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
notif.peerset_report_disconnect(peer, set_id);
assert!(notif.peers.get(&(peer, set_id)).is_none());
}
#[test]
fn peerset_disconnect_pending_request() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::PendingRequest { .. })
));
notif.peerset_report_disconnect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
}
#[test]
fn peerset_accept_peer_not_alive() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
assert!(std::matches!(
notif.incoming[0],
IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
));
notif.disconnect_peer(&peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
assert!(std::matches!(
notif.incoming[0],
IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
));
notif.protocol_report_accept(IncomingIndex(0));
assert_eq!(notif.incoming.len(), 0);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Disabled { .. })));
}
#[test]
fn secondary_connection_peer_state_incoming() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections.len(), 1);
assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
} else {
panic!("invalid state");
}
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn2,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
if let Some(PeerState::Incoming { connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections.len(), 2);
assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
assert_eq!(connections[1], (conn2, ConnectionState::Closed));
} else {
panic!("invalid state");
}
}
#[test]
fn close_connection_for_disabled_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(notif.peers.get(&(peer, set_id)).is_none());
}
#[test]
fn close_connection_for_incoming_peer_one_connection() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(notif.peers.get(&(peer, set_id)).is_none());
assert!(std::matches!(
notif.incoming[0],
IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. },
));
}
#[test]
fn close_connection_for_incoming_peer_two_connections() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let conn1 = ConnectionId::new_unchecked(1);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
let mut conns = SmallVec::<
[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER],
>::from(vec![(conn, ConnectionState::Closed)]);
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn1,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
conns.push((conn1, ConnectionState::Closed));
if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id))
{
assert_eq!(connections.len(), 2);
assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote));
assert_eq!(connections[1], (conn1, ConnectionState::Closed));
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections.len(), 1);
assert_eq!(connections[0], (conn1, ConnectionState::Closed));
} else {
panic!("invalid state");
}
}
#[test]
fn connection_and_substream_open() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
let mut conn_yielder = ConnectionYielder::new();
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.protocol_report_accept(IncomingIndex(0));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
notif.on_connection_handler_event(peer, conn, event);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections.len(), 1);
assert_eq!(connections[0].0, conn);
assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
}
assert!(std::matches!(
notif.events[notif.events.len() - 1],
ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolOpen { .. })
));
}
#[test]
fn connection_closed_sink_replaced() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
let mut conn_yielder = ConnectionYielder::new();
for conn_id in vec![conn1, conn2] {
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn_id,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
}
if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections[0], (conn1, ConnectionState::Closed));
assert_eq!(connections[1], (conn2, ConnectionState::Closed));
} else {
panic!("invalid state");
}
notif.peerset_report_connect(peer, set_id);
notif.on_connection_handler_event(
peer,
conn2,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections[0], (conn1, ConnectionState::Opening));
assert_eq!(connections[1], (conn2, ConnectionState::Opening));
} else {
panic!("invalid state");
}
for conn in vec![conn1, conn2].iter() {
notif.on_connection_handler_event(
peer,
*conn,
conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
);
}
if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections[0].0, conn1);
assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
assert_eq!(connections[1].0, conn2);
assert!(std::matches!(connections[1].1, ConnectionState::Open(_)));
} else {
panic!("invalid state");
}
assert_eq!(notif.open_peers().collect::<Vec<_>>(), vec![&peer],);
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn1,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections.len(), 1);
assert_eq!(connections[0].0, conn2);
assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
} else {
panic!("invalid state");
}
assert!(std::matches!(
notif.events[notif.events.len() - 1],
ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolReplaced { .. })
));
}
#[test]
fn dial_failure_for_requested_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
peer_id: Some(peer),
error: &libp2p::swarm::DialError::Aborted,
connection_id: ConnectionId::new_unchecked(1337),
}));
if let Some(PeerState::Backoff { timer_deadline, .. }) = notif.peers.get(&(peer, set_id)) {
assert!(timer_deadline > &Instant::now());
} else {
panic!("invalid state");
}
}
#[tokio::test]
async fn write_notification() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
let mut conn_yielder = ConnectionYielder::new();
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
);
if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections[0].0, conn);
assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
} else {
panic!("invalid state");
}
notif
.peers
.get(&(peer, set_id))
.unwrap()
.get_open()
.unwrap()
.send_sync_notification(vec![1, 3, 3, 7]);
assert_eq!(conn_yielder.get_next_event(peer, set_id.into()).await, Some(vec![1, 3, 3, 7]));
}
#[test]
fn peerset_report_connect_backoff_expired() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
let backoff_duration = Duration::from_millis(100);
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap());
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
std::thread::sleep(backoff_duration * 2);
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested { .. })))
}
#[test]
fn peerset_report_disconnect_disabled() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.peerset_report_disconnect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
}
#[test]
fn peerset_report_disconnect_backoff() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
let backoff_duration = Duration::from_secs(2);
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap());
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
notif.peerset_report_disconnect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
}
#[test]
fn peer_is_backed_off_if_both_connections_get_closed_while_peer_is_disabled_with_back_off() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn1,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn2,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
}
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::DisabledPendingEnable { .. })
));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn1,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::DisabledPendingEnable { .. })
));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn2,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
}
#[test]
fn inject_connection_closed_incoming_with_backoff() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
if let Some(&mut PeerState::Incoming { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, 0.into()))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
} else {
panic!("invalid state");
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
}
#[test]
fn two_connections_inactive_connection_gets_closed_peer_state_is_still_incoming() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
for conn_id in vec![conn1, conn2] {
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn_id,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
}
if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections[0], (conn1, ConnectionState::Closed));
assert_eq!(connections[1], (conn2, ConnectionState::Closed));
} else {
panic!("invalid state");
}
notif.on_connection_handler_event(
peer,
conn1,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(
notif.peers.get_mut(&(peer, 0.into())),
Some(&mut PeerState::Incoming { .. })
));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn2,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
}
#[test]
fn two_connections_active_connection_gets_closed_peer_state_is_disabled() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
for conn_id in vec![conn1, conn2] {
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn_id,
endpoint: &ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
},
failed_addresses: &[],
other_established: 0usize,
},
));
}
if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections[0], (conn1, ConnectionState::Closed));
assert_eq!(connections[1], (conn2, ConnectionState::Closed));
} else {
panic!("invalid state");
}
notif.on_connection_handler_event(
peer,
conn1,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(
notif.peers.get_mut(&(peer, 0.into())),
Some(PeerState::Incoming { .. })
));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn1,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
}
#[test]
fn inject_connection_closed_for_active_connection() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
let mut conn_yielder = ConnectionYielder::new();
for conn_id in vec![conn1, conn2] {
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn_id,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
}
if let Some(PeerState::Disabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections[0], (conn1, ConnectionState::Closed));
assert_eq!(connections[1], (conn2, ConnectionState::Closed));
} else {
panic!("invalid state");
}
notif.peerset_report_connect(peer, set_id);
if let Some(PeerState::Enabled { connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert_eq!(connections[0], (conn1, ConnectionState::Opening));
assert_eq!(connections[1], (conn2, ConnectionState::Closed));
} else {
panic!("invalid state");
}
notif.on_connection_handler_event(
peer,
conn1,
conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]),
);
if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert!(std::matches!(connections[0].1, ConnectionState::Open(_)));
assert_eq!(connections[0].0, conn1);
assert_eq!(connections[1], (conn2, ConnectionState::Closed));
} else {
panic!("invalid state");
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn1,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
}
#[test]
fn inject_dial_failure_for_pending_request() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::PendingRequest { .. })
));
let now = Instant::now();
notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
peer_id: Some(peer),
error: &libp2p::swarm::DialError::Aborted,
connection_id: ConnectionId::new_unchecked(0),
}));
if let Some(PeerState::PendingRequest { ref timer_deadline, .. }) =
notif.peers.get(&(peer, set_id))
{
assert!(timer_deadline > &(now + std::time::Duration::from_secs(5)));
}
}
#[test]
fn peerstate_incoming_open_desired_by_remote() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn1,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn2,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn1,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.on_connection_handler_event(
peer,
conn2,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id))
{
assert_eq!(connections[0], (conn1, ConnectionState::OpenDesiredByRemote));
assert_eq!(connections[1], (conn2, ConnectionState::OpenDesiredByRemote));
}
}
#[tokio::test]
async fn remove_backoff_peer_after_timeout() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
if let Some(&mut PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, 0.into()))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_millis(100)).unwrap());
} else {
panic!("invalid state");
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
let until = if let Some(&PeerState::Backoff { timer_deadline, .. }) =
notif.peers.get(&(peer, set_id))
{
timer_deadline
} else {
panic!("invalid state");
};
if until > Instant::now() {
std::thread::sleep(until - Instant::now());
}
assert!(notif.peers.get(&(peer, set_id)).is_some());
if tokio::time::timeout(Duration::from_secs(5), async {
loop {
futures::future::poll_fn(|cx| {
let _ = notif.poll(cx);
Poll::Ready(())
})
.await;
if notif.peers.get(&(peer, set_id)).is_none() {
break
}
}
})
.await
.is_err()
{
panic!("backoff peer was not removed in time");
}
assert!(notif.peers.get(&(peer, set_id)).is_none());
}
#[tokio::test]
async fn reschedule_disabled_pending_enable_when_connection_not_closed() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let mut conn_yielder = ConnectionYielder::new();
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
},
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.protocol_report_accept(IncomingIndex(0));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
notif.on_connection_handler_event(peer, conn, event);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
assert_eq!(connections[0].0, conn);
} else {
panic!("invalid state");
}
notif.peerset_report_disconnect(peer, set_id);
if let Some(PeerState::Disabled { ref connections, ref mut backoff_until }) =
notif.peers.get_mut(&(peer, set_id))
{
assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
assert_eq!(connections[0].0, conn);
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(2)).unwrap());
} else {
panic!("invalid state");
}
notif.peerset_report_connect(peer, set_id);
let prev_instant =
if let Some(PeerState::DisabledPendingEnable {
ref connections, timer_deadline, ..
}) = notif.peers.get(&(peer, set_id))
{
assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
assert_eq!(connections[0].0, conn);
*timer_deadline
} else {
panic!("invalid state");
};
if tokio::time::timeout(Duration::from_secs(5), async {
loop {
futures::future::poll_fn(|cx| {
let _ = notif.poll(cx);
Poll::Ready(())
})
.await;
if let Some(PeerState::DisabledPendingEnable {
timer_deadline, connections, ..
}) = notif.peers.get(&(peer, set_id))
{
assert!(std::matches!(connections[0], (_, ConnectionState::Closing)));
if timer_deadline != &prev_instant {
break
}
} else {
panic!("invalid state");
}
}
})
.await
.is_err()
{
panic!("backoff peer was not removed in time");
}
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_enabled_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
let mut conn_yielder = ConnectionYielder::new();
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
notif.on_connection_handler_event(peer, conn, event);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) {
assert!(std::matches!(connections[0], (_, ConnectionState::Open(_))));
assert_eq!(connections[0].0, conn);
} else {
panic!("invalid state");
}
notif.peerset_report_connect(peer, set_id);
}
#[test]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_disabled_pending_enable_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
}
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::DisabledPendingEnable { .. })
));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::DisabledPendingEnable { .. })
));
}
#[test]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_requested_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
}
#[test]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_pending_requested() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::PendingRequest { .. })
));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::PendingRequest { .. })
));
}
#[test]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_incoming_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
}
#[test]
#[cfg(debug_assertions)]
fn peerset_report_disconnect_with_incoming_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.peerset_report_disconnect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
}
#[test]
#[cfg(debug_assertions)]
fn peerset_report_disconnect_with_incoming_peer_protocol_accepts() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.peerset_report_disconnect(peer, set_id);
let incoming_index = match notif.peers.get(&(peer, set_id)) {
Some(&PeerState::Incoming { peerset_rejected, incoming_index, .. }) => {
assert!(peerset_rejected);
incoming_index
},
state => panic!("invalid state: {state:?}"),
};
notif.protocol_report_accept(incoming_index);
match notif.peers.get(&(peer, set_id)) {
Some(&PeerState::Disabled { .. }) => {},
state => panic!("invalid state: {state:?}"),
};
}
#[test]
#[cfg(debug_assertions)]
fn peer_disconnected_protocol_accepts() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
assert!(notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
notif.disconnect_peer(&peer, set_id);
notif.protocol_report_accept(IncomingIndex(0));
match notif.peers.get(&(peer, set_id)) {
Some(&PeerState::Disabled { .. }) => {},
state => panic!("invalid state: {state:?}"),
};
assert!(!notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
}
#[test]
#[cfg(debug_assertions)]
fn connection_closed_protocol_accepts() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: ConnectionId::new_unchecked(0),
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
notif.protocol_report_accept(IncomingIndex(0));
match notif.peers.get(&(peer, set_id)) {
None => {},
state => panic!("invalid state: {state:?}"),
};
}
#[test]
#[cfg(debug_assertions)]
fn peer_disconnected_protocol_reject() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
assert!(notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
notif.disconnect_peer(&peer, set_id);
notif.protocol_report_reject(IncomingIndex(0));
match notif.peers.get(&(peer, set_id)) {
Some(&PeerState::Disabled { .. }) => {},
state => panic!("invalid state: {state:?}"),
};
assert!(!notif.incoming.iter().any(|entry| entry.incoming_id == IncomingIndex(0)));
}
#[test]
#[cfg(debug_assertions)]
fn connection_closed_protocol_rejects() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: ConnectionId::new_unchecked(0),
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
notif.protocol_report_reject(IncomingIndex(0));
match notif.peers.get(&(peer, set_id)) {
None => {},
state => panic!("invalid state: {state:?}"),
};
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn protocol_report_accept_not_incoming_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
let mut conn_yielder = ConnectionYielder::new();
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
assert!(std::matches!(
notif.incoming[0],
IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. },
));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
let event = conn_yielder.open_substream(peer, 0, vec![1, 2, 3, 4]);
notif.on_connection_handler_event(peer, conn, event);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
notif.incoming[0].alive = true;
notif.protocol_report_accept(IncomingIndex(0));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_connection_closed_non_existent_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let endpoint = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: ConnectionId::new_unchecked(0),
endpoint: &endpoint.clone(),
cause: None,
remaining_established: 0usize,
},
));
}
#[test]
fn disconnect_non_existent_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let set_id = SetId::from(0);
notif.peerset_report_disconnect(peer, set_id);
assert!(notif.peers.is_empty());
assert!(notif.incoming.is_empty());
}
#[test]
fn accept_non_existent_connection() {
let (mut notif, _controller, _notif_service) = development_notifs();
notif.protocol_report_accept(0.into());
assert!(notif.peers.is_empty());
assert!(notif.incoming.is_empty());
}
#[test]
fn reject_non_existent_connection() {
let (mut notif, _controller, _notif_service) = development_notifs();
notif.protocol_report_reject(0.into());
assert!(notif.peers.is_empty());
assert!(notif.incoming.is_empty());
}
#[test]
fn reject_non_active_connection() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.incoming[0].alive = false;
notif.protocol_report_reject(0.into());
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_non_existent_connection_closed_for_incoming_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: ConnectionId::new_unchecked(1337),
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_non_existent_connection_closed_for_disabled_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: ConnectionId::new_unchecked(1337),
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_non_existent_connection_closed_for_disabled_pending_enable() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
}
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::DisabledPendingEnable { .. })
));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: ConnectionId::new_unchecked(1337),
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_connection_closed_for_incoming_peer_state_mismatch() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.incoming[0].alive = false;
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_connection_closed_for_enabled_state_mismatch() {
let (mut notif, _controller, _notif_service) = development_notifs();
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let set_id = SetId::from(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index: 0,
handshake: vec![1, 3, 3, 7],
},
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: ConnectionId::new_unchecked(1337),
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn inject_connection_closed_for_backoff_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let set_id = SetId::from(0);
let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
if let Some(PeerState::Disabled { ref mut backoff_until, .. }) =
notif.peers.get_mut(&(peer, set_id))
{
*backoff_until =
Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap());
}
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. })));
notif.on_swarm_event(FromSwarm::ConnectionClosed(
libp2p::swarm::behaviour::ConnectionClosed {
peer_id: peer,
connection_id: conn,
endpoint: &connected.clone(),
cause: None,
remaining_established: 0usize,
},
));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn open_result_ok_non_existent_peer() {
let (mut notif, _controller, _notif_service) = development_notifs();
let conn = ConnectionId::new_unchecked(0);
let mut conn_yielder = ConnectionYielder::new();
notif.on_connection_handler_event(
PeerId::random(),
conn,
conn_yielder.open_substream(PeerId::random(), 0, vec![1, 2, 3, 4]),
);
}
}