use crate::{
behaviour::{self, Behaviour, BehaviourOut},
bitswap::BitswapRequestHandler,
config::{
parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
},
discovery::DiscoveryConfig,
error::Error,
event::{DhtEvent, Event},
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
peer_store::{PeerStore, PeerStoreProvider},
protocol::{self, NotifsHandlerError, Protocol, Ready},
protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
service::{
signature::{Signature, SigningError},
traits::{
BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
},
},
transport,
types::ProtocolName,
NotificationService, ReputationChange,
};
use codec::DecodeAll;
use either::Either;
use futures::{channel::oneshot, prelude::*};
#[allow(deprecated)]
use libp2p::swarm::THandlerErr;
use libp2p::{
connection_limits::{ConnectionLimits, Exceeded},
core::{upgrade, ConnectedPoint, Endpoint},
identify::Info as IdentifyInfo,
identity::ed25519,
kad::{record::Key as KademliaKey, Record},
multiaddr::{self, Multiaddr},
swarm::{
Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
NetworkBehaviour, Swarm, SwarmEvent,
},
PeerId,
};
use log::{debug, error, info, trace, warn};
use metrics::{Histogram, MetricSources, Metrics};
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use sc_client_api::BlockBackend;
use sc_network_common::{
role::{ObservedRole, Roles},
ExHashT,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT;
pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
pub use metrics::NotificationMetrics;
pub use protocol::NotificationsSink;
use std::{
cmp,
collections::{HashMap, HashSet},
fs, iter,
marker::PhantomData,
num::NonZeroUsize,
pin::Pin,
str,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
pub(crate) mod metrics;
pub(crate) mod out_events;
pub mod signature;
pub mod traits;
/// Adapter that exposes the libp2p transport's bandwidth counters through the
/// [`BandwidthSink`] trait.
struct Libp2pBandwidthSink {
    /// Shared counters produced alongside the transport by `transport::build_transport`.
    sink: Arc<transport::BandwidthSinks>,
}
impl BandwidthSink for Libp2pBandwidthSink {
fn total_inbound(&self) -> u64 {
self.sink.total_inbound()
}
fn total_outbound(&self) -> u64 {
self.sink.total_outbound()
}
}
/// Cheaply shareable handle to the network.
///
/// The handle itself owns no sockets: most operations are turned into a
/// `ServiceToWorkerMsg` and pushed to the `NetworkWorker` through `to_worker`, or are forwarded
/// to one of the protocol controller handles.
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
    /// Peer count published by the worker after every processed action and read by
    /// `sync_num_connected`.
    num_connected: Arc<AtomicUsize>,
    /// External addresses of the local node; shared with the network behaviour.
    external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
    /// Addresses the local node listens on; the same set is held by the worker.
    listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
    /// `PeerId` of the local node.
    local_peer_id: PeerId,
    /// Key pair of the local node, used by `sign_with_local_identity`.
    local_identity: Keypair,
    /// Source of the total inbound/outbound byte counters.
    bandwidth: Arc<dyn BandwidthSink>,
    /// Channel carrying requests to the `NetworkWorker`.
    to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
    /// Protocol name -> `SetId` mapping, filled once in `NetworkWorker::new`.
    notification_protocol_ids: HashMap<ProtocolName, SetId>,
    /// One protocol controller handle per peer set, indexed by `usize::from(SetId)`.
    protocol_handles: Vec<protocol_controller::ProtocolHandle>,
    /// Clone of `protocol_handles[0]`, the handle of the default (sync) peer set.
    sync_protocol_handle: protocol_controller::ProtocolHandle,
    /// Handle used for reputation reports and peer role lookups.
    peer_store_handle: Arc<dyn PeerStoreProvider>,
    /// Zero-sized marker binding the otherwise unused `H` parameter.
    _marker: PhantomData<H>,
    /// Zero-sized marker binding the otherwise unused `B` parameter.
    _block: PhantomData<B>,
}
#[async_trait::async_trait]
impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
where
B: BlockT + 'static,
H: ExHashT,
{
type NotificationProtocolConfig = NonDefaultSetConfig;
type RequestResponseProtocolConfig = RequestResponseConfig;
type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
type PeerStore = PeerStore;
type BitswapConfig = RequestResponseConfig;
fn new(params: Params<B, H, Self>) -> Result<Self, Error>
where
Self: Sized,
{
NetworkWorker::new(params)
}
fn network_service(&self) -> Arc<dyn NetworkServiceT> {
self.service.clone()
}
fn peer_store(
bootnodes: Vec<sc_network_types::PeerId>,
metrics_registry: Option<Registry>,
) -> Self::PeerStore {
PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
}
fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
NotificationMetrics::new(registry)
}
fn bitswap_server(
client: Arc<dyn BlockBackend<B> + Send + Sync>,
) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
(Box::pin(async move { handler.run().await }), protocol_config)
}
fn notification_config(
protocol_name: ProtocolName,
fallback_names: Vec<ProtocolName>,
max_notification_size: u64,
handshake: Option<NotificationHandshake>,
set_config: SetConfig,
_metrics: NotificationMetrics,
_peerstore_handle: Arc<dyn PeerStoreProvider>,
) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
NonDefaultSetConfig::new(
protocol_name,
fallback_names,
max_notification_size,
handshake,
set_config,
)
}
fn request_response_config(
protocol_name: ProtocolName,
fallback_names: Vec<ProtocolName>,
max_request_size: u64,
max_response_size: u64,
request_timeout: Duration,
inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
) -> Self::RequestResponseProtocolConfig {
Self::RequestResponseProtocolConfig {
name: protocol_name,
fallback_names,
max_request_size,
max_response_size,
request_timeout,
inbound_queue,
}
}
async fn run(mut self) {
self.run().await
}
}
impl<B, H> NetworkWorker<B, H>
where
B: BlockT + 'static,
H: ExHashT,
{
pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
let peer_store_handle = params.network_config.peer_store_handle();
let FullNetworkConfiguration {
notification_protocols,
request_response_protocols,
mut network_config,
..
} = params.network_config;
let local_identity = network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
let local_peer_id = local_public.to_peer_id();
let local_identity: ed25519::Keypair = local_identity.into();
let local_public: ed25519::PublicKey = local_public.into();
let local_peer_id: PeerId = local_peer_id.into();
network_config.boot_nodes = network_config
.boot_nodes
.into_iter()
.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
.collect();
network_config.default_peers_set.reserved_nodes = network_config
.default_peers_set
.reserved_nodes
.into_iter()
.filter(|reserved_node| {
if reserved_node.peer_id == local_peer_id.into() {
warn!(
target: "sub-libp2p",
"Local peer ID used in reserved node, ignoring: {}",
reserved_node,
);
false
} else {
true
}
})
.collect();
ensure_addresses_consistent_with_transport(
network_config.listen_addresses.iter(),
&network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
network_config.boot_nodes.iter().map(|x| &x.multiaddr),
&network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
&network_config.transport,
)?;
for notification_protocol in ¬ification_protocols {
ensure_addresses_consistent_with_transport(
notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
&network_config.transport,
)?;
}
ensure_addresses_consistent_with_transport(
network_config.public_addresses.iter(),
&network_config.transport,
)?;
let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);
if let Some(path) = &network_config.net_config_path {
fs::create_dir_all(path)?;
}
info!(
target: "sub-libp2p",
"🏷 Local node identity is: {}",
local_peer_id.to_base58(),
);
log::info!(target: "sub-libp2p", "Running libp2p network backend");
let (transport, bandwidth) = {
let config_mem = match network_config.transport {
TransportConfig::MemoryOnly => true,
TransportConfig::Normal { .. } => false,
};
let yamux_maximum_buffer_size = {
let requests_max = request_response_protocols
.iter()
.map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::MAX));
let responses_max = request_response_protocols
.iter()
.map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::MAX));
let notifs_max = notification_protocols
.iter()
.map(|cfg| usize::try_from(cfg.max_notification_size()).unwrap_or(usize::MAX));
let default_max = cmp::max(
1024 * 1024,
usize::try_from(protocol::BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE)
.unwrap_or(usize::MAX),
);
iter::once(default_max)
.chain(requests_max)
.chain(responses_max)
.chain(notifs_max)
.max()
.expect("iterator known to always yield at least one element; qed")
.saturating_add(10)
};
transport::build_transport(
local_identity.clone().into(),
config_mem,
network_config.yamux_window_size,
yamux_maximum_buffer_size,
)
};
let (to_notifications, from_protocol_controllers) =
tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);
let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
.chain(notification_protocols.iter().map(|protocol| protocol.set_config()));
let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
.enumerate()
.map(|(set_id, set_config)| {
let proto_set_config = ProtoSetConfig {
in_peers: set_config.in_peers,
out_peers: set_config.out_peers,
reserved_nodes: set_config
.reserved_nodes
.iter()
.map(|node| node.peer_id.into())
.collect(),
reserved_only: set_config.non_reserved_mode.is_reserved_only(),
};
ProtocolController::new(
SetId::from(set_id),
proto_set_config,
to_notifications.clone(),
Arc::clone(&peer_store_handle),
)
})
.unzip();
let sync_protocol_handle = protocol_handles[0].clone();
protocol_controllers
.into_iter()
.for_each(|controller| (params.executor)(controller.run().boxed()));
let notification_protocol_ids: HashMap<ProtocolName, SetId> =
iter::once(¶ms.block_announce_config)
.chain(notification_protocols.iter())
.enumerate()
.map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
.collect();
let known_addresses = {
let mut addresses: Vec<_> = network_config
.default_peers_set
.reserved_nodes
.iter()
.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
.chain(notification_protocols.iter().flat_map(|protocol| {
protocol
.set_config()
.reserved_nodes
.iter()
.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
}))
.chain(
network_config
.boot_nodes
.iter()
.map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
)
.collect();
addresses.sort();
addresses.dedup();
addresses
};
network_config.boot_nodes.iter().try_for_each(|bootnode| {
if let Some(other) = network_config
.boot_nodes
.iter()
.filter(|o| o.multiaddr == bootnode.multiaddr)
.find(|o| o.peer_id != bootnode.peer_id)
{
Err(Error::DuplicateBootnode {
address: bootnode.multiaddr.clone().into(),
first_id: bootnode.peer_id.into(),
second_id: other.peer_id.into(),
})
} else {
Ok(())
}
})?;
let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();
for bootnode in network_config.boot_nodes.iter() {
boot_node_ids
.entry(bootnode.peer_id.into())
.or_default()
.push(bootnode.multiaddr.clone().into());
}
let boot_node_ids = Arc::new(boot_node_ids);
let num_connected = Arc::new(AtomicUsize::new(0));
let external_addresses = Arc::new(Mutex::new(HashSet::new()));
let (protocol, notif_protocol_handles) = Protocol::new(
From::from(¶ms.role),
params.notification_metrics,
notification_protocols,
params.block_announce_config,
Arc::clone(&peer_store_handle),
protocol_handles.clone(),
from_protocol_controllers,
)?;
let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
let user_agent =
format!("{} ({})", network_config.client_version, network_config.node_name);
let discovery_config = {
let mut config = DiscoveryConfig::new(local_peer_id);
config.with_permanent_addresses(
known_addresses
.iter()
.map(|(peer, address)| (peer.into(), address.clone().into()))
.collect::<Vec<_>>(),
);
config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
config.with_kademlia(
params.genesis_hash,
params.fork_id.as_deref(),
¶ms.protocol_id,
);
config.with_dht_random_walk(network_config.enable_dht_random_walk);
config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
config.use_kademlia_disjoint_query_paths(
network_config.kademlia_disjoint_query_paths,
);
config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);
match network_config.transport {
TransportConfig::MemoryOnly => {
config.with_mdns(false);
config.allow_private_ip(false);
},
TransportConfig::Normal {
enable_mdns,
allow_private_ip: allow_private_ipv4,
..
} => {
config.with_mdns(enable_mdns);
config.allow_private_ip(allow_private_ipv4);
},
}
config
};
let behaviour = {
let result = Behaviour::new(
protocol,
user_agent,
local_public.into(),
discovery_config,
request_response_protocols,
Arc::clone(&peer_store_handle),
external_addresses.clone(),
ConnectionLimits::default()
.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
.with_max_established_incoming(Some(
crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
)),
);
match result {
Ok(b) => b,
Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) =>
return Err(Error::DuplicateRequestResponseProtocol { protocol: proto }),
}
};
let swarm = {
struct SpawnImpl<F>(F);
impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
(self.0)(f)
}
}
let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
.with_substream_upgrade_protocol_override(upgrade::Version::V1)
.with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
.with_per_connection_event_buffer_size(24)
.with_max_negotiating_inbound_streams(2048)
.with_idle_connection_timeout(Duration::from_secs(10));
Swarm::new(transport, behaviour, local_peer_id, config)
};
(swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
};
let metrics = match ¶ms.metrics_registry {
Some(registry) => Some(metrics::register(
registry,
MetricSources {
bandwidth: bandwidth.clone(),
connected_peers: num_connected.clone(),
},
)?),
None => None,
};
for addr in &network_config.listen_addresses {
if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
}
}
for addr in &network_config.public_addresses {
Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
}
let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));
let service = Arc::new(NetworkService {
bandwidth,
external_addresses,
listen_addresses: listen_addresses_set.clone(),
num_connected: num_connected.clone(),
local_peer_id,
local_identity: local_identity.into(),
to_worker,
notification_protocol_ids,
protocol_handles,
sync_protocol_handle,
peer_store_handle: Arc::clone(&peer_store_handle),
_marker: PhantomData,
_block: Default::default(),
});
Ok(NetworkWorker {
listen_addresses: listen_addresses_set,
num_connected,
network_service: swarm,
service,
from_service,
event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
metrics,
boot_node_ids,
reported_invalid_boot_nodes: Default::default(),
peer_store_handle: Arc::clone(&peer_store_handle),
notif_protocol_handles,
_marker: Default::default(),
_block: Default::default(),
})
}
/// Summarises the current peer count and the total traffic counters.
pub fn status(&self) -> NetworkStatus {
    let num_connected_peers = self.num_connected_peers();
    let total_bytes_inbound = self.total_bytes_inbound();
    let total_bytes_outbound = self.total_bytes_outbound();

    NetworkStatus { num_connected_peers, total_bytes_inbound, total_bytes_outbound }
}
/// Inbound byte total as reported by the service's bandwidth sink.
pub fn total_bytes_inbound(&self) -> u64 {
    let sink = &self.service.bandwidth;
    sink.total_inbound()
}
/// Outbound byte total as reported by the service's bandwidth sink.
pub fn total_bytes_outbound(&self) -> u64 {
    let sink = &self.service.bandwidth;
    sink.total_outbound()
}
/// Number of sync peers currently reported by the user protocol.
pub fn num_connected_peers(&self) -> usize {
    let behaviour = self.network_service.behaviour();
    behaviour.user_protocol().num_sync_peers()
}
/// Tells the network behaviour that `peer_id` may be reachable at `addr`.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
    let behaviour = self.network_service.behaviour_mut();
    behaviour.add_known_address(peer_id, addr);
}
pub fn service(&self) -> &Arc<NetworkService<B, H>> {
&self.service
}
pub fn local_peer_id(&self) -> &PeerId {
Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
}
pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
Swarm::<Behaviour<B>>::listeners(&self.network_service)
}
/// Collects a snapshot of the network for diagnostic purposes.
///
/// The snapshot holds the local peer ID, the listened and external addresses, one entry per
/// peer the user protocol reports as open, and one entry per other known peer. The `peerset`
/// field is a fixed "Unimplemented" placeholder.
///
/// Takes `&mut self` because the behaviour is queried through its mutable accessors.
pub fn network_state(&mut self) -> NetworkState {
    let swarm = &mut self.network_service;
    // Copied into a `Vec` so that `swarm` can be borrowed again by the closures below.
    let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
    let connected_peers = {
        let swarm = &mut *swarm;
        open.iter()
            .filter_map(move |peer_id| {
                // The addresses known for a peer are obtained by asking the behaviour what
                // it would dial for it.
                // NOTE(review): connection ID 0 looks like a placeholder, no connection is
                // opened here — confirm.
                let known_addresses = if let Ok(addrs) =
                    NetworkBehaviour::handle_pending_outbound_connection(
                        swarm.behaviour_mut(),
                        ConnectionId::new_unchecked(0), Some(*peer_id),
                        &vec![],
                        Endpoint::Listener,
                    ) {
                    addrs.into_iter().collect()
                } else {
                    // Peers whose addresses cannot be resolved are left out of the snapshot.
                    error!(target: "sub-libp2p", "Was not able to get known addresses for {:?}", peer_id);
                    return None
                };

                // An open peer without endpoint information is logged and left out as well.
                let endpoint = if let Some(e) =
                    swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
                {
                    e.clone().into()
                } else {
                    error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \
and debug information about {:?}", peer_id);
                    return None
                };

                Some((
                    peer_id.to_base58(),
                    NetworkStatePeer {
                        endpoint,
                        version_string: swarm
                            .behaviour_mut()
                            .node(peer_id)
                            .and_then(|i| i.client_version().map(|s| s.to_owned())),
                        latest_ping_time: swarm
                            .behaviour_mut()
                            .node(peer_id)
                            .and_then(|i| i.latest_ping()),
                        known_addresses,
                    },
                ))
            })
            .collect()
    };

    let not_connected_peers = {
        let swarm = &mut *swarm;
        swarm
            .behaviour_mut()
            .known_peers()
            .into_iter()
            // Keep only the known peers that are not in the `open` list.
            .filter(|p| open.iter().all(|n| n != p))
            .map(move |peer_id| {
                // Unlike above, a failed lookup keeps the peer and reports no addresses.
                let known_addresses = if let Ok(addrs) =
                    NetworkBehaviour::handle_pending_outbound_connection(
                        swarm.behaviour_mut(),
                        ConnectionId::new_unchecked(0), Some(peer_id),
                        &vec![],
                        Endpoint::Listener,
                    ) {
                    addrs.into_iter().collect()
                } else {
                    error!(target: "sub-libp2p", "Was not able to get known addresses for {:?}", peer_id);
                    Default::default()
                };

                (
                    peer_id.to_base58(),
                    NetworkStateNotConnectedPeer {
                        version_string: swarm
                            .behaviour_mut()
                            .node(&peer_id)
                            .and_then(|i| i.client_version().map(|s| s.to_owned())),
                        latest_ping_time: swarm
                            .behaviour_mut()
                            .node(&peer_id)
                            .and_then(|i| i.latest_ping()),
                        known_addresses,
                    },
                )
            })
            .collect()
    };

    let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
    let listened_addresses = swarm.listeners().cloned().collect();
    let external_addresses = swarm.external_addresses().cloned().collect();

    NetworkState {
        peer_id,
        listened_addresses,
        external_addresses,
        connected_peers,
        not_connected_peers,
        peerset: serde_json::json!(
            "Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
        ),
    }
}
/// Removes `peer` from the reserved set by forwarding to the service handle.
pub fn remove_reserved_peer(&self, peer: PeerId) {
    let peer = peer.into();
    self.service.remove_reserved_peer(peer);
}
/// Adds `peer` as a reserved peer by forwarding to the service handle.
///
/// # Errors
///
/// Returns whatever error the service handle reports for this peer.
pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
    let service = &self.service;
    service.add_reserved_peer(peer)
}
}
impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
    /// Asks the worker for a snapshot of the network state.
    ///
    /// Returns `Err(())` when the worker dropped the reply channel or answered with an error.
    pub async fn network_state(&self) -> Result<NetworkState, ()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let query = ServiceToWorkerMsg::NetworkState { pending_response: reply_tx };
        // A failed send drops `reply_tx`, which surfaces below as a closed channel.
        let _ = self.to_worker.unbounded_send(query);

        let reply = reply_rx.await.map_err(|_| ())?;
        reply.map_err(|_| ())
    }

    /// Splits every address into its trailing `/p2p/<peer id>` component and the remainder.
    ///
    /// # Errors
    ///
    /// Stops at the first address that does not end with a peer ID, or whose peer ID is the
    /// one of the local node.
    fn split_multiaddr_and_peer_id(
        &self,
        peers: HashSet<Multiaddr>,
    ) -> Result<Vec<(PeerId, Multiaddr)>, String> {
        let mut split = Vec::with_capacity(peers.len());

        for mut addr in peers {
            let Some(multiaddr::Protocol::P2p(peer)) = addr.pop() else {
                return Err("Missing PeerId from address".to_string())
            };

            // The local node must never end up in a peer set.
            if peer == self.local_peer_id {
                return Err("Local peer ID in peer set.".to_string())
            }

            split.push((peer, addr));
        }

        Ok(split)
    }
}
/// Read-only information about the local node, converted to the `sc_network_types` types.
impl<B, H> NetworkStateInfo for NetworkService<B, H>
where
    B: sp_runtime::traits::Block,
    H: ExHashT,
{
    /// Copy of the current set of external addresses.
    fn external_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
        let addresses = self.external_addresses.lock();
        addresses.iter().map(|address| address.clone().into()).collect()
    }

    /// Copy of the current set of listen addresses.
    fn listen_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
        let addresses = self.listen_addresses.lock();
        addresses.iter().map(|address| address.clone().into()).collect()
    }

    /// Peer ID of the local node.
    fn local_peer_id(&self) -> sc_network_types::PeerId {
        let local = self.local_peer_id;
        local.into()
    }
}
impl<B, H> NetworkSigner for NetworkService<B, H>
where
B: sp_runtime::traits::Block,
H: ExHashT,
{
fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
let public_key = self.local_identity.public();
let bytes = self.local_identity.sign(msg.as_ref())?;
Ok(Signature {
public_key: crate::service::signature::PublicKey::Libp2p(public_key),
bytes,
})
}
fn verify(
&self,
peer_id: sc_network_types::PeerId,
public_key: &Vec<u8>,
signature: &Vec<u8>,
message: &Vec<u8>,
) -> Result<bool, String> {
let public_key =
PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
let peer_id: PeerId = peer_id.into();
let remote: libp2p::PeerId = public_key.to_peer_id();
Ok(peer_id == remote && public_key.verify(message, signature))
}
}
/// DHT operations. Each one is only queued for the worker; a failed send is ignored.
impl<B, H> NetworkDHTProvider for NetworkService<B, H>
where
    B: BlockT + 'static,
    H: ExHashT,
{
    /// Queues a lookup of `key`.
    fn get_value(&self, key: &KademliaKey) {
        let request = ServiceToWorkerMsg::GetValue(key.clone());
        let _ = self.to_worker.unbounded_send(request);
    }

    /// Queues the publication of `value` under `key`.
    fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
        let request = ServiceToWorkerMsg::PutValue(key, value);
        let _ = self.to_worker.unbounded_send(request);
    }

    /// Queues sending `record` to `peers`; `update_local_storage` is passed through as is.
    fn put_record_to(
        &self,
        record: Record,
        peers: HashSet<sc_network_types::PeerId>,
        update_local_storage: bool,
    ) {
        let request = ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage };
        let _ = self.to_worker.unbounded_send(request);
    }

    /// Queues storing a record with the given key, value, optional publisher and optional
    /// expiry.
    fn store_record(
        &self,
        key: KademliaKey,
        value: Vec<u8>,
        publisher: Option<sc_network_types::PeerId>,
        expires: Option<Instant>,
    ) {
        let publisher = publisher.map(Into::into);
        let request = ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires);
        let _ = self.to_worker.unbounded_send(request);
    }
}
/// Status queries answered by the worker over a one-shot channel.
#[async_trait::async_trait]
impl<B, H> NetworkStatusProvider for NetworkService<B, H>
where
    B: BlockT + 'static,
    H: ExHashT,
{
    /// Asks the worker for the current [`NetworkStatus`].
    ///
    /// Returns `Err(())` when the reply channel was dropped or the worker answered with an
    /// error.
    async fn status(&self) -> Result<NetworkStatus, ()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let query = ServiceToWorkerMsg::NetworkStatus { pending_response: reply_tx };
        let _ = self.to_worker.unbounded_send(query);

        reply_rx.await.map_err(|_| ()).and_then(|reply| reply.map_err(|_| ()))
    }

    /// Asks the worker for a [`NetworkState`] snapshot.
    ///
    /// Returns `Err(())` when the reply channel was dropped or the worker answered with an
    /// error.
    async fn network_state(&self) -> Result<NetworkState, ()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let query = ServiceToWorkerMsg::NetworkState { pending_response: reply_tx };
        let _ = self.to_worker.unbounded_send(query);

        reply_rx.await.map_err(|_| ()).and_then(|reply| reply.map_err(|_| ()))
    }
}
#[async_trait::async_trait]
impl<B, H> NetworkPeers for NetworkService<B, H>
where
B: BlockT + 'static,
H: ExHashT,
{
fn set_authorized_peers(&self, peers: HashSet<sc_network_types::PeerId>) {
self.sync_protocol_handle
.set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
}
fn set_authorized_only(&self, reserved_only: bool) {
self.sync_protocol_handle.set_reserved_only(reserved_only);
}
fn add_known_address(
&self,
peer_id: sc_network_types::PeerId,
addr: sc_network_types::multiaddr::Multiaddr,
) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
}
fn report_peer(&self, peer_id: sc_network_types::PeerId, cost_benefit: ReputationChange) {
self.peer_store_handle.report_peer(peer_id, cost_benefit);
}
fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
self.peer_store_handle.peer_reputation(peer_id)
}
fn disconnect_peer(&self, peer_id: sc_network_types::PeerId, protocol: ProtocolName) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
}
fn accept_unreserved_peers(&self) {
self.sync_protocol_handle.set_reserved_only(false);
}
fn deny_unreserved_peers(&self) {
self.sync_protocol_handle.set_reserved_only(true);
}
fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
if peer.peer_id == self.local_peer_id.into() {
return Err("Local peer ID cannot be added as a reserved peer.".to_string())
}
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
peer.peer_id.into(),
peer.multiaddr.into(),
));
self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());
Ok(())
}
fn remove_reserved_peer(&self, peer_id: sc_network_types::PeerId) {
self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
}
fn set_reserved_peers(
&self,
protocol: ProtocolName,
peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
) -> Result<(), String> {
let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol))
};
let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;
let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());
for (peer_id, addr) in peers_addrs.into_iter() {
if peer_id == self.local_peer_id {
return Err("Local peer ID cannot be added as a reserved peer.".to_string())
}
peers.insert(peer_id.into());
if !addr.is_empty() {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
}
}
self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);
Ok(())
}
fn add_peers_to_reserved_set(
&self,
protocol: ProtocolName,
peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
) -> Result<(), String> {
let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
return Err(format!(
"Cannot add peers to reserved set of unknown protocol: {}",
protocol
))
};
let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
let peers = self.split_multiaddr_and_peer_id(peers)?;
for (peer_id, addr) in peers.into_iter() {
if peer_id == self.local_peer_id {
return Err("Local peer ID cannot be added as a reserved peer.".to_string())
}
if !addr.is_empty() {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
}
self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
}
Ok(())
}
fn remove_peers_from_reserved_set(
&self,
protocol: ProtocolName,
peers: Vec<sc_network_types::PeerId>,
) -> Result<(), String> {
let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
return Err(format!(
"Cannot remove peers from reserved set of unknown protocol: {}",
protocol
))
};
for peer_id in peers.into_iter() {
self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
}
Ok(())
}
fn sync_num_connected(&self) -> usize {
self.num_connected.load(Ordering::Relaxed)
}
fn peer_role(
&self,
peer_id: sc_network_types::PeerId,
handshake: Vec<u8>,
) -> Option<ObservedRole> {
match Roles::decode_all(&mut &handshake[..]) {
Ok(role) => Some(role.into()),
Err(_) => {
log::debug!(target: "sub-libp2p", "handshake doesn't contain peer role: {handshake:?}");
self.peer_store_handle.peer_role(&(peer_id.into()))
},
}
}
async fn reserved_peers(&self) -> Result<Vec<sc_network_types::PeerId>, ()> {
let (tx, rx) = oneshot::channel();
self.sync_protocol_handle.reserved_peers(tx);
rx.await
.map(|peers| peers.into_iter().map(From::from).collect())
.map_err(|_| ())
}
}
impl<B, H> NetworkEventStream for NetworkService<B, H>
where
    B: BlockT + 'static,
    H: ExHashT,
{
    /// Opens a new stream of network events.
    ///
    /// A channel is created under `name` with `100_000` as its second argument; the sending
    /// half is handed to the worker and the receiving half is returned.
    fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
        let (sender, receiver) = out_events::channel(name, 100_000);
        let registration = ServiceToWorkerMsg::EventStream(sender);
        let _ = self.to_worker.unbounded_send(registration);

        Box::pin(receiver)
    }
}
/// Request-response API. Requests are queued for the worker, which answers over a one-shot
/// channel.
#[async_trait::async_trait]
impl<B, H> NetworkRequest for NetworkService<B, H>
where
    B: BlockT + 'static,
    H: ExHashT,
{
    /// Sends `request` to `target` on `protocol` and waits for the outcome.
    ///
    /// # Errors
    ///
    /// Besides the failures reported by the worker, a dropped reply channel is reported as
    /// `RequestFailure::Network(OutboundFailure::ConnectionClosed)`.
    async fn request(
        &self,
        target: sc_network_types::PeerId,
        protocol: ProtocolName,
        request: Vec<u8>,
        fallback_request: Option<(Vec<u8>, ProtocolName)>,
        connect: IfDisconnected,
    ) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.start_request(target, protocol, request, fallback_request, reply_tx, connect);

        reply_rx
            .await
            .unwrap_or_else(|_| Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)))
    }

    /// Queues a request for the worker; the outcome is delivered through `tx`.
    fn start_request(
        &self,
        target: sc_network_types::PeerId,
        protocol: ProtocolName,
        request: Vec<u8>,
        fallback_request: Option<(Vec<u8>, ProtocolName)>,
        tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
        connect: IfDisconnected,
    ) {
        let message = ServiceToWorkerMsg::Request {
            target: target.into(),
            protocol,
            request,
            fallback_request,
            pending_response: tx,
            connect,
        };
        let _ = self.to_worker.unbounded_send(message);
    }
}
/// Sender of notifications to one peer on one protocol; `ready` reserves a sending slot.
#[must_use]
pub struct NotificationSender {
    /// Sink on which the notification slot is reserved.
    sink: NotificationsSink,
    /// Name of the protocol; used in the trace output when a notification is sent.
    protocol_name: ProtocolName,
    /// Optional histogram observing the size of every sent notification.
    notification_size_metric: Option<Histogram>,
}
#[async_trait::async_trait]
impl NotificationSenderT for NotificationSender {
    /// Waits for a free notification slot on the sink and returns the object that fills it.
    ///
    /// # Errors
    ///
    /// Returns `NotificationSenderError::Closed` when no slot can be reserved.
    async fn ready(
        &self,
    ) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
        let slot = self
            .sink
            .reserve_notification()
            .await
            .map_err(|()| NotificationSenderError::Closed)?;

        let ready = NotificationSenderReady {
            ready: Some(slot),
            peer_id: self.sink.peer_id(),
            protocol_name: &self.protocol_name,
            notification_size_metric: self.notification_size_metric.clone(),
        };

        Ok(Box::new(ready))
    }
}
/// A reserved notification slot, obtained from `NotificationSender::ready`.
#[must_use]
pub struct NotificationSenderReady<'a> {
    /// The reserved slot; taken by `send`, so a second `send` reports `Closed`.
    ready: Option<Ready<'a>>,
    /// Target peer; only used for logging.
    peer_id: &'a PeerId,
    /// Name of the protocol; only used for logging.
    protocol_name: &'a ProtocolName,
    /// Optional histogram observing the size of the sent notification.
    notification_size_metric: Option<Histogram>,
}
impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
    /// Consumes the reserved slot to send `notification`.
    ///
    /// The size is recorded in the metric (if any) and traced before the slot is used.
    ///
    /// # Errors
    ///
    /// Returns `NotificationSenderError::Closed` when the slot was already used or when the
    /// underlying send fails.
    fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
        let size = notification.len();

        if let Some(histogram) = self.notification_size_metric.as_ref() {
            histogram.observe(size as f64);
        }

        trace!(
            target: "sub-libp2p",
            "External API => Notification({:?}, {}, {} bytes)",
            self.peer_id, self.protocol_name, size,
        );
        trace!(target: "sub-libp2p", "Handler({:?}) <= Async notification", self.peer_id);

        let Some(slot) = self.ready.take() else {
            return Err(NotificationSenderError::Closed)
        };

        slot.send(notification).map_err(|()| NotificationSenderError::Closed)
    }
}
/// Requests sent by a `NetworkService` handle to the `NetworkWorker`.
enum ServiceToWorkerMsg {
    /// Look up the value stored under the key.
    GetValue(KademliaKey),
    /// Publish the value under the key.
    PutValue(KademliaKey, Vec<u8>),
    /// Send `record` to `peers`; the flag is forwarded to the behaviour as is.
    PutRecordTo {
        record: Record,
        peers: HashSet<sc_network_types::PeerId>,
        update_local_storage: bool,
    },
    /// Store a record: key, value, optional publisher and optional expiry.
    StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
    /// Register an address at which the peer may be reachable.
    AddKnownAddress(PeerId, Multiaddr),
    /// Register a new consumer of network events.
    EventStream(out_events::Sender),
    /// Send a request to `target`; the outcome goes to `pending_response`.
    Request {
        target: PeerId,
        protocol: ProtocolName,
        request: Vec<u8>,
        fallback_request: Option<(Vec<u8>, ProtocolName)>,
        pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
        connect: IfDisconnected,
    },
    /// Reply with the current `NetworkStatus`.
    NetworkStatus {
        pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
    },
    /// Reply with a `NetworkState` snapshot.
    NetworkState {
        pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
    },
    /// Disconnect the peer on the given protocol.
    DisconnectPeer(PeerId, ProtocolName),
}
/// Owner of the libp2p swarm. It makes progress only while `run` or `next_action` is being
/// polled.
#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
pub struct NetworkWorker<B, H>
where
    B: BlockT + 'static,
    H: ExHashT,
{
    /// Listen addresses; the same set is held by the `NetworkService`.
    listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
    /// Peer count, stored after every action and read through the `NetworkService`.
    num_connected: Arc<AtomicUsize>,
    /// The shareable handle returned by `service()`.
    service: Arc<NetworkService<B, H>>,
    /// The libp2p swarm, i.e. the actual network.
    network_service: Swarm<Behaviour<B>>,
    /// Requests coming from the `NetworkService` handles.
    from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
    /// Senders of the registered event streams.
    event_streams: out_events::OutChannels,
    /// Prometheus metrics; `None` when no registry was configured.
    metrics: Option<Metrics>,
    /// Boot node peer ID -> all addresses configured for it.
    boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
    /// Starts out empty. NOTE(review): presumably the boot nodes already reported as
    /// invalid — confirm against the swarm event handling.
    reported_invalid_boot_nodes: HashSet<PeerId>,
    /// Handle to the peer store.
    peer_store_handle: Arc<dyn PeerStoreProvider>,
    /// Handles returned by `Protocol::new` for the notification protocols.
    notif_protocol_handles: Vec<protocol::ProtocolHandle>,
    /// Zero-sized marker binding the otherwise unused `H` parameter.
    _marker: PhantomData<H>,
    /// Zero-sized marker binding the otherwise unused `B` parameter.
    _block: PhantomData<B>,
}
impl<B, H> NetworkWorker<B, H>
where
B: BlockT + 'static,
H: ExHashT,
{
/// Processes actions one after the other until `next_action` reports that the worker is done.
pub async fn run(mut self) {
    loop {
        if !self.next_action().await {
            break
        }
    }
}
/// Handles exactly one service message or swarm event, then refreshes the published peer
/// count and, when metrics are enabled, the Kademlia and pending-connection gauges.
///
/// Returns `false` once the channel from the service has ended, `true` otherwise.
pub async fn next_action(&mut self) -> bool {
    futures::select! {
        maybe_msg = self.from_service.next() => {
            match maybe_msg {
                Some(msg) => self.handle_worker_message(msg),
                // The channel from the service has ended: nothing is left to drive.
                None => return false,
            }
        },
        swarm_event = self.network_service.select_next_some() => {
            self.handle_swarm_event(swarm_event);
        },
    };

    // Publish the peer count for the `NetworkService` handles.
    let sync_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
    self.num_connected.store(sync_peers, Ordering::Relaxed);

    let Some(metrics) = self.metrics.as_ref() else { return true };

    let behaviour = self.network_service.behaviour_mut();

    if let Some(buckets) = behaviour.num_entries_per_kbucket() {
        for (lower_ilog2_bucket_bound, num_entries) in buckets {
            let label = lower_ilog2_bucket_bound.to_string();
            metrics.kbuckets_num_nodes.with_label_values(&[&label]).set(num_entries as u64);
        }
    }

    if let Some(num_entries) = behaviour.num_kademlia_records() {
        metrics.kademlia_records_count.set(num_entries as u64);
    }

    if let Some(num_entries) = behaviour.kademlia_records_total_size() {
        metrics.kademlia_records_sizes_total.set(num_entries as u64);
    }

    let pending =
        Swarm::network_info(&self.network_service).connection_counters().num_pending();
    metrics.pending_connections.set(pending as u64);

    true
}
/// Executes one command received from the service side.
///
/// DHT, address, request and disconnect commands are forwarded to the swarm behaviour.
/// `EventStream` appends the sender to `event_streams`. `NetworkStatus` and `NetworkState`
/// are answered on the supplied one-shot channel; a failed send is ignored.
fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
    match msg {
        ServiceToWorkerMsg::GetValue(key) => {
            self.network_service.behaviour_mut().get_value(key);
        },
        ServiceToWorkerMsg::PutValue(key, value) => {
            self.network_service.behaviour_mut().put_value(key, value);
        },
        ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => {
            let behaviour = self.network_service.behaviour_mut();
            behaviour.put_record_to(record, peers, update_local_storage);
        },
        ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => {
            let behaviour = self.network_service.behaviour_mut();
            behaviour.store_record(key, value, publisher, expires);
        },
        ServiceToWorkerMsg::AddKnownAddress(peer, address) => {
            self.network_service.behaviour_mut().add_known_address(peer, address);
        },
        ServiceToWorkerMsg::EventStream(sender) => {
            self.event_streams.push(sender);
        },
        ServiceToWorkerMsg::Request {
            target,
            protocol,
            request,
            fallback_request,
            pending_response,
            connect,
        } => {
            let behaviour = self.network_service.behaviour_mut();
            behaviour.send_request(
                &target,
                protocol,
                request,
                fallback_request,
                pending_response,
                connect,
            );
        },
        ServiceToWorkerMsg::NetworkStatus { pending_response } => {
            let status = self.status();
            // The requester may have gone away; nothing to do in that case.
            let _ = pending_response.send(Ok(status));
        },
        ServiceToWorkerMsg::NetworkState { pending_response } => {
            let state = self.network_state();
            // The requester may have gone away; nothing to do in that case.
            let _ = pending_response.send(Ok(state));
        },
        ServiceToWorkerMsg::DisconnectPeer(peer, protocol) => {
            let behaviour = self.network_service.behaviour_mut();
            behaviour.user_protocol_mut().disconnect_peer(&peer, protocol);
        },
    }
}
/// Reacts to a single event produced by the libp2p `Swarm`.
///
/// Depending on the event this:
/// - updates Prometheus metrics (only when `self.metrics` is `Some`),
/// - forwards notification-substream events to the entry of `notif_protocol_handles`
///   selected by `usize::from(set_id)`,
/// - reports reputation changes and newly known peers to the peer store,
/// - keeps the shared `listen_addresses` set in sync with the listeners,
/// - forwards DHT events to `event_streams`,
/// - logs under the `sub-libp2p` target.
///
/// # Panics
///
/// Indexing `notif_protocol_handles` panics if `usize::from(set_id)` is out of bounds.
#[allow(deprecated)]
fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut, THandlerErr<Behaviour<B>>>) {
    match event {
        SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
            if let Some(metrics) = self.metrics.as_ref() {
                match result {
                    Ok(serve_time) => {
                        metrics
                            .requests_in_success_total
                            .with_label_values(&[&protocol])
                            .observe(serve_time.as_secs_f64());
                    },
                    Err(err) => {
                        // `None` means the failure is deliberately not counted.
                        let reason = match err {
                            ResponseFailure::Network(InboundFailure::Timeout) =>
                                Some("timeout"),
                            ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
                                None,
                            ResponseFailure::Network(InboundFailure::ResponseOmission) =>
                                Some("busy-omitted"),
                            ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
                                Some("connection-closed"),
                        };
                        if let Some(reason) = reason {
                            metrics
                                .requests_in_failure_total
                                .with_label_values(&[&protocol, reason])
                                .inc();
                        }
                    },
                }
            }
        },
        SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
            protocol,
            duration,
            result,
            ..
        }) => {
            if let Some(metrics) = self.metrics.as_ref() {
                match result {
                    Ok(_) => {
                        metrics
                            .requests_out_success_total
                            .with_label_values(&[&protocol])
                            .observe(duration.as_secs_f64());
                    },
                    Err(err) => {
                        let reason = match err {
                            RequestFailure::NotConnected => "not-connected",
                            RequestFailure::UnknownProtocol => "unknown-protocol",
                            RequestFailure::Refused => "refused",
                            RequestFailure::Obsolete => "obsolete",
                            RequestFailure::Network(OutboundFailure::DialFailure) =>
                                "dial-failure",
                            RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
                            RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
                                "connection-closed",
                            RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
                                "unsupported",
                        };
                        metrics
                            .requests_out_failure_total
                            .with_label_values(&[&protocol, reason])
                            .inc();
                    },
                }
            }
        },
        SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
            for change in changes {
                self.peer_store_handle.report_peer(peer.into(), change);
            }
        },
        SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
            peer_id,
            info:
                IdentifyInfo {
                    protocol_version, agent_version, mut listen_addrs, protocols, ..
                },
        }) => {
            // At most 30 self-reported addresses are processed per identify message.
            if listen_addrs.len() > 30 {
                debug!(
                    target: "sub-libp2p",
                    "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
                    peer_id, protocol_version, agent_version
                );
                listen_addrs.truncate(30);
            }
            // `listen_addrs` is consumed here, so each address is moved rather than cloned.
            for addr in listen_addrs {
                self.network_service.behaviour_mut().add_self_reported_address_to_dht(
                    &peer_id,
                    &protocols,
                    addr,
                );
            }
            self.peer_store_handle.add_known_peer(peer_id.into());
        },
        SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
            self.peer_store_handle.add_known_peer(peer_id.into());
        },
        SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.kademlia_random_queries_total.inc();
            }
        },
        SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
            remote,
            set_id,
            direction,
            negotiated_fallback,
            notifications_sink,
            received_handshake,
        }) => {
            // The result of reporting to the protocol handle is intentionally ignored.
            let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
                remote,
                direction,
                received_handshake,
                negotiated_fallback,
                notifications_sink,
            );
        },
        SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
            remote,
            set_id,
            notifications_sink,
        }) => {
            let _ = self.notif_protocol_handles[usize::from(set_id)]
                .report_notification_sink_replaced(remote, notifications_sink);
        },
        SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
            let _ = self.notif_protocol_handles[usize::from(set_id)]
                .report_substream_closed(remote);
        },
        SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
            remote,
            set_id,
            notification,
        }) => {
            let _ = self.notif_protocol_handles[usize::from(set_id)]
                .report_notification_received(remote, notification);
        },
        SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
            // The query duration is recorded only when both metrics and a duration exist.
            if let (Some(metrics), Some(duration)) = (self.metrics.as_ref(), duration) {
                let query_type = match event {
                    DhtEvent::ValueFound(_) => "value-found",
                    DhtEvent::ValueNotFound(_) => "value-not-found",
                    DhtEvent::ValuePut(_) => "value-put",
                    DhtEvent::ValuePutFailed(_) => "value-put-failed",
                    DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
                };
                metrics
                    .kademlia_query_duration
                    .with_label_values(&[query_type])
                    .observe(duration.as_secs_f64());
            }
            // The event itself is always forwarded, with or without metrics.
            self.event_streams.send(Event::Dht(event));
        },
        SwarmEvent::Behaviour(BehaviourOut::None) => {
            // Nothing to do.
        },
        SwarmEvent::ConnectionEstablished {
            peer_id,
            endpoint,
            num_established,
            concurrent_dial_errors,
            ..
        } => {
            if let Some(errors) = concurrent_dial_errors {
                debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
            } else {
                debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
            }
            if let Some(metrics) = self.metrics.as_ref() {
                let direction = match endpoint {
                    ConnectedPoint::Dialer { .. } => "out",
                    ConnectedPoint::Listener { .. } => "in",
                };
                metrics.connections_opened_total.with_label_values(&[direction]).inc();
                // First connection to this peer: count it as a new distinct peer.
                if num_established.get() == 1 {
                    metrics.distinct_peers_connections_opened_total.inc();
                }
            }
        },
        SwarmEvent::ConnectionClosed {
            connection_id,
            peer_id,
            cause,
            endpoint,
            num_established,
        } => {
            debug!(target: "sub-libp2p", "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
            if let Some(metrics) = self.metrics.as_ref() {
                let direction = match endpoint {
                    ConnectedPoint::Dialer { .. } => "out",
                    ConnectedPoint::Listener { .. } => "in",
                };
                // The specific handler-error patterns must precede the generic
                // `ConnectionError::Handler(_)` arm.
                let reason = match cause {
                    Some(ConnectionError::IO(_)) => "transport-error",
                    Some(ConnectionError::Handler(Either::Left(Either::Left(
                        Either::Left(Either::Right(
                            NotifsHandlerError::SyncNotificationsClogged,
                        )),
                    )))) => "sync-notifications-clogged",
                    Some(ConnectionError::Handler(Either::Left(Either::Left(
                        Either::Right(Either::Left(_)),
                    )))) => "ping-timeout",
                    Some(ConnectionError::Handler(_)) => "protocol-error",
                    Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
                    None => "actively-closed",
                };
                metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
                // No connection to this peer is left: count the distinct peer as closed.
                if num_established == 0 {
                    metrics.distinct_peers_connections_closed_total.inc();
                }
            }
        },
        SwarmEvent::NewListenAddr { address, .. } => {
            trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address);
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.listeners_local_addresses.inc();
            }
            // `address` is not used afterwards, so it is moved into the set.
            self.listen_addresses.lock().insert(address);
        },
        SwarmEvent::ExpiredListenAddr { address, .. } => {
            info!(target: "sub-libp2p", "📪 No longer listening on {}", address);
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.listeners_local_addresses.dec();
            }
            self.listen_addresses.lock().remove(&address);
        },
        SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
            if let Some(peer_id) = peer_id {
                trace!(
                    target: "sub-libp2p",
                    "Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
                );
                // Warn at most once per boot node about an unexpected peer id.
                let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
                if let Some(addresses) =
                    not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
                {
                    if let DialError::WrongPeerId { obtained, endpoint } = &error {
                        if let ConnectedPoint::Dialer { address, role_override: _ } = endpoint {
                            // If `parse_addr` fails, fall back to the dialed address as-is.
                            let address_without_peer_id = parse_addr(address.clone().into())
                                .map_or_else(|_| address.clone(), |r| r.1.into());
                            // Only warn when the dialed address is one of the addresses
                            // registered for this boot node.
                            if addresses.iter().any(|a| address_without_peer_id == *a) {
                                warn!(
                                    "💔 The bootnode you want to connect to at `{address}` provided a \
                                     different peer ID `{obtained}` than the one you expect `{peer_id}`.",
                                );
                                self.reported_invalid_boot_nodes.insert(peer_id);
                            }
                        }
                    }
                }
            }
            if let Some(metrics) = self.metrics.as_ref() {
                // `None` means the error is not counted.
                #[allow(deprecated)]
                let reason = match error {
                    DialError::Denied { cause } =>
                        if cause.downcast::<Exceeded>().is_ok() {
                            Some("limit-reached")
                        } else {
                            None
                        },
                    DialError::LocalPeerId { .. } => Some("local-peer-id"),
                    DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
                    DialError::Transport(_) => Some("transport-error"),
                    DialError::NoAddresses |
                    DialError::DialPeerConditionFalse(_) |
                    DialError::Aborted => None,
                };
                if let Some(reason) = reason {
                    metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
                }
            }
        },
        SwarmEvent::Dialing { connection_id, peer_id } => {
            trace!(target: "sub-libp2p", "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
        },
        SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
            trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.incoming_connections_total.inc();
            }
        },
        SwarmEvent::IncomingConnectionError {
            connection_id,
            local_addr,
            send_back_addr,
            error,
        } => {
            debug!(
                target: "sub-libp2p",
                "Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
            );
            if let Some(metrics) = self.metrics.as_ref() {
                // `None` means the error is not counted.
                #[allow(deprecated)]
                let reason = match error {
                    ListenError::Denied { cause } =>
                        if cause.downcast::<Exceeded>().is_ok() {
                            Some("limit-reached")
                        } else {
                            None
                        },
                    ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } =>
                        Some("invalid-peer-id"),
                    ListenError::Transport(_) => Some("transport-error"),
                    ListenError::Aborted => None,
                };
                if let Some(reason) = reason {
                    metrics
                        .incoming_connections_errors_total
                        .with_label_values(&[reason])
                        .inc();
                }
            }
        },
        SwarmEvent::ListenerClosed { reason, addresses, .. } => {
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.listeners_local_addresses.sub(addresses.len() as u64);
            }
            let mut listen_addresses = self.listen_addresses.lock();
            for addr in &addresses {
                listen_addresses.remove(addr);
            }
            // Release the lock before formatting and logging.
            drop(listen_addresses);
            let addrs =
                addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
            match reason {
                Ok(()) => error!(
                    target: "sub-libp2p",
                    "📪 Libp2p listener ({}) closed gracefully",
                    addrs
                ),
                Err(e) => error!(
                    target: "sub-libp2p",
                    "📪 Libp2p listener ({}) closed: {}",
                    addrs, e
                ),
            }
        },
        SwarmEvent::ListenerError { error, .. } => {
            debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.listeners_errors_total.inc();
            }
        },
    }
}
}
// Explicitly marks `NetworkWorker` as `Unpin` for every `B` and `H` accepted by the struct.
// The impl is empty because `Unpin` has no items to implement.
impl<B, H> Unpin for NetworkWorker<B, H>
where
B: BlockT + 'static,
H: ExHashT,
{
}
pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
addresses: impl Iterator<Item = &'a sc_network_types::multiaddr::Multiaddr>,
transport: &TransportConfig,
) -> Result<(), Error> {
use sc_network_types::multiaddr::Protocol;
if matches!(transport, TransportConfig::MemoryOnly) {
let addresses: Vec<_> = addresses
.filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
.cloned()
.collect();
if !addresses.is_empty() {
return Err(Error::AddressesForAnotherTransport {
transport: transport.clone(),
addresses,
})
}
} else {
let addresses: Vec<_> = addresses
.filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
.cloned()
.collect();
if !addresses.is_empty() {
return Err(Error::AddressesForAnotherTransport {
transport: transport.clone(),
addresses,
})
}
}
Ok(())
}