use crate::{utils::interval, LOG_TARGET};
use either::Either;
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::{
core::{transport::PortUse, ConnectedPoint, Endpoint},
identify::{
Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent,
Info as IdentifyInfo,
},
identity::PublicKey,
multiaddr::Protocol,
ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent},
swarm::{
behaviour::{
AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
ListenFailure,
},
ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId,
NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
use log::{debug, error, trace, warn};
use parking_lot::Mutex;
use schnellru::{ByLength, LruMap};
use smallvec::SmallVec;
use std::{
collections::{hash_map::Entry, HashSet, VecDeque},
iter,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60);
const MAX_EXTERNAL_ADDRESSES: u32 = 32;
const MIN_ADDRESS_CONFIRMATIONS: usize = 3;
pub struct PeerInfoBehaviour {
ping: Ping,
identify: Identify,
nodes_info: FnvHashMap<PeerId, NodeInfo>,
garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>,
local_peer_id: PeerId,
public_addresses: Vec<Multiaddr>,
listen_addresses: HashSet<Multiaddr>,
address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,
external_addresses: ExternalAddresses,
pending_actions: VecDeque<ToSwarm<PeerInfoEvent, THandlerInEvent<PeerInfoBehaviour>>>,
}
#[derive(Debug)]
struct NodeInfo {
info_expire: Option<Instant>,
endpoints: SmallVec<[ConnectedPoint; crate::MAX_CONNECTIONS_PER_PEER]>,
client_version: Option<String>,
latest_ping: Option<Duration>,
}
impl NodeInfo {
fn new(endpoint: ConnectedPoint) -> Self {
let mut endpoints = SmallVec::new();
endpoints.push(endpoint);
Self { info_expire: None, endpoints, client_version: None, latest_ping: None }
}
}
#[derive(Debug, Clone, Default)]
pub struct ExternalAddresses {
addresses: Arc<Mutex<HashSet<Multiaddr>>>,
}
impl ExternalAddresses {
pub fn add(&mut self, addr: Multiaddr) -> bool {
self.addresses.lock().insert(addr)
}
pub fn remove(&mut self, addr: &Multiaddr) -> bool {
self.addresses.lock().remove(addr)
}
}
impl PeerInfoBehaviour {
pub fn new(
user_agent: String,
local_public_key: PublicKey,
external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
public_addresses: Vec<Multiaddr>,
) -> Self {
let identify = {
let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key.clone())
.with_agent_version(user_agent)
.with_cache_size(0);
Identify::new(cfg)
};
Self {
ping: Ping::new(PingConfig::new()),
identify,
nodes_info: FnvHashMap::default(),
garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)),
local_peer_id: local_public_key.to_peer_id(),
public_addresses,
listen_addresses: HashSet::new(),
address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
external_addresses: ExternalAddresses { addresses: external_addresses },
pending_actions: Default::default(),
}
}
pub fn node(&self, peer_id: &PeerId) -> Option<Node> {
self.nodes_info.get(peer_id).map(Node)
}
fn handle_ping_report(
&mut self,
peer_id: &PeerId,
ping_time: Duration,
connection: ConnectionId,
) {
trace!(target: LOG_TARGET, "Ping time with {:?} via {:?}: {:?}", peer_id, connection, ping_time);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.latest_ping = Some(ping_time);
} else {
error!(target: LOG_TARGET,
"Received ping from node we're not connected to {:?} via {:?}", peer_id, connection);
}
}
fn with_local_peer_id(&self, address: Multiaddr) -> Result<Multiaddr, Multiaddr> {
if let Some(Protocol::P2p(peer_id)) = address.iter().last() {
if peer_id == self.local_peer_id {
Ok(address)
} else {
Err(address)
}
} else {
Ok(address.with(Protocol::P2p(self.local_peer_id)))
}
}
fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
trace!(target: LOG_TARGET, "Identified {:?} => {:?}", peer_id, info);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.client_version = Some(info.agent_version.clone());
} else {
error!(target: LOG_TARGET,
"Received identify message from node we're not connected to {peer_id:?}");
}
match self.with_local_peer_id(info.observed_addr.clone()) {
Ok(observed_addr) => {
let (is_new, expired) = self.is_new_external_address(&observed_addr, *peer_id);
if is_new && self.external_addresses.add(observed_addr.clone()) {
trace!(
target: LOG_TARGET,
"Observed address reported by Identify confirmed as external {}",
observed_addr,
);
self.pending_actions.push_back(ToSwarm::ExternalAddrConfirmed(observed_addr));
}
if let Some(expired) = expired {
trace!(target: LOG_TARGET, "Removing replaced external address: {expired}");
self.external_addresses.remove(&expired);
self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(expired));
}
},
Err(addr) => {
warn!(
target: LOG_TARGET,
"Identify reported observed address for a peer that is not us: {addr}",
);
},
}
}
fn is_same_address(left: &Multiaddr, right: &Multiaddr) -> bool {
let mut left = left.iter();
let mut right = right.iter();
loop {
match (left.next(), right.next()) {
(None, None) => return true,
(None, Some(Protocol::P2p(_))) => return true,
(Some(Protocol::P2p(_)), None) => return true,
(left, right) if left != right => return false,
_ => {},
}
}
}
fn is_new_external_address(
&mut self,
address: &Multiaddr,
peer_id: PeerId,
) -> (bool, Option<Multiaddr>) {
trace!(target: LOG_TARGET, "Verify new external address: {address}");
if self
.listen_addresses
.iter()
.chain(self.public_addresses.iter())
.any(|known_address| PeerInfoBehaviour::is_same_address(&known_address, &address))
{
return (true, None)
}
match self.address_confirmations.get(address) {
Some(confirmations) => {
confirmations.insert(peer_id);
if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
return (true, None)
}
},
None => {
let oldest = (self.address_confirmations.len() >=
self.address_confirmations.limiter().max_length() as usize)
.then(|| {
self.address_confirmations.pop_oldest().map(|(address, peers)| {
if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
return Some(address)
} else {
None
}
})
})
.flatten()
.flatten();
self.address_confirmations
.insert(address.clone(), iter::once(peer_id).collect());
return (false, oldest)
},
}
(false, None)
}
}
pub struct Node<'a>(&'a NodeInfo);
impl<'a> Node<'a> {
pub fn endpoint(&self) -> Option<&'a ConnectedPoint> {
self.0.endpoints.get(0)
}
pub fn client_version(&self) -> Option<&'a str> {
self.0.client_version.as_deref()
}
pub fn latest_ping(&self) -> Option<Duration> {
self.0.latest_ping
}
}
#[derive(Debug)]
pub enum PeerInfoEvent {
Identified {
peer_id: PeerId,
info: IdentifyInfo,
},
}
impl NetworkBehaviour for PeerInfoBehaviour {
type ConnectionHandler = ConnectionHandlerSelect<
<Ping as NetworkBehaviour>::ConnectionHandler,
<Identify as NetworkBehaviour>::ConnectionHandler,
>;
type ToSwarm = PeerInfoEvent;
fn handle_pending_inbound_connection(
&mut self,
connection_id: ConnectionId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
self.ping
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
self.identify
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
}
fn handle_pending_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_maybe_peer: Option<PeerId>,
_addresses: &[Multiaddr],
_effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
Ok(Vec::new())
}
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
let ping_handler = self.ping.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)?;
let identify_handler = self.identify.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)?;
Ok(ping_handler.select(identify_handler))
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
let ping_handler = self.ping.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
port_use,
)?;
let identify_handler = self.identify.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
port_use,
)?;
Ok(ping_handler.select(identify_handler))
}
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(
e @ ConnectionEstablished { peer_id, endpoint, .. },
) => {
self.ping.on_swarm_event(FromSwarm::ConnectionEstablished(e));
self.identify.on_swarm_event(FromSwarm::ConnectionEstablished(e));
match self.nodes_info.entry(peer_id) {
Entry::Vacant(e) => {
e.insert(NodeInfo::new(endpoint.clone()));
},
Entry::Occupied(e) => {
let e = e.into_mut();
if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false)
{
e.client_version = None;
e.latest_ping = None;
}
e.info_expire = None;
e.endpoints.push(endpoint.clone());
},
}
},
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
cause,
remaining_established,
}) => {
self.ping.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
cause,
remaining_established,
}));
self.identify.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
cause,
remaining_established,
}));
if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
if remaining_established == 0 {
entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
}
entry.endpoints.retain(|ep| ep != endpoint)
} else {
error!(target: LOG_TARGET,
"Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
}
},
FromSwarm::DialFailure(DialFailure { peer_id, error, connection_id }) => {
self.ping.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
error,
connection_id,
}));
self.identify.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
error,
connection_id,
}));
},
FromSwarm::ListenerClosed(e) => {
self.ping.on_swarm_event(FromSwarm::ListenerClosed(e));
self.identify.on_swarm_event(FromSwarm::ListenerClosed(e));
},
FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
error,
connection_id,
peer_id,
}) => {
self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
error,
connection_id,
peer_id,
}));
self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
error,
connection_id,
peer_id,
}));
},
FromSwarm::ListenerError(e) => {
self.ping.on_swarm_event(FromSwarm::ListenerError(e));
self.identify.on_swarm_event(FromSwarm::ListenerError(e));
},
FromSwarm::ExternalAddrExpired(e) => {
self.ping.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
self.identify.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
},
FromSwarm::NewListener(e) => {
self.ping.on_swarm_event(FromSwarm::NewListener(e));
self.identify.on_swarm_event(FromSwarm::NewListener(e));
},
FromSwarm::NewListenAddr(e) => {
self.ping.on_swarm_event(FromSwarm::NewListenAddr(e));
self.identify.on_swarm_event(FromSwarm::NewListenAddr(e));
self.listen_addresses.insert(e.addr.clone());
},
FromSwarm::ExpiredListenAddr(e) => {
self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
self.listen_addresses.remove(e.addr);
match self.with_local_peer_id(e.addr.clone()) {
Ok(addr) => {
self.external_addresses.remove(&addr);
self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(addr));
},
Err(addr) => {
warn!(
target: LOG_TARGET,
"Listen address expired with peer ID that is not us: {addr}",
);
},
}
},
FromSwarm::NewExternalAddrCandidate(e) => {
self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
self.identify.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
},
FromSwarm::ExternalAddrConfirmed(e) => {
self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
},
FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => {
self.ping.on_swarm_event(FromSwarm::AddressChange(e));
self.identify.on_swarm_event(FromSwarm::AddressChange(e));
if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
*endpoint = new.clone();
} else {
error!(target: LOG_TARGET,
"Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
}
} else {
error!(target: LOG_TARGET,
"Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
}
},
event => {
debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
self.ping.on_swarm_event(event);
self.identify.on_swarm_event(event);
},
}
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
match event {
Either::Left(event) =>
self.ping.on_connection_handler_event(peer_id, connection_id, event),
Either::Right(event) =>
self.identify.on_connection_handler_event(peer_id, connection_id, event),
}
}
fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.pending_actions.pop_front() {
return Poll::Ready(event)
}
loop {
match self.ping.poll(cx) {
Poll::Pending => break,
Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
if let PingEvent { peer, result: Ok(rtt), connection } = ev {
self.handle_ping_report(&peer, rtt, connection)
}
},
Poll::Ready(event) => {
return Poll::Ready(event.map_in(Either::Left).map_out(|_| {
unreachable!("`GenerateEvent` is handled in a branch above; qed")
}));
},
}
}
loop {
match self.identify.poll(cx) {
Poll::Pending => break,
Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
IdentifyEvent::Received { peer_id, info, .. } => {
self.handle_identify_report(&peer_id, &info);
let event = PeerInfoEvent::Identified { peer_id, info };
return Poll::Ready(ToSwarm::GenerateEvent(event))
},
IdentifyEvent::Error { connection_id, peer_id, error } => {
debug!(
target: LOG_TARGET,
"Identification with peer {peer_id:?}({connection_id}) failed => {error}"
);
},
IdentifyEvent::Pushed { .. } => {},
IdentifyEvent::Sent { .. } => {},
},
Poll::Ready(event) => {
return Poll::Ready(event.map_in(Either::Right).map_out(|_| {
unreachable!("`GenerateEvent` is handled in a branch above; qed")
}));
},
}
}
while let Poll::Ready(Some(())) = self.garbage_collect.poll_next_unpin(cx) {
self.nodes_info.retain(|_, node| {
node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
});
}
Poll::Pending
}
}