use crate::{
config::{NetworkConfiguration, ProtocolId},
peer_store::PeerStoreProvider,
};
use array_bytes::bytes2hex;
use futures::{FutureExt, Stream};
use futures_timer::Delay;
use ip_network::IpNetwork;
use litep2p::{
protocol::{
libp2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
Record, RecordKey, RecordsType,
},
ping::{Config as PingConfig, PingEvent},
},
mdns::{Config as MdnsConfig, MdnsEvent},
},
types::multiaddr::{Multiaddr, Protocol},
PeerId, ProtocolName,
};
use parking_lot::RwLock;
use sc_network_types::kad::Key as KademliaKey;
use schnellru::{ByLength, LruMap};
use std::{
cmp,
collections::{HashMap, HashSet, VecDeque},
num::NonZeroUsize,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
/// Logging target used by every log statement in this file.
const LOG_TARGET: &str = "sub-libp2p::discovery";
/// Delay before the first random Kademlia `FIND_NODE` query, and between a successful query and
/// the next one.
const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);
/// Query interval passed to the mDNS configuration when mDNS is enabled.
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);
/// Quorum (`Quorum::N`) requested when fetching a record from the DHT.
const GET_RECORD_REDUNDANCY_FACTOR: usize = 4;
/// Capacity of the LRU map that tracks confirmations of observed external addresses.
const MAX_EXTERNAL_ADDRESSES: u32 = 32;
/// Number of distinct peers that must be recorded for an observed address before it is reported
/// as an external address (and before its eviction is reported as an expiry).
const MIN_ADDRESS_CONFIRMATIONS: usize = 2;
/// Events yielded by the [`Discovery`] stream.
#[derive(Debug)]
pub enum DiscoveryEvent {
/// A ping result was received from `peer`.
Ping {
/// Peer the ping result belongs to.
peer: PeerId,
/// Value reported by the ping protocol for this peer.
rtt: Duration,
},
/// A peer was identified through the identify protocol.
Identified {
/// Identified peer.
peer: PeerId,
/// Listen addresses reported by the peer.
listen_addresses: Vec<Multiaddr>,
/// Protocols the peer reported as supported.
supported_protocols: HashSet<ProtocolName>,
},
/// Addresses were discovered through mDNS.
Discovered {
/// Discovered addresses.
addresses: Vec<Multiaddr>,
},
/// Peers were reported by Kademlia, either as the result of a `FIND_NODE` query or as a
/// routing table update.
RoutingTableUpdate {
/// Reported peers.
peers: HashSet<PeerId>,
},
/// An observed address was accepted as an external address of the local node.
ExternalAddressDiscovered {
/// The accepted address.
address: Multiaddr,
},
/// A previously confirmed external address was evicted from the confirmation tracker.
ExternalAddressExpired {
/// The evicted address.
address: Multiaddr,
},
/// A `GET_RECORD` query succeeded.
GetRecordSuccess {
/// ID of the query.
query_id: QueryId,
/// Records returned by the query.
records: RecordsType,
},
/// A `PUT_RECORD` query succeeded.
PutRecordSuccess {
/// ID of the query.
query_id: QueryId,
},
/// A `GET_PROVIDERS` query succeeded.
GetProvidersSuccess {
/// ID of the query.
query_id: QueryId,
/// Providers returned by the query.
providers: Vec<ContentProvider>,
},
/// A query failed. Failures of the internal random `FIND_NODE` query are not reported.
QueryFailed {
/// ID of the failed query.
query_id: QueryId,
},
/// An incoming record was received from the Kademlia handle.
IncomingRecord {
/// The received record.
record: Record,
},
/// A random Kademlia `FIND_NODE` query was started.
RandomKademliaStarted,
}
/// Discovery state: combines ping, identify, mDNS and Kademlia into one event stream.
pub struct Discovery {
/// Local peer ID; observed addresses must end in this ID (or have no peer ID at all).
local_peer_id: litep2p::PeerId,
/// Stream of ping events.
ping_event_stream: Box<dyn Stream<Item = PingEvent> + Send + Unpin>,
/// Stream of identify events.
identify_event_stream: Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
/// Stream of mDNS events, `None` when mDNS is disabled.
mdns_event_stream: Option<Box<dyn Stream<Item = MdnsEvent> + Send + Unpin>>,
/// Handle used to issue Kademlia queries and to receive Kademlia events.
kademlia_handle: KademliaHandle,
/// Peer store handle; stored but not used by the code in this file.
_peerstore_handle: Arc<dyn PeerStoreProvider>,
/// Timer for the next random `FIND_NODE` query; `None` while a query is in flight.
next_kad_query: Option<Delay>,
/// ID of the in-flight random `FIND_NODE` query, used to recognise its failure.
find_node_query_id: Option<QueryId>,
/// Events queued for delivery before any sub-stream is polled again.
pending_events: VecDeque<DiscoveryEvent>,
/// Whether self-reported non-global addresses may be added to the DHT.
allow_non_global_addresses: bool,
/// Kademlia protocol names a remote peer must share with us for its self-reported addresses
/// to be accepted.
local_protocols: HashSet<ProtocolName>,
/// Public addresses taken from the network configuration; treated as known addresses.
public_addresses: HashSet<Multiaddr>,
/// Shared set of listen addresses; treated as known addresses.
listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
/// Observed external addresses mapped to the peers that reported them.
address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,
/// Back-off delay applied after a failed random `FIND_NODE` query; doubled on every failure
/// and capped at 60 seconds.
duration_to_next_find_query: Duration,
}
/// Build the legacy Kademlia protocol name, `/<protocol id>/kad`.
fn legacy_kademlia_protocol_name(id: &ProtocolId) -> ProtocolName {
    let name = format!("/{}/kad", id.as_ref());
    ProtocolName::from(name)
}
/// Build the Kademlia protocol name from the genesis hash and an optional fork ID.
///
/// The result is `/<genesis hash hex>/<fork id>/kad` when a fork ID is given and
/// `/<genesis hash hex>/kad` otherwise. The hash is hex-encoded without a prefix.
fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
    genesis_hash: Hash,
    fork_id: Option<&str>,
) -> ProtocolName {
    let genesis_hex = bytes2hex("", genesis_hash.as_ref());

    let name = match fork_id {
        Some(fork) => format!("/{genesis_hex}/{fork}/kad"),
        None => format!("/{genesis_hex}/kad"),
    };

    ProtocolName::from(name)
}
impl Discovery {
/// Create a new [`Discovery`] together with the protocol configurations it drives.
///
/// Returns the discovery object and the ping, identify and Kademlia configurations, plus the
/// mDNS configuration when mDNS is enabled in `config`.
///
/// # Panics
///
/// Panics if `config.transport` is not `TransportConfig::Normal`.
pub fn new<Hash: AsRef<[u8]> + Clone>(
    local_peer_id: litep2p::PeerId,
    config: &NetworkConfiguration,
    genesis_hash: Hash,
    fork_id: Option<&str>,
    protocol_id: &ProtocolId,
    known_peers: HashMap<PeerId, Vec<Multiaddr>>,
    listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
    _peerstore_handle: Arc<dyn PeerStoreProvider>,
) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
    let (ping_config, ping_event_stream) = PingConfig::default();

    let user_agent = format!("{} ({})", config.client_version, config.node_name);
    let (identify_config, identify_event_stream) =
        IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));

    // mDNS is optional and only available with the normal transport.
    let (mdns_config, mdns_event_stream) = match config.transport {
        crate::config::TransportConfig::Normal { enable_mdns: true, .. } => {
            let (mdns, events) = MdnsConfig::new(MDNS_QUERY_INTERVAL);
            (Some(mdns), Some(events))
        },
        crate::config::TransportConfig::Normal { enable_mdns: false, .. } => (None, None),
        _ => panic!("memory transport not supported"),
    };

    // Kademlia is served under both the genesis-hash-based and the legacy protocol name,
    // with incoming records validated manually.
    let protocol_names = vec![
        kademlia_protocol_name(genesis_hash.clone(), fork_id),
        legacy_kademlia_protocol_name(protocol_id),
    ];
    let (kademlia_config, kademlia_handle) = KademliaConfigBuilder::new()
        .with_known_peers(known_peers)
        .with_protocol_names(protocol_names)
        .with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual)
        .build();

    let public_addresses = config.public_addresses.iter().cloned().map(Into::into).collect();
    // Only the genesis-hash-based name is used when checking a remote peer's protocols.
    let local_protocols = HashSet::from_iter([kademlia_protocol_name(genesis_hash, fork_id)]);

    let discovery = Self {
        local_peer_id,
        ping_event_stream,
        identify_event_stream,
        mdns_event_stream,
        kademlia_handle,
        _peerstore_handle,
        listen_addresses,
        find_node_query_id: None,
        pending_events: VecDeque::new(),
        duration_to_next_find_query: Duration::from_secs(1),
        address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
        allow_non_global_addresses: config.allow_non_globals_in_dht,
        public_addresses,
        next_kad_query: Some(Delay::new(KADEMLIA_QUERY_INTERVAL)),
        local_protocols,
    };

    (discovery, ping_config, identify_config, kademlia_config, mdns_config)
}
/// Add `peer` with its `addresses` as a known peer by forwarding both to the Kademlia handle.
#[allow(unused)]
pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
self.kademlia_handle.add_known_peer(peer, addresses).await;
}
/// Add the self-reported `addresses` of `peer` to Kademlia.
///
/// Nothing is added when `supported_protocols` shares no protocol with the local Kademlia
/// protocols. Unless non-global addresses are allowed, addresses rejected by
/// [`Discovery::can_add_to_dht`] are dropped before the rest is handed to Kademlia.
pub async fn add_self_reported_address(
    &mut self,
    peer: PeerId,
    supported_protocols: HashSet<ProtocolName>,
    addresses: Vec<Multiaddr>,
) {
    let shares_protocol = !self.local_protocols.is_disjoint(&supported_protocols);
    if !shares_protocol {
        log::trace!(
            target: LOG_TARGET,
            "Ignoring self-reported address of peer {peer} as remote node is not part of the \
             Kademlia DHT supported by the local node.",
        );
        return
    }

    let allow_non_global = self.allow_non_global_addresses;
    let addresses: Vec<Multiaddr> = addresses
        .into_iter()
        .filter(|address| {
            let accepted = allow_non_global || Discovery::can_add_to_dht(address);
            if !accepted {
                log::trace!(
                    target: LOG_TARGET,
                    "ignoring self-reported non-global address {address} from {peer}."
                );
            }
            accepted
        })
        .collect();

    log::trace!(
        target: LOG_TARGET,
        "add self-reported addresses for {peer:?}: {addresses:?}",
    );

    self.kademlia_handle.add_known_peer(peer, addresses).await;
}
/// Start a `GET_RECORD` query for `key` and return the ID of the query.
pub async fn get_value(&mut self, key: KademliaKey) -> QueryId {
    let record_key = RecordKey::new(&key.to_vec());
    // The redundancy factor is a non-zero constant, so the conversion cannot fail.
    let quorum = Quorum::N(NonZeroUsize::new(GET_RECORD_REDUNDANCY_FACTOR).unwrap());

    self.kademlia_handle.get_record(record_key, quorum).await
}
/// Start a `PUT_RECORD` query storing `value` under `key` and return the ID of the query.
pub async fn put_value(&mut self, key: KademliaKey, value: Vec<u8>) -> QueryId {
    let record_key = RecordKey::new(&key.to_vec());
    let record = Record::new(record_key, value);

    self.kademlia_handle.put_record(record).await
}
/// Put `record` to the given `peers` and return the ID of the query.
///
/// `update_local_storage` is forwarded unchanged to the Kademlia handle.
pub async fn put_value_to_peers(
    &mut self,
    record: Record,
    peers: Vec<sc_network_types::PeerId>,
    update_local_storage: bool,
) -> QueryId {
    // Convert the peer IDs into the type the Kademlia handle expects.
    let peers = peers.into_iter().map(Into::into).collect();

    self.kademlia_handle
        .put_record_to_peers(record, peers, update_local_storage)
        .await
}
/// Hand a record built from `key`, `value`, `publisher` and `expires` to the Kademlia handle
/// for storing.
pub async fn store_record(
    &mut self,
    key: KademliaKey,
    value: Vec<u8>,
    publisher: Option<sc_network_types::PeerId>,
    expires: Option<Instant>,
) {
    log::debug!(
        target: LOG_TARGET,
        "Storing DHT record with key {key:?}, originally published by {publisher:?}, \
         expires {expires:?}.",
    );

    let record = Record {
        key: RecordKey::new(&key.to_vec()),
        value,
        publisher: publisher.map(Into::into),
        expires,
    };

    self.kademlia_handle.store_record(record).await;
}
/// Start providing `key` by forwarding the request to the Kademlia handle.
pub async fn start_providing(&mut self, key: KademliaKey) {
self.kademlia_handle.start_providing(key.into()).await;
}
/// Stop providing `key` by forwarding the request to the Kademlia handle.
pub async fn stop_providing(&mut self, key: KademliaKey) {
self.kademlia_handle.stop_providing(key.into()).await;
}
/// Start a `GET_PROVIDERS` query for `key` and return the ID of the query.
pub async fn get_providers(&mut self, key: KademliaKey) -> QueryId {
self.kademlia_handle.get_providers(key.into()).await
}
/// Check whether `observed` matches the `known` address.
///
/// The addresses are compared component by component. They match when all components are
/// equal, where either address may carry one extra `/p2p/...` component at the point where
/// the other one ends.
fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool {
    let mut known_parts = known.iter();
    let mut observed_parts = observed.iter();

    loop {
        let pair = (known_parts.next(), observed_parts.next());

        match pair {
            // Both exhausted, or one side only has a peer ID left: the addresses match.
            (None, None) |
            (None, Some(Protocol::P2p(_))) |
            (Some(Protocol::P2p(_)), None) => return true,
            // Any other difference (including differing lengths) is a mismatch.
            (lhs, rhs) =>
                if lhs != rhs {
                    return false
                },
        }
    }
}
/// Check whether `address` may be added to the DHT.
///
/// Only the first component is inspected: DNS-based addresses are always accepted, IP-based
/// addresses are accepted when the IP is global, anything else is rejected.
fn can_add_to_dht(address: &Multiaddr) -> bool {
    match address.iter().next() {
        Some(Protocol::Ip4(ip)) => IpNetwork::from(ip).is_global(),
        Some(Protocol::Ip6(ip)) => IpNetwork::from(ip).is_global(),
        Some(Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_)) => true,
        _ => false,
    }
}
fn is_new_external_address(
&mut self,
address: &Multiaddr,
peer: PeerId,
) -> (bool, Option<Multiaddr>) {
log::trace!(target: LOG_TARGET, "verify new external address: {address}");
if self
.listen_addresses
.read()
.iter()
.chain(self.public_addresses.iter())
.any(|known_address| Discovery::is_known_address(&known_address, &address))
{
return (true, None)
}
match self.address_confirmations.get(address) {
Some(confirmations) => {
confirmations.insert(peer);
if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
return (true, None)
}
},
None => {
let oldest = (self.address_confirmations.len() >=
self.address_confirmations.limiter().max_length() as usize)
.then(|| {
self.address_confirmations.pop_oldest().map(|(address, peers)| {
if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
return Some(address)
} else {
None
}
})
})
.flatten()
.flatten();
self.address_confirmations.insert(address.clone(), Default::default());
return (false, oldest)
},
}
(false, None)
}
}
impl Stream for Discovery {
    type Item = DiscoveryEvent;

    /// Poll discovery for the next event.
    ///
    /// Sources are served in a fixed order: queued events, the random-walk timer, Kademlia,
    /// identify, ping and finally mDNS (if enabled). The stream ends as soon as any of the
    /// polled sub-streams ends.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Events queued by an earlier poll are delivered before anything else.
        if let Some(queued) = this.pending_events.pop_front() {
            return Poll::Ready(Some(queued))
        }

        // Start a random `FIND_NODE` query once the timer fires.
        if let Some(mut timer) = this.next_kad_query.take() {
            if timer.poll_unpin(cx).is_pending() {
                this.next_kad_query = Some(timer);
            } else {
                let peer = PeerId::random();

                log::trace!(target: LOG_TARGET, "start next kademlia query for {peer:?}");

                if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) {
                    this.find_node_query_id = Some(query_id);
                    return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
                }

                // The query could not be started: back off exponentially, capped at 60s.
                // NOTE(review): the fresh delay is not polled here, so its waker is only
                // registered on the next poll — confirm this is intended.
                this.duration_to_next_find_query =
                    cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60));
                this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));
            }
        }

        if let Poll::Ready(event) = Pin::new(&mut this.kademlia_handle).poll_next(cx) {
            match event {
                None => return Poll::Ready(None),
                Some(KademliaEvent::FindNodeSuccess { peers, .. }) => {
                    log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers", peers.len());

                    // Schedule the next random walk.
                    this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL));

                    let peers = peers.into_iter().map(|(peer, _)| peer).collect();
                    return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate { peers }))
                },
                Some(KademliaEvent::RoutingTableUpdate { peers }) => {
                    log::trace!(target: LOG_TARGET, "routing table update, discovered {} peers", peers.len());

                    let peers = peers.into_iter().collect();
                    return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate { peers }))
                },
                Some(KademliaEvent::GetRecordSuccess { query_id, records }) => {
                    log::trace!(
                        target: LOG_TARGET,
                        "`GET_RECORD` succeeded for {query_id:?}: {records:?}",
                    );

                    return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess {
                        query_id,
                        records,
                    }))
                },
                Some(KademliaEvent::PutRecordSuccess { query_id, key: _ }) =>
                    return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
                Some(KademliaEvent::QueryFailed { query_id }) => {
                    if this.find_node_query_id == Some(query_id) {
                        // Our own random walk failed: back off and retry later instead of
                        // reporting the failure.
                        this.find_node_query_id = None;
                        this.duration_to_next_find_query = cmp::min(
                            this.duration_to_next_find_query * 2,
                            Duration::from_secs(60),
                        );
                        this.next_kad_query =
                            Some(Delay::new(this.duration_to_next_find_query));
                    } else {
                        return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id }))
                    }
                },
                Some(KademliaEvent::IncomingRecord { record }) => {
                    log::trace!(
                        target: LOG_TARGET,
                        "incoming `PUT_RECORD` request with key {:?} from publisher {:?}",
                        record.key,
                        record.publisher,
                    );

                    return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
                },
                Some(KademliaEvent::GetProvidersSuccess { provided_key, providers, query_id }) => {
                    log::trace!(
                        target: LOG_TARGET,
                        "`GET_PROVIDERS` for {query_id:?} with {provided_key:?} yielded {providers:?}",
                    );

                    return Poll::Ready(Some(DiscoveryEvent::GetProvidersSuccess {
                        query_id,
                        providers,
                    }))
                },
                // Incoming provider announcements are ignored.
                Some(KademliaEvent::IncomingProvider { .. }) => {},
            }
        }

        if let Poll::Ready(event) = Pin::new(&mut this.identify_event_stream).poll_next(cx) {
            match event {
                None => return Poll::Ready(None),
                Some(IdentifyEvent::PeerIdentified {
                    peer,
                    listen_addresses,
                    supported_protocols,
                    observed_address,
                    ..
                }) => {
                    // Make sure the observed address ends in our own peer ID: append it when
                    // missing and discard the address when it names a different peer.
                    let own_address =
                        if let Some(Protocol::P2p(peer_id)) = observed_address.iter().last() {
                            if peer_id == *this.local_peer_id.as_ref() {
                                Some(observed_address)
                            } else {
                                log::warn!(
                                    target: LOG_TARGET,
                                    "Discovered external address for a peer that is not us: {observed_address}",
                                );
                                None
                            }
                        } else {
                            Some(observed_address.with(Protocol::P2p(this.local_peer_id.into())))
                        };

                    // Address events are queued so they are delivered after `Identified`.
                    if let Some(observed_address) = own_address {
                        let (is_new, expired_address) =
                            this.is_new_external_address(&observed_address, peer);

                        if let Some(expired_address) = expired_address {
                            log::trace!(
                                target: LOG_TARGET,
                                "Removing expired external address expired={expired_address} is_new={is_new} observed={observed_address}",
                            );

                            this.pending_events.push_back(
                                DiscoveryEvent::ExternalAddressExpired { address: expired_address },
                            );
                        }

                        if is_new {
                            this.pending_events.push_back(
                                DiscoveryEvent::ExternalAddressDiscovered {
                                    address: observed_address,
                                },
                            );
                        }
                    }

                    return Poll::Ready(Some(DiscoveryEvent::Identified {
                        peer,
                        listen_addresses,
                        supported_protocols,
                    }))
                },
            }
        }

        if let Poll::Ready(event) = Pin::new(&mut this.ping_event_stream).poll_next(cx) {
            return Poll::Ready(
                event.map(|PingEvent::Ping { peer, ping }| DiscoveryEvent::Ping { peer, rtt: ping }),
            )
        }

        if let Some(mdns_events) = this.mdns_event_stream.as_mut() {
            if let Poll::Ready(event) = Pin::new(mdns_events).poll_next(cx) {
                return Poll::Ready(event.map(|MdnsEvent::Discovered(addresses)| {
                    DiscoveryEvent::Discovered { addresses }
                }))
            }
        }

        Poll::Pending
    }
}