1use crate::{
22 config::{
23 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24 SetConfig, TransportConfig,
25 },
26 error::Error,
27 event::{DhtEvent, Event},
28 litep2p::{
29 bitswap::BitswapService,
30 discovery::{Discovery, DiscoveryEvent},
31 ipfs_dht::IpfsDht,
32 peerstore::Peerstore,
33 service::{Litep2pNetworkService, NetworkServiceCommand},
34 shim::{
35 notification::{
36 config::{NotificationProtocolConfig, ProtocolControlHandle},
37 peerset::PeersetCommand,
38 },
39 request_response::{RequestResponseConfig, RequestResponseProtocol},
40 },
41 },
42 peer_store::PeerStoreProvider,
43 service::{
44 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
45 out_events,
46 traits::{BandwidthSink, NetworkBackend, NetworkService},
47 },
48 NetworkStatus, NotificationService, ProtocolName,
49};
50
51use codec::Encode;
52use futures::StreamExt;
53use litep2p::{
54 config::ConfigBuilder,
55 crypto::ed25519::Keypair,
56 error::{DialError, NegotiationError},
57 executor::Executor,
58 protocol::{
59 libp2p::kademlia::{QueryId, Record},
60 request_response::ConfigBuilder as RequestResponseConfigBuilder,
61 },
62 transport::{
63 tcp::config::Config as TcpTransportConfig,
64 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
65 },
66 types::{
67 multiaddr::{Multiaddr, Protocol},
68 ConnectionId,
69 },
70 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
71};
72use prometheus_endpoint::Registry;
73use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
74
75use sc_client_api::BlockBackend;
76use sc_network_common::{role::Roles, ExHashT};
77use sc_network_types::PeerId;
78use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
79use sp_runtime::traits::Block as BlockT;
80
81use std::{
82 cmp,
83 collections::{hash_map::Entry, HashMap, HashSet},
84 fs,
85 future::Future,
86 iter,
87 pin::Pin,
88 sync::{
89 atomic::{AtomicUsize, Ordering},
90 Arc,
91 },
92 time::{Duration, Instant},
93};
94
95mod bitswap;
96mod discovery;
97mod ipfs_dht;
98mod peerstore;
99mod service;
100mod shim;
101
102struct Litep2pBandwidthSink {
104 sink: litep2p::BandwidthSink,
105}
106
107impl BandwidthSink for Litep2pBandwidthSink {
108 fn total_inbound(&self) -> u64 {
109 self.sink.inbound() as u64
110 }
111
112 fn total_outbound(&self) -> u64 {
113 self.sink.outbound() as u64
114 }
115}
116
/// Wrapper around a caller-supplied spawning closure so that it can be handed
/// to litep2p as an [`Executor`].
struct Litep2pExecutor {
    /// Closure that takes ownership of a boxed, pinned future; every future
    /// submitted to the executor is passed to it.
    executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}
122
123impl Executor for Litep2pExecutor {
124 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
125 (self.executor)(future)
126 }
127
128 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
129 (self.executor)(future)
130 }
131}
132
/// Logging target used by every `log::*!` invocation in this file.
const LOG_TARGET: &str = "sub-libp2p";
135
136struct ConnectionContext {
138 endpoints: HashMap<ConnectionId, Endpoint>,
140
141 num_connections: usize,
143}
144
145#[derive(Debug)]
147enum KadQuery {
148 FindNode(PeerId, Instant),
150 GetValue(RecordKey, Instant),
152 PutValue(RecordKey, Instant),
154 GetProviders(RecordKey, Instant),
156 AddProvider(RecordKey, Instant),
158}
159
160pub struct Litep2pNetworkBackend {
162 litep2p: Litep2p,
164
165 network_service: Arc<dyn NetworkService>,
167
168 cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,
170
171 peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
173
174 pending_queries: HashMap<QueryId, KadQuery>,
176
177 discovery: Discovery,
179
180 num_connected: Arc<AtomicUsize>,
182
183 peers: HashMap<litep2p::PeerId, ConnectionContext>,
185
186 peerstore_handle: Arc<dyn PeerStoreProvider>,
188
189 block_announce_protocol: ProtocolName,
191
192 event_streams: out_events::OutChannels,
194
195 metrics: Option<Metrics>,
197}
198
199impl Litep2pNetworkBackend {
200 fn parse_addresses(
203 addresses: impl Iterator<Item = Multiaddr>,
204 ) -> HashMap<PeerId, Vec<Multiaddr>> {
205 addresses
206 .into_iter()
207 .filter_map(|address| match address.iter().next() {
208 Some(
209 Protocol::Dns(_) |
210 Protocol::Dns4(_) |
211 Protocol::Dns6(_) |
212 Protocol::Ip6(_) |
213 Protocol::Ip4(_),
214 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
215 {
216 Some(Protocol::P2p(peer_id)) => Some((peer_id.into(), Some(address))),
217 _ => None,
218 },
219 Some(Protocol::P2p(peer_id)) => Some((peer_id.into(), None)),
220 _ => None,
221 })
222 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
223 let entry = acc.entry(peer).or_default();
224 maybe_address.map(|address| entry.push(address));
225
226 acc
227 })
228 }
229
230 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
232 Self::parse_addresses(peers.into_iter())
233 .into_iter()
234 .filter_map(|(peer, addresses)| {
235 if addresses.is_empty() {
237 return Some(peer);
238 }
239
240 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
241 log::warn!(
242 target: LOG_TARGET,
243 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
244 );
245 return None;
246 }
247
248 self.peerstore_handle.add_known_peer(peer);
249 Some(peer)
250 })
251 .collect()
252 }
253}
254
255impl Litep2pNetworkBackend {
256 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
258 let secret: litep2p::crypto::ed25519::SecretKey =
259 node_key.clone().into_keypair()?.secret().into();
260
261 let local_identity = Keypair::from(secret);
262 let local_public = local_identity.public();
263 let local_peer_id = local_public.to_peer_id();
264
265 Ok((local_identity, local_peer_id))
266 }
267
268 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
270 config: &FullNetworkConfiguration<B, H, Self>,
271 ) -> ConfigBuilder {
272 let _ = match config.network_config.transport {
273 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
274 TransportConfig::Normal { .. } => false,
275 };
276 let config_builder = ConfigBuilder::new();
277
278 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
279 .network_config
280 .listen_addresses
281 .iter()
282 .filter_map(|address| {
283 use sc_network_types::multiaddr::Protocol;
284
285 let mut iter = address.iter();
286
287 match iter.next() {
288 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
289 protocol => {
290 log::error!(
291 target: LOG_TARGET,
292 "unknown protocol {protocol:?}, ignoring {address:?}",
293 );
294
295 return None;
296 },
297 }
298
299 match iter.next() {
300 Some(Protocol::Tcp(_)) => match iter.next() {
301 Some(Protocol::Ws(_) | Protocol::Wss(_)) => {
302 Some((None, Some(address.clone())))
303 },
304 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
305 protocol => {
306 log::error!(
307 target: LOG_TARGET,
308 "unknown protocol {protocol:?}, ignoring {address:?}",
309 );
310 None
311 },
312 },
313 protocol => {
314 log::error!(
315 target: LOG_TARGET,
316 "unknown protocol {protocol:?}, ignoring {address:?}",
317 );
318 None
319 },
320 }
321 })
322 .unzip();
323
324 config_builder
325 .with_websocket(WebSocketTransportConfig {
326 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
327 yamux_config: litep2p::yamux::Config::default(),
328 nodelay: true,
329 ..Default::default()
330 })
331 .with_tcp(TcpTransportConfig {
332 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
333 yamux_config: litep2p::yamux::Config::default(),
334 nodelay: true,
335 ..Default::default()
336 })
337 }
338}
339
340#[async_trait::async_trait]
341impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
342 type NotificationProtocolConfig = NotificationProtocolConfig;
343 type RequestResponseProtocolConfig = RequestResponseConfig;
344 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
345 type PeerStore = Peerstore;
346 type BitswapConfig = bitswap::BitswapConfig;
347
348 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
349 where
350 Self: Sized,
351 {
352 let (keypair, local_peer_id) =
353 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
354 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
355
356 params.network_config.network_config.boot_nodes = params
357 .network_config
358 .network_config
359 .boot_nodes
360 .into_iter()
361 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
362 .collect();
363 params.network_config.network_config.default_peers_set.reserved_nodes = params
364 .network_config
365 .network_config
366 .default_peers_set
367 .reserved_nodes
368 .into_iter()
369 .filter(|reserved_node| {
370 if reserved_node.peer_id == local_peer_id.into() {
371 log::warn!(
372 target: LOG_TARGET,
373 "Local peer ID used in reserved node, ignoring: {reserved_node}",
374 );
375 false
376 } else {
377 true
378 }
379 })
380 .collect();
381
382 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
383 fs::create_dir_all(path)?;
384 }
385
386 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
387 log::info!(target: LOG_TARGET, "Running litep2p network backend");
388
389 params.network_config.sanity_check_addresses()?;
390 params.network_config.sanity_check_bootnodes()?;
391
392 let mut config_builder =
393 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
394 let known_addresses = params.network_config.known_addresses();
395 let peer_store_handle = params.network_config.peer_store_handle();
396 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
397
398 let FullNetworkConfiguration {
399 notification_protocols,
400 request_response_protocols,
401 network_config,
402 ..
403 } = params.network_config;
404
405 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
411 let mut notif_protocols = HashMap::from_iter([(
412 params.block_announce_config.protocol_name().clone(),
413 params.block_announce_config.handle,
414 )]);
415
416 config_builder = notification_protocols
418 .into_iter()
419 .fold(config_builder, |config_builder, mut config| {
420 config.config.set_handshake(Roles::from(¶ms.role).encode());
421 notif_protocols.insert(config.protocol_name, config.handle);
422
423 config_builder.with_notification_protocol(config.config)
424 })
425 .with_notification_protocol(params.block_announce_config.config);
426
427 let metrics = match ¶ms.metrics_registry {
429 Some(registry) => Some(register_without_sources(registry)?),
430 None => None,
431 };
432
433 let (mut request_response_receivers, request_response_senders): (
439 HashMap<_, _>,
440 HashMap<_, _>,
441 ) = request_response_protocols
442 .iter()
443 .map(|config| {
444 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
445 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
446 })
447 .unzip();
448
449 config_builder = request_response_protocols.into_iter().fold(
450 config_builder,
451 |config_builder, config| {
452 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
453 Litep2pProtocolName::from(config.protocol_name.clone()),
454 )
455 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
456 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
457 .with_timeout(config.request_timeout)
458 .build();
459
460 let protocol = RequestResponseProtocol::new(
461 config.protocol_name.clone(),
462 handle,
463 Arc::clone(&peer_store_handle),
464 config.inbound_queue,
465 request_response_receivers
466 .remove(&config.protocol_name)
467 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
468 request_response_senders.clone(),
469 metrics.clone(),
470 );
471
472 executor.run(Box::pin(async move {
473 protocol.run().await;
474 }));
475
476 config_builder.with_request_response_protocol(protocol_config)
477 },
478 );
479
480 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
482 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
483 use sc_network_types::multiaddr::Protocol;
484
485 let address = match address.iter().last() {
486 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) => {
487 address.with(Protocol::P2p(peer.into()))
488 },
489 Some(Protocol::P2p(_)) => address,
490 _ => return acc,
491 };
492
493 acc.entry(peer.into()).or_default().push(address.into());
494 peer_store_handle.add_known_peer(peer);
495
496 acc
497 });
498
499 let listen_addresses = Arc::new(Default::default());
501 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
502 Discovery::new(
503 local_peer_id,
504 &network_config,
505 params.genesis_hash,
506 params.fork_id.as_deref(),
507 ¶ms.protocol_id,
508 known_addresses.clone(),
509 Arc::clone(&listen_addresses),
510 Arc::clone(&peer_store_handle),
511 );
512
513 let bitswap_cmd_tx = params.ipfs_config.as_ref().map(|c| c.bitswap_config.cmd_tx.clone());
514
515 if let Some(config) = params.ipfs_config {
517 config_builder =
518 config_builder.with_libp2p_bitswap(config.bitswap_config.litep2p_config);
519
520 if !config.bootnodes.is_empty() {
521 let (ipfs_dht, kad_config) = IpfsDht::new(config.bootnodes, config.block_provider);
522 config_builder = config_builder.with_libp2p_kademlia(kad_config);
523 executor.run(Box::pin(ipfs_dht.run()));
524 } else {
525 log::warn!(
526 target: LOG_TARGET,
527 "Not starting IPFS DHT publisher because no IPFS bootnodes are configured. \
528 Only direct Bitswap requests will be handled.",
529 );
530 }
531 }
532
533 config_builder = config_builder
534 .with_known_addresses(known_addresses.clone().into_iter())
535 .with_libp2p_ping(ping_config)
536 .with_libp2p_identify(identify_config)
537 .with_libp2p_kademlia(kademlia_config)
538 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
539 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
540 ))
541 .with_keep_alive_timeout(network_config.idle_connection_timeout)
542 .with_system_resolver()
545 .with_executor(executor);
546
547 if let Some(config) = maybe_mdns_config {
548 config_builder = config_builder.with_mdns(config);
549 }
550
551 let litep2p =
552 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
553
554 litep2p.listen_addresses().for_each(|address| {
555 log::debug!(target: LOG_TARGET, "listening on: {address}");
556
557 listen_addresses.write().insert(address.clone());
558 });
559
560 let public_addresses = litep2p.public_addresses();
561 for address in network_config.public_addresses.iter() {
562 if let Err(err) = public_addresses.add_address(address.clone().into()) {
563 log::warn!(
564 target: LOG_TARGET,
565 "failed to add public address {address:?}: {err:?}",
566 );
567 }
568 }
569
570 let network_service = Arc::new(Litep2pNetworkService::new(
571 local_peer_id,
572 keypair.clone(),
573 cmd_tx,
574 Arc::clone(&peer_store_handle),
575 notif_protocols.clone(),
576 block_announce_protocol.clone(),
577 request_response_senders,
578 Arc::clone(&listen_addresses),
579 public_addresses,
580 bitswap_cmd_tx,
581 ));
582
583 let num_connected = Arc::new(Default::default());
585 let bandwidth: Arc<dyn BandwidthSink> =
586 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
587
588 if let Some(registry) = ¶ms.metrics_registry {
589 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
590 }
591
592 Ok(Self {
593 network_service,
594 cmd_rx,
595 metrics,
596 peerset_handles: notif_protocols,
597 num_connected,
598 discovery,
599 pending_queries: HashMap::new(),
600 peerstore_handle: peer_store_handle,
601 block_announce_protocol,
602 event_streams: out_events::OutChannels::new(None)?,
603 peers: HashMap::new(),
604 litep2p,
605 })
606 }
607
608 fn network_service(&self) -> Arc<dyn NetworkService> {
609 Arc::clone(&self.network_service)
610 }
611
612 fn peer_store(
613 bootnodes: Vec<sc_network_types::PeerId>,
614 metrics_registry: Option<Registry>,
615 ) -> Self::PeerStore {
616 Peerstore::new(bootnodes, metrics_registry)
617 }
618
619 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
620 NotificationMetrics::new(registry)
621 }
622
623 fn bitswap_server(
625 client: Arc<dyn BlockBackend<B> + Send + Sync>,
626 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
627 BitswapService::new(client)
628 }
629
630 fn notification_config(
632 protocol_name: ProtocolName,
633 fallback_names: Vec<ProtocolName>,
634 max_notification_size: u64,
635 handshake: Option<NotificationHandshake>,
636 set_config: SetConfig,
637 metrics: NotificationMetrics,
638 peerstore_handle: Arc<dyn PeerStoreProvider>,
639 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
640 Self::NotificationProtocolConfig::new(
641 protocol_name,
642 fallback_names,
643 max_notification_size as usize,
644 handshake,
645 set_config,
646 metrics,
647 peerstore_handle,
648 )
649 }
650
651 fn request_response_config(
653 protocol_name: ProtocolName,
654 fallback_names: Vec<ProtocolName>,
655 max_request_size: u64,
656 max_response_size: u64,
657 request_timeout: Duration,
658 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
659 ) -> Self::RequestResponseProtocolConfig {
660 Self::RequestResponseProtocolConfig::new(
661 protocol_name,
662 fallback_names,
663 max_request_size,
664 max_response_size,
665 request_timeout,
666 inbound_queue,
667 )
668 }
669
670 async fn run(mut self) {
672 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
673
674 loop {
675 let num_connected_peers = self
676 .peerset_handles
677 .get(&self.block_announce_protocol)
678 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
679 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
680
681 tokio::select! {
682 command = self.cmd_rx.next() => match command {
683 None => return,
684 Some(command) => match command {
685 NetworkServiceCommand::FindClosestPeers { target } => {
686 let query_id = self.discovery.find_node(target.into()).await;
687 self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
688 }
689 NetworkServiceCommand::GetValue{ key } => {
690 let query_id = self.discovery.get_value(key.clone()).await;
691 self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
692 }
693 NetworkServiceCommand::PutValue { key, value } => {
694 let query_id = self.discovery.put_value(key.clone(), value).await;
695 self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
696 }
697 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
698 let kademlia_key = record.key.clone();
699 let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
700 self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
701 }
702 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
703 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
704 }
705 NetworkServiceCommand::StartProviding { key } => {
706 let query_id = self.discovery.start_providing(key.clone()).await;
707 self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
708 }
709 NetworkServiceCommand::StopProviding { key } => {
710 self.discovery.stop_providing(key).await;
711 }
712 NetworkServiceCommand::GetProviders { key } => {
713 let query_id = self.discovery.get_providers(key.clone()).await;
714 self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
715 }
716 NetworkServiceCommand::EventStream { tx } => {
717 self.event_streams.push(tx);
718 }
719 NetworkServiceCommand::Status { tx } => {
720 let _ = tx.send(NetworkStatus {
721 num_connected_peers: self
722 .peerset_handles
723 .get(&self.block_announce_protocol)
724 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
725 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
726 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
727 });
728 }
729 NetworkServiceCommand::AddPeersToReservedSet {
730 protocol,
731 peers,
732 } => {
733 let peers = self.add_addresses(peers.into_iter().map(Into::into));
734
735 match self.peerset_handles.get(&protocol) {
736 Some(handle) => {
737 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
738 }
739 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
740 };
741 }
742 NetworkServiceCommand::AddKnownAddress { peer, address } => {
743 let mut address: Multiaddr = address.into();
744
745 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
746 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
747 }
748
749 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
750 self.peerstore_handle.add_known_peer(peer);
754 } else {
755 log::debug!(
756 target: LOG_TARGET,
757 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
758 );
759 }
760 },
761 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
762 let peers = self.add_addresses(peers.into_iter().map(Into::into));
763
764 match self.peerset_handles.get(&protocol) {
765 Some(handle) => {
766 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
767 }
768 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
769 }
770
771 },
772 NetworkServiceCommand::DisconnectPeer {
773 protocol,
774 peer,
775 } => {
776 let Some(handle) = self.peerset_handles.get(&protocol) else {
777 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
778 continue
779 };
780
781 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
782 }
783 NetworkServiceCommand::SetReservedOnly {
784 protocol,
785 reserved_only,
786 } => {
787 let Some(handle) = self.peerset_handles.get(&protocol) else {
788 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
789 continue
790 };
791
792 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
793 }
794 NetworkServiceCommand::RemoveReservedPeers {
795 protocol,
796 peers,
797 } => {
798 let Some(handle) = self.peerset_handles.get(&protocol) else {
799 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
800 continue
801 };
802
803 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
804 }
805 }
806 },
807 event = self.discovery.next() => match event {
808 None => return,
809 Some(DiscoveryEvent::Discovered { addresses }) => {
810 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
812 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
813 self.peerstore_handle.add_known_peer(peer);
814 }
815 }
816 }
817 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
818 for peer in peers {
819 self.peerstore_handle.add_known_peer(peer.into());
820 }
821 }
822 Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
823 match self.pending_queries.remove(&query_id) {
824 Some(KadQuery::FindNode(_, started)) => {
825 log::trace!(
826 target: LOG_TARGET,
827 "`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
828 );
829
830 self.event_streams.send(
831 Event::Dht(
832 DhtEvent::ClosestPeersFound(
833 target.into(),
834 peers
835 .into_iter()
836 .map(|(peer, addrs)| (
837 peer.into(),
838 addrs.into_iter().map(Into::into).collect(),
839 ))
840 .collect(),
841 )
842 )
843 );
844
845 if let Some(ref metrics) = self.metrics {
846 metrics
847 .kademlia_query_duration
848 .with_label_values(&["node-find"])
849 .observe(started.elapsed().as_secs_f64());
850 }
851 },
852 query => {
853 log::error!(
854 target: LOG_TARGET,
855 "Missing/invalid pending query for `FIND_NODE`: {query:?}"
856 );
857 debug_assert!(false);
858 }
859 }
860 },
861 Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
862 if !self.pending_queries.contains_key(&query_id) {
863 log::error!(
864 target: LOG_TARGET,
865 "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
866 );
867
868 continue
869 }
870
871 let peer_id: sc_network_types::PeerId = record.peer.into();
872 let record = PeerRecord {
873 record: P2PRecord {
874 key: record.record.key.to_vec().into(),
875 value: record.record.value,
876 publisher: record.record.publisher.map(|peer_id| {
877 let peer_id: sc_network_types::PeerId = peer_id.into();
878 peer_id.into()
879 }),
880 expires: record.record.expires,
881 },
882 peer: Some(peer_id.into()),
883 };
884
885 self.event_streams.send(
886 Event::Dht(
887 DhtEvent::ValueFound(
888 record.into()
889 )
890 )
891 );
892 }
893 Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
894 match self.pending_queries.remove(&query_id) {
895 Some(KadQuery::GetValue(key, started)) => {
896 log::trace!(
897 target: LOG_TARGET,
898 "`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
899 );
900
901 if let Some(ref metrics) = self.metrics {
902 metrics
903 .kademlia_query_duration
904 .with_label_values(&["value-get"])
905 .observe(started.elapsed().as_secs_f64());
906 }
907 },
908 query => {
909 log::error!(
910 target: LOG_TARGET,
911 "Missing/invalid pending query for `GET_VALUE`: {query:?}"
912 );
913 debug_assert!(false);
914 },
915 }
916 }
917 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
918 match self.pending_queries.remove(&query_id) {
919 Some(KadQuery::PutValue(key, started)) => {
920 log::trace!(
921 target: LOG_TARGET,
922 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
923 );
924
925 self.event_streams.send(Event::Dht(
926 DhtEvent::ValuePut(key)
927 ));
928
929 if let Some(ref metrics) = self.metrics {
930 metrics
931 .kademlia_query_duration
932 .with_label_values(&["value-put"])
933 .observe(started.elapsed().as_secs_f64());
934 }
935 },
936 query => {
937 log::error!(
938 target: LOG_TARGET,
939 "Missing/invalid pending query for `PUT_VALUE`: {query:?}"
940 );
941 debug_assert!(false);
942 }
943 }
944 }
945 Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
946 match self.pending_queries.remove(&query_id) {
947 Some(KadQuery::GetProviders(key, started)) => {
948 log::trace!(
949 target: LOG_TARGET,
950 "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
951 );
952
953 providers.iter().for_each(|p| {
958 self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
959 });
960
961 self.event_streams.send(Event::Dht(
962 DhtEvent::ProvidersFound(
963 key.clone().into(),
964 providers.into_iter().map(|p| p.peer.into()).collect()
965 )
966 ));
967
968 self.event_streams.send(Event::Dht(
971 DhtEvent::NoMoreProviders(key.into())
972 ));
973
974 if let Some(ref metrics) = self.metrics {
975 metrics
976 .kademlia_query_duration
977 .with_label_values(&["providers-get"])
978 .observe(started.elapsed().as_secs_f64());
979 }
980 },
981 query => {
982 log::error!(
983 target: LOG_TARGET,
984 "Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
985 );
986 debug_assert!(false);
987 }
988 }
989 }
990 Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
991 match self.pending_queries.remove(&query_id) {
992 Some(KadQuery::AddProvider(key, started)) => {
993 debug_assert_eq!(key, provided_key.into());
994
995 log::trace!(
996 target: LOG_TARGET,
997 "`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
998 );
999
1000 self.event_streams.send(Event::Dht(
1001 DhtEvent::StartedProviding(key.into())
1002 ));
1003
1004 if let Some(ref metrics) = self.metrics {
1005 metrics
1006 .kademlia_query_duration
1007 .with_label_values(&["provider-add"])
1008 .observe(started.elapsed().as_secs_f64());
1009 }
1010 }
1011 Some(_) => {
1012 log::error!(
1013 target: LOG_TARGET,
1014 "Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
1015 );
1016 debug_assert!(false);
1017 }
1018 None => {
1019 log::trace!(
1020 target: LOG_TARGET,
1021 "`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
1022 );
1023 }
1024 }
1025 }
1026 Some(DiscoveryEvent::QueryFailed { query_id }) => {
1027 match self.pending_queries.remove(&query_id) {
1028 Some(KadQuery::FindNode(peer_id, started)) => {
1029 log::debug!(
1030 target: LOG_TARGET,
1031 "`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1032 );
1033
1034 self.event_streams.send(Event::Dht(
1035 DhtEvent::ClosestPeersNotFound(peer_id.into())
1036 ));
1037
1038 if let Some(ref metrics) = self.metrics {
1039 metrics
1040 .kademlia_query_duration
1041 .with_label_values(&["node-find-failed"])
1042 .observe(started.elapsed().as_secs_f64());
1043 }
1044 },
1045 Some(KadQuery::GetValue(key, started)) => {
1046 log::debug!(
1047 target: LOG_TARGET,
1048 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1049 );
1050
1051 self.event_streams.send(Event::Dht(
1052 DhtEvent::ValueNotFound(key)
1053 ));
1054
1055 if let Some(ref metrics) = self.metrics {
1056 metrics
1057 .kademlia_query_duration
1058 .with_label_values(&["value-get-failed"])
1059 .observe(started.elapsed().as_secs_f64());
1060 }
1061 },
1062 Some(KadQuery::PutValue(key, started)) => {
1063 log::debug!(
1064 target: LOG_TARGET,
1065 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1066 );
1067
1068 self.event_streams.send(Event::Dht(
1069 DhtEvent::ValuePutFailed(key)
1070 ));
1071
1072 if let Some(ref metrics) = self.metrics {
1073 metrics
1074 .kademlia_query_duration
1075 .with_label_values(&["value-put-failed"])
1076 .observe(started.elapsed().as_secs_f64());
1077 }
1078 },
1079 Some(KadQuery::GetProviders(key, started)) => {
1080 log::debug!(
1081 target: LOG_TARGET,
1082 "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1083 );
1084
1085 self.event_streams.send(Event::Dht(
1086 DhtEvent::ProvidersNotFound(key)
1087 ));
1088
1089 if let Some(ref metrics) = self.metrics {
1090 metrics
1091 .kademlia_query_duration
1092 .with_label_values(&["providers-get-failed"])
1093 .observe(started.elapsed().as_secs_f64());
1094 }
1095 },
1096 Some(KadQuery::AddProvider(key, started)) => {
1097 log::debug!(
1098 target: LOG_TARGET,
1099 "`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1100 );
1101
1102 self.event_streams.send(Event::Dht(
1103 DhtEvent::StartProvidingFailed(key)
1104 ));
1105
1106 if let Some(ref metrics) = self.metrics {
1107 metrics
1108 .kademlia_query_duration
1109 .with_label_values(&["provider-add-failed"])
1110 .observe(started.elapsed().as_secs_f64());
1111 }
1112 },
1113 None => {
1114 log::debug!(
1115 target: LOG_TARGET,
1116 "non-existent query (likely republishing a provider) failed ({query_id:?})",
1117 );
1118 }
1119 }
1120 }
1121 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1122 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1123 }
1124 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1125 match self.litep2p.public_addresses().add_address(address.clone().into()) {
1126 Ok(inserted) => if inserted {
1127 log::info!(target: LOG_TARGET, "๐ Discovered new external address for our node: {address}");
1128 },
1129 Err(err) => {
1130 log::warn!(
1131 target: LOG_TARGET,
1132 "๐ Failed to add discovered external address {address:?}: {err:?}",
1133 );
1134 },
1135 }
1136 }
1137 Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1138 let local_peer_id = self.litep2p.local_peer_id();
1139
1140 let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1142 address.with(Protocol::P2p((*local_peer_id).into()))
1143 } else {
1144 address
1145 };
1146
1147 if self.litep2p.public_addresses().remove_address(&address) {
1148 log::info!(target: LOG_TARGET, "๐ Expired external address for our node: {address}");
1149 } else {
1150 log::warn!(
1151 target: LOG_TARGET,
1152 "๐ Failed to remove expired external address {address:?}"
1153 );
1154 }
1155 }
1156 Some(DiscoveryEvent::Ping { peer, rtt }) => {
1157 log::trace!(
1158 target: LOG_TARGET,
1159 "ping time with {peer:?}: {rtt:?}",
1160 );
1161 }
1162 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1163 self.event_streams.send(Event::Dht(
1164 DhtEvent::PutRecordRequest(
1165 key.into(),
1166 value,
1167 publisher.map(Into::into),
1168 expires,
1169 )
1170 ));
1171 },
1172
1173 Some(DiscoveryEvent::RandomKademliaStarted) => {
1174 if let Some(metrics) = self.metrics.as_ref() {
1175 metrics.kademlia_random_queries_total.inc();
1176 }
1177 }
1178 },
1179 event = self.litep2p.next_event() => match event {
1180 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1181 let Some(metrics) = &self.metrics else {
1182 continue;
1183 };
1184
1185 let direction = match endpoint {
1186 Endpoint::Dialer { .. } => "out",
1187 Endpoint::Listener { .. } => {
1188 metrics.incoming_connections_total.inc();
1193
1194 "in"
1195 },
1196 };
1197 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1198
1199 match self.peers.entry(peer) {
1200 Entry::Vacant(entry) => {
1201 entry.insert(ConnectionContext {
1202 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1203 num_connections: 1usize,
1204 });
1205 metrics.distinct_peers_connections_opened_total.inc();
1206 }
1207 Entry::Occupied(entry) => {
1208 let entry = entry.into_mut();
1209 entry.num_connections += 1;
1210 entry.endpoints.insert(endpoint.connection_id(), endpoint);
1211 }
1212 }
1213 }
1214 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1215 let Some(metrics) = &self.metrics else {
1216 continue;
1217 };
1218
1219 let Some(context) = self.peers.get_mut(&peer) else {
1220 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1221 continue
1222 };
1223
1224 let direction = match context.endpoints.remove(&connection_id) {
1225 None => {
1226 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1227 continue
1228 }
1229 Some(endpoint) => {
1230 context.num_connections -= 1;
1231
1232 match endpoint {
1233 Endpoint::Dialer { .. } => "out",
1234 Endpoint::Listener { .. } => "in",
1235 }
1236 }
1237 };
1238
1239 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1240
1241 if context.num_connections == 0 {
1242 self.peers.remove(&peer);
1243 metrics.distinct_peers_connections_closed_total.inc();
1244 }
1245 }
1246 Some(Litep2pEvent::DialFailure { address, error }) => {
1247 log::debug!(
1248 target: LOG_TARGET,
1249 "failed to dial peer at {address:?}: {error:?}",
1250 );
1251
1252 if let Some(metrics) = &self.metrics {
1253 let reason = match error {
1254 DialError::Timeout => "timeout",
1255 DialError::AddressError(_) => "invalid-address",
1256 DialError::DnsError(_) => "cannot-resolve-dns",
1257 DialError::NegotiationError(error) => match error {
1258 NegotiationError::Timeout => "timeout",
1259 NegotiationError::PeerIdMissing => "missing-peer-id",
1260 NegotiationError::StateMismatch => "state-mismatch",
1261 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1262 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1263 NegotiationError::SnowError(_) => "noise-error",
1264 NegotiationError::ParseError(_) => "parse-error",
1265 NegotiationError::IoError(_) => "io-error",
1266 NegotiationError::WebSocket(_) => "webscoket-error",
1267 NegotiationError::BadSignature => "bad-signature",
1268 }
1269 };
1270
1271 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1272 }
1273 }
1274 Some(Litep2pEvent::ListDialFailures { errors }) => {
1275 log::debug!(
1276 target: LOG_TARGET,
1277 "failed to dial peer on multiple addresses {errors:?}",
1278 );
1279
1280 if let Some(metrics) = &self.metrics {
1281 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1282 }
1283 }
1284 None => {
1285 log::error!(
1286 target: LOG_TARGET,
1287 "Litep2p backend terminated"
1288 );
1289 return
1290 }
1291 },
1292 }
1293 }
1294 }
1295}