1use crate::{
22 config::{
23 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24 SetConfig, TransportConfig,
25 },
26 error::Error,
27 event::{DhtEvent, Event},
28 litep2p::{
29 bitswap::BitswapService,
30 discovery::{Discovery, DiscoveryEvent},
31 ipfs_dht::IpfsDht,
32 peerstore::Peerstore,
33 service::{Litep2pNetworkService, NetworkServiceCommand},
34 shim::{
35 notification::{
36 config::{NotificationProtocolConfig, ProtocolControlHandle},
37 peerset::PeersetCommand,
38 },
39 request_response::{RequestResponseConfig, RequestResponseProtocol},
40 },
41 },
42 peer_store::PeerStoreProvider,
43 service::{
44 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
45 out_events,
46 traits::{BandwidthSink, NetworkBackend, NetworkService},
47 },
48 NetworkStatus, NotificationService, ProtocolName,
49};
50
51use codec::Encode;
52use futures::StreamExt;
53use litep2p::{
54 config::ConfigBuilder,
55 crypto::ed25519::Keypair,
56 error::{DialError, NegotiationError},
57 executor::Executor,
58 protocol::{
59 libp2p::kademlia::{QueryId, Record},
60 request_response::ConfigBuilder as RequestResponseConfigBuilder,
61 },
62 transport::{
63 tcp::config::Config as TcpTransportConfig,
64 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
65 },
66 types::{
67 multiaddr::{Multiaddr, Protocol},
68 ConnectionId,
69 },
70 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
71};
72use prometheus_endpoint::Registry;
73use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
74
75use sc_client_api::BlockBackend;
76use sc_network_common::{role::Roles, ExHashT};
77use sc_network_types::PeerId;
78use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
79use sp_runtime::traits::Block as BlockT;
80
81use std::{
82 cmp,
83 collections::{hash_map::Entry, HashMap, HashSet},
84 fs,
85 future::Future,
86 iter,
87 pin::Pin,
88 sync::{
89 atomic::{AtomicUsize, Ordering},
90 Arc,
91 },
92 time::{Duration, Instant},
93};
94
95mod bitswap;
96mod bitswap_metrics;
97mod discovery;
98mod ipfs_dht;
99mod peerstore;
100mod service;
101mod shim;
102
103struct Litep2pBandwidthSink {
105 sink: litep2p::BandwidthSink,
106}
107
108impl BandwidthSink for Litep2pBandwidthSink {
109 fn total_inbound(&self) -> u64 {
110 self.sink.inbound() as u64
111 }
112
113 fn total_outbound(&self) -> u64 {
114 self.sink.outbound() as u64
115 }
116}
117
118struct Litep2pExecutor {
120 executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
122}
123
124impl Executor for Litep2pExecutor {
125 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
126 (self.executor)(future)
127 }
128
129 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
130 (self.executor)(future)
131 }
132}
133
134const LOG_TARGET: &str = "sub-libp2p";
136
137struct ConnectionContext {
139 endpoints: HashMap<ConnectionId, Endpoint>,
141
142 num_connections: usize,
144}
145
146#[derive(Debug)]
148enum KadQuery {
149 FindNode(PeerId, Instant),
151 GetValue(RecordKey, Instant),
153 PutValue(RecordKey, Instant),
155 GetProviders(RecordKey, Instant),
157 AddProvider(RecordKey, Instant),
159}
160
161pub struct Litep2pNetworkBackend {
163 litep2p: Litep2p,
165
166 network_service: Arc<dyn NetworkService>,
168
169 cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,
171
172 peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
174
175 pending_queries: HashMap<QueryId, KadQuery>,
177
178 discovery: Discovery,
180
181 num_connected: Arc<AtomicUsize>,
183
184 peers: HashMap<litep2p::PeerId, ConnectionContext>,
186
187 peerstore_handle: Arc<dyn PeerStoreProvider>,
189
190 block_announce_protocol: ProtocolName,
192
193 event_streams: out_events::OutChannels,
195
196 metrics: Option<Metrics>,
198}
199
200impl Litep2pNetworkBackend {
201 fn parse_addresses(
204 addresses: impl Iterator<Item = Multiaddr>,
205 ) -> HashMap<PeerId, Vec<Multiaddr>> {
206 addresses
207 .into_iter()
208 .filter_map(|address| match address.iter().next() {
209 Some(
210 Protocol::Dns(_) |
211 Protocol::Dns4(_) |
212 Protocol::Dns6(_) |
213 Protocol::Ip6(_) |
214 Protocol::Ip4(_),
215 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
216 {
217 Some(Protocol::P2p(peer_id)) => Some((peer_id.into(), Some(address))),
218 _ => None,
219 },
220 Some(Protocol::P2p(peer_id)) => Some((peer_id.into(), None)),
221 _ => None,
222 })
223 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
224 let entry = acc.entry(peer).or_default();
225 maybe_address.map(|address| entry.push(address));
226
227 acc
228 })
229 }
230
231 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
233 Self::parse_addresses(peers.into_iter())
234 .into_iter()
235 .filter_map(|(peer, addresses)| {
236 if addresses.is_empty() {
238 return Some(peer);
239 }
240
241 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
242 log::warn!(
243 target: LOG_TARGET,
244 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
245 );
246 return None;
247 }
248
249 self.peerstore_handle.add_known_peer(peer);
250 Some(peer)
251 })
252 .collect()
253 }
254}
255
256impl Litep2pNetworkBackend {
257 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
259 let secret: litep2p::crypto::ed25519::SecretKey =
260 node_key.clone().into_keypair()?.secret().into();
261
262 let local_identity = Keypair::from(secret);
263 let local_public = local_identity.public();
264 let local_peer_id = local_public.to_peer_id();
265
266 Ok((local_identity, local_peer_id))
267 }
268
269 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
271 config: &FullNetworkConfiguration<B, H, Self>,
272 ) -> ConfigBuilder {
273 let _ = match config.network_config.transport {
274 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
275 TransportConfig::Normal { .. } => false,
276 };
277 let config_builder = ConfigBuilder::new();
278
279 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
280 .network_config
281 .listen_addresses
282 .iter()
283 .filter_map(|address| {
284 use sc_network_types::multiaddr::Protocol;
285
286 let mut iter = address.iter();
287
288 match iter.next() {
289 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
290 protocol => {
291 log::error!(
292 target: LOG_TARGET,
293 "unknown protocol {protocol:?}, ignoring {address:?}",
294 );
295
296 return None;
297 },
298 }
299
300 match iter.next() {
301 Some(Protocol::Tcp(_)) => match iter.next() {
302 Some(Protocol::Ws(_) | Protocol::Wss(_)) => {
303 Some((None, Some(address.clone())))
304 },
305 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
306 protocol => {
307 log::error!(
308 target: LOG_TARGET,
309 "unknown protocol {protocol:?}, ignoring {address:?}",
310 );
311 None
312 },
313 },
314 protocol => {
315 log::error!(
316 target: LOG_TARGET,
317 "unknown protocol {protocol:?}, ignoring {address:?}",
318 );
319 None
320 },
321 }
322 })
323 .unzip();
324
325 config_builder
326 .with_websocket(WebSocketTransportConfig {
327 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
328 yamux_config: litep2p::yamux::Config::default(),
329 nodelay: true,
330 ..Default::default()
331 })
332 .with_tcp(TcpTransportConfig {
333 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
334 yamux_config: litep2p::yamux::Config::default(),
335 nodelay: true,
336 ..Default::default()
337 })
338 }
339}
340
341#[async_trait::async_trait]
342impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
343 type NotificationProtocolConfig = NotificationProtocolConfig;
344 type RequestResponseProtocolConfig = RequestResponseConfig;
345 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
346 type PeerStore = Peerstore;
347 type BitswapConfig = bitswap::BitswapConfig;
348
349 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
350 where
351 Self: Sized,
352 {
353 let (keypair, local_peer_id) =
354 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
355 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
356
357 params.network_config.network_config.boot_nodes = params
358 .network_config
359 .network_config
360 .boot_nodes
361 .into_iter()
362 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
363 .collect();
364 params.network_config.network_config.default_peers_set.reserved_nodes = params
365 .network_config
366 .network_config
367 .default_peers_set
368 .reserved_nodes
369 .into_iter()
370 .filter(|reserved_node| {
371 if reserved_node.peer_id == local_peer_id.into() {
372 log::warn!(
373 target: LOG_TARGET,
374 "Local peer ID used in reserved node, ignoring: {reserved_node}",
375 );
376 false
377 } else {
378 true
379 }
380 })
381 .collect();
382
383 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
384 fs::create_dir_all(path)?;
385 }
386
387 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
388 log::info!(target: LOG_TARGET, "Running litep2p network backend");
389
390 params.network_config.sanity_check_addresses()?;
391 params.network_config.sanity_check_bootnodes()?;
392
393 let mut config_builder =
394 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
395 let known_addresses = params.network_config.known_addresses();
396 let peer_store_handle = params.network_config.peer_store_handle();
397 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
398
399 let FullNetworkConfiguration {
400 notification_protocols,
401 request_response_protocols,
402 network_config,
403 ..
404 } = params.network_config;
405
406 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
412 let mut notif_protocols = HashMap::from_iter([(
413 params.block_announce_config.protocol_name().clone(),
414 params.block_announce_config.handle,
415 )]);
416
417 config_builder = notification_protocols
419 .into_iter()
420 .fold(config_builder, |config_builder, mut config| {
421 config.config.set_handshake(Roles::from(¶ms.role).encode());
422 notif_protocols.insert(config.protocol_name, config.handle);
423
424 config_builder.with_notification_protocol(config.config)
425 })
426 .with_notification_protocol(params.block_announce_config.config);
427
428 let metrics = match ¶ms.metrics_registry {
430 Some(registry) => Some(register_without_sources(registry)?),
431 None => None,
432 };
433
434 let (mut request_response_receivers, request_response_senders): (
440 HashMap<_, _>,
441 HashMap<_, _>,
442 ) = request_response_protocols
443 .iter()
444 .map(|config| {
445 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
446 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
447 })
448 .unzip();
449
450 config_builder = request_response_protocols.into_iter().fold(
451 config_builder,
452 |config_builder, config| {
453 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
454 Litep2pProtocolName::from(config.protocol_name.clone()),
455 )
456 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
457 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
458 .with_timeout(config.request_timeout)
459 .build();
460
461 let protocol = RequestResponseProtocol::new(
462 config.protocol_name.clone(),
463 handle,
464 Arc::clone(&peer_store_handle),
465 config.inbound_queue,
466 request_response_receivers
467 .remove(&config.protocol_name)
468 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
469 request_response_senders.clone(),
470 metrics.clone(),
471 );
472
473 executor.run(Box::pin(async move {
474 protocol.run().await;
475 }));
476
477 config_builder.with_request_response_protocol(protocol_config)
478 },
479 );
480
481 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
483 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
484 use sc_network_types::multiaddr::Protocol;
485
486 let address = match address.iter().last() {
487 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) => {
488 address.with(Protocol::P2p(peer.into()))
489 },
490 Some(Protocol::P2p(_)) => address,
491 _ => return acc,
492 };
493
494 acc.entry(peer.into()).or_default().push(address.into());
495 peer_store_handle.add_known_peer(peer);
496
497 acc
498 });
499
500 let listen_addresses = Arc::new(Default::default());
502 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
503 Discovery::new(
504 local_peer_id,
505 &network_config,
506 params.genesis_hash,
507 params.fork_id.as_deref(),
508 ¶ms.protocol_id,
509 known_addresses.clone(),
510 Arc::clone(&listen_addresses),
511 Arc::clone(&peer_store_handle),
512 );
513
514 let bitswap_cmd_tx = params.ipfs_config.as_ref().map(|c| c.bitswap_config.cmd_tx.clone());
515
516 if let Some(config) = params.ipfs_config {
518 config_builder =
519 config_builder.with_libp2p_bitswap(config.bitswap_config.litep2p_config);
520
521 if !config.bootnodes.is_empty() {
522 let (ipfs_dht, kad_config) = IpfsDht::new(config.bootnodes, config.block_provider);
523 config_builder = config_builder.with_libp2p_kademlia(kad_config);
524 executor.run(Box::pin(ipfs_dht.run()));
525 } else {
526 log::warn!(
527 target: LOG_TARGET,
528 "Not starting IPFS DHT publisher because no IPFS bootnodes are configured. \
529 Only direct Bitswap requests will be handled.",
530 );
531 }
532 }
533
534 config_builder = config_builder
535 .with_known_addresses(known_addresses.clone().into_iter())
536 .with_libp2p_ping(ping_config)
537 .with_libp2p_identify(identify_config)
538 .with_libp2p_kademlia(kademlia_config)
539 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
540 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
541 ))
542 .with_keep_alive_timeout(network_config.idle_connection_timeout)
543 .with_system_resolver()
546 .with_executor(executor);
547
548 if let Some(config) = maybe_mdns_config {
549 config_builder = config_builder.with_mdns(config);
550 }
551
552 let litep2p =
553 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
554
555 litep2p.listen_addresses().for_each(|address| {
556 log::debug!(target: LOG_TARGET, "listening on: {address}");
557
558 listen_addresses.write().insert(address.clone());
559 });
560
561 let public_addresses = litep2p.public_addresses();
562 for address in network_config.public_addresses.iter() {
563 if let Err(err) = public_addresses.add_address(address.clone().into()) {
564 log::warn!(
565 target: LOG_TARGET,
566 "failed to add public address {address:?}: {err:?}",
567 );
568 }
569 }
570
571 let network_service = Arc::new(Litep2pNetworkService::new(
572 local_peer_id,
573 keypair.clone(),
574 cmd_tx,
575 Arc::clone(&peer_store_handle),
576 notif_protocols.clone(),
577 block_announce_protocol.clone(),
578 request_response_senders,
579 Arc::clone(&listen_addresses),
580 public_addresses,
581 bitswap_cmd_tx,
582 ));
583
584 let num_connected = Arc::new(Default::default());
586 let bandwidth: Arc<dyn BandwidthSink> =
587 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
588
589 if let Some(registry) = ¶ms.metrics_registry {
590 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
591 }
592
593 Ok(Self {
594 network_service,
595 cmd_rx,
596 metrics,
597 peerset_handles: notif_protocols,
598 num_connected,
599 discovery,
600 pending_queries: HashMap::new(),
601 peerstore_handle: peer_store_handle,
602 block_announce_protocol,
603 event_streams: out_events::OutChannels::new(None)?,
604 peers: HashMap::new(),
605 litep2p,
606 })
607 }
608
609 fn network_service(&self) -> Arc<dyn NetworkService> {
610 Arc::clone(&self.network_service)
611 }
612
613 fn peer_store(
614 bootnodes: Vec<sc_network_types::PeerId>,
615 metrics_registry: Option<Registry>,
616 ) -> Self::PeerStore {
617 Peerstore::new(bootnodes, metrics_registry)
618 }
619
620 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
621 NotificationMetrics::new(registry)
622 }
623
624 fn bitswap_server(
626 client: Arc<dyn BlockBackend<B> + Send + Sync>,
627 metrics_registry: Option<Registry>,
628 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
629 BitswapService::new(client, metrics_registry.as_ref())
630 }
631
632 fn notification_config(
634 protocol_name: ProtocolName,
635 fallback_names: Vec<ProtocolName>,
636 max_notification_size: u64,
637 handshake: Option<NotificationHandshake>,
638 set_config: SetConfig,
639 metrics: NotificationMetrics,
640 peerstore_handle: Arc<dyn PeerStoreProvider>,
641 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
642 Self::NotificationProtocolConfig::new(
643 protocol_name,
644 fallback_names,
645 max_notification_size as usize,
646 handshake,
647 set_config,
648 metrics,
649 peerstore_handle,
650 )
651 }
652
653 fn request_response_config(
655 protocol_name: ProtocolName,
656 fallback_names: Vec<ProtocolName>,
657 max_request_size: u64,
658 max_response_size: u64,
659 request_timeout: Duration,
660 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
661 ) -> Self::RequestResponseProtocolConfig {
662 Self::RequestResponseProtocolConfig::new(
663 protocol_name,
664 fallback_names,
665 max_request_size,
666 max_response_size,
667 request_timeout,
668 inbound_queue,
669 )
670 }
671
672 async fn run(mut self) {
674 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
675
676 loop {
677 let num_connected_peers = self
678 .peerset_handles
679 .get(&self.block_announce_protocol)
680 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
681 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
682
683 tokio::select! {
684 command = self.cmd_rx.next() => match command {
685 None => return,
686 Some(command) => match command {
687 NetworkServiceCommand::FindClosestPeers { target } => {
688 let query_id = self.discovery.find_node(target.into()).await;
689 self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
690 }
691 NetworkServiceCommand::GetValue{ key } => {
692 let query_id = self.discovery.get_value(key.clone()).await;
693 self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
694 }
695 NetworkServiceCommand::PutValue { key, value } => {
696 let query_id = self.discovery.put_value(key.clone(), value).await;
697 self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
698 }
699 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
700 let kademlia_key = record.key.clone();
701 let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
702 self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
703 }
704 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
705 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
706 }
707 NetworkServiceCommand::StartProviding { key } => {
708 let query_id = self.discovery.start_providing(key.clone()).await;
709 self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
710 }
711 NetworkServiceCommand::StopProviding { key } => {
712 self.discovery.stop_providing(key).await;
713 }
714 NetworkServiceCommand::GetProviders { key } => {
715 let query_id = self.discovery.get_providers(key.clone()).await;
716 self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
717 }
718 NetworkServiceCommand::EventStream { tx } => {
719 self.event_streams.push(tx);
720 }
721 NetworkServiceCommand::Status { tx } => {
722 let _ = tx.send(NetworkStatus {
723 num_connected_peers: self
724 .peerset_handles
725 .get(&self.block_announce_protocol)
726 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
727 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
728 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
729 });
730 }
731 NetworkServiceCommand::AddPeersToReservedSet {
732 protocol,
733 peers,
734 } => {
735 let peers = self.add_addresses(peers.into_iter().map(Into::into));
736
737 match self.peerset_handles.get(&protocol) {
738 Some(handle) => {
739 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
740 }
741 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
742 };
743 }
744 NetworkServiceCommand::AddKnownAddress { peer, address } => {
745 let mut address: Multiaddr = address.into();
746
747 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
748 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
749 }
750
751 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
752 self.peerstore_handle.add_known_peer(peer);
756 } else {
757 log::debug!(
758 target: LOG_TARGET,
759 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
760 );
761 }
762 },
763 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
764 let peers = self.add_addresses(peers.into_iter().map(Into::into));
765
766 match self.peerset_handles.get(&protocol) {
767 Some(handle) => {
768 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
769 }
770 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
771 }
772
773 },
774 NetworkServiceCommand::DisconnectPeer {
775 protocol,
776 peer,
777 } => {
778 let Some(handle) = self.peerset_handles.get(&protocol) else {
779 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
780 continue
781 };
782
783 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
784 }
785 NetworkServiceCommand::SetReservedOnly {
786 protocol,
787 reserved_only,
788 } => {
789 let Some(handle) = self.peerset_handles.get(&protocol) else {
790 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
791 continue
792 };
793
794 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
795 }
796 NetworkServiceCommand::RemoveReservedPeers {
797 protocol,
798 peers,
799 } => {
800 let Some(handle) = self.peerset_handles.get(&protocol) else {
801 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
802 continue
803 };
804
805 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
806 }
807 }
808 },
809 event = self.discovery.next() => match event {
810 None => return,
811 Some(DiscoveryEvent::Discovered { addresses }) => {
812 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
814 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
815 self.peerstore_handle.add_known_peer(peer);
816 }
817 }
818 }
819 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
820 for peer in peers {
821 self.peerstore_handle.add_known_peer(peer.into());
822 }
823 }
824 Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
825 match self.pending_queries.remove(&query_id) {
826 Some(KadQuery::FindNode(_, started)) => {
827 log::trace!(
828 target: LOG_TARGET,
829 "`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
830 );
831
832 self.event_streams.send(
833 Event::Dht(
834 DhtEvent::ClosestPeersFound(
835 target.into(),
836 peers
837 .into_iter()
838 .map(|(peer, addrs)| (
839 peer.into(),
840 addrs.into_iter().map(Into::into).collect(),
841 ))
842 .collect(),
843 )
844 )
845 );
846
847 if let Some(ref metrics) = self.metrics {
848 metrics
849 .kademlia_query_duration
850 .with_label_values(&["node-find"])
851 .observe(started.elapsed().as_secs_f64());
852 }
853 },
854 query => {
855 log::error!(
856 target: LOG_TARGET,
857 "Missing/invalid pending query for `FIND_NODE`: {query:?}"
858 );
859 debug_assert!(false);
860 }
861 }
862 },
863 Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
864 if !self.pending_queries.contains_key(&query_id) {
865 log::error!(
866 target: LOG_TARGET,
867 "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
868 );
869
870 continue
871 }
872
873 let peer_id: sc_network_types::PeerId = record.peer.into();
874 let record = PeerRecord {
875 record: P2PRecord {
876 key: record.record.key.to_vec().into(),
877 value: record.record.value,
878 publisher: record.record.publisher.map(|peer_id| {
879 let peer_id: sc_network_types::PeerId = peer_id.into();
880 peer_id.into()
881 }),
882 expires: record.record.expires,
883 },
884 peer: Some(peer_id.into()),
885 };
886
887 self.event_streams.send(
888 Event::Dht(
889 DhtEvent::ValueFound(
890 record.into()
891 )
892 )
893 );
894 }
895 Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
896 match self.pending_queries.remove(&query_id) {
897 Some(KadQuery::GetValue(key, started)) => {
898 log::trace!(
899 target: LOG_TARGET,
900 "`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
901 );
902
903 if let Some(ref metrics) = self.metrics {
904 metrics
905 .kademlia_query_duration
906 .with_label_values(&["value-get"])
907 .observe(started.elapsed().as_secs_f64());
908 }
909 },
910 query => {
911 log::error!(
912 target: LOG_TARGET,
913 "Missing/invalid pending query for `GET_VALUE`: {query:?}"
914 );
915 debug_assert!(false);
916 },
917 }
918 }
919 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
920 match self.pending_queries.remove(&query_id) {
921 Some(KadQuery::PutValue(key, started)) => {
922 log::trace!(
923 target: LOG_TARGET,
924 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
925 );
926
927 self.event_streams.send(Event::Dht(
928 DhtEvent::ValuePut(key)
929 ));
930
931 if let Some(ref metrics) = self.metrics {
932 metrics
933 .kademlia_query_duration
934 .with_label_values(&["value-put"])
935 .observe(started.elapsed().as_secs_f64());
936 }
937 },
938 query => {
939 log::error!(
940 target: LOG_TARGET,
941 "Missing/invalid pending query for `PUT_VALUE`: {query:?}"
942 );
943 debug_assert!(false);
944 }
945 }
946 }
947 Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
948 match self.pending_queries.remove(&query_id) {
949 Some(KadQuery::GetProviders(key, started)) => {
950 log::trace!(
951 target: LOG_TARGET,
952 "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
953 );
954
955 providers.iter().for_each(|p| {
960 self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
961 });
962
963 self.event_streams.send(Event::Dht(
964 DhtEvent::ProvidersFound(
965 key.clone().into(),
966 providers.into_iter().map(|p| p.peer.into()).collect()
967 )
968 ));
969
970 self.event_streams.send(Event::Dht(
973 DhtEvent::NoMoreProviders(key.into())
974 ));
975
976 if let Some(ref metrics) = self.metrics {
977 metrics
978 .kademlia_query_duration
979 .with_label_values(&["providers-get"])
980 .observe(started.elapsed().as_secs_f64());
981 }
982 },
983 query => {
984 log::error!(
985 target: LOG_TARGET,
986 "Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
987 );
988 debug_assert!(false);
989 }
990 }
991 }
992 Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
993 match self.pending_queries.remove(&query_id) {
994 Some(KadQuery::AddProvider(key, started)) => {
995 debug_assert_eq!(key, provided_key.into());
996
997 log::trace!(
998 target: LOG_TARGET,
999 "`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
1000 );
1001
1002 self.event_streams.send(Event::Dht(
1003 DhtEvent::StartedProviding(key.into())
1004 ));
1005
1006 if let Some(ref metrics) = self.metrics {
1007 metrics
1008 .kademlia_query_duration
1009 .with_label_values(&["provider-add"])
1010 .observe(started.elapsed().as_secs_f64());
1011 }
1012 }
1013 Some(_) => {
1014 log::error!(
1015 target: LOG_TARGET,
1016 "Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
1017 );
1018 debug_assert!(false);
1019 }
1020 None => {
1021 log::trace!(
1022 target: LOG_TARGET,
1023 "`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
1024 );
1025 }
1026 }
1027 }
1028 Some(DiscoveryEvent::QueryFailed { query_id }) => {
1029 match self.pending_queries.remove(&query_id) {
1030 Some(KadQuery::FindNode(peer_id, started)) => {
1031 log::debug!(
1032 target: LOG_TARGET,
1033 "`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1034 );
1035
1036 self.event_streams.send(Event::Dht(
1037 DhtEvent::ClosestPeersNotFound(peer_id.into())
1038 ));
1039
1040 if let Some(ref metrics) = self.metrics {
1041 metrics
1042 .kademlia_query_duration
1043 .with_label_values(&["node-find-failed"])
1044 .observe(started.elapsed().as_secs_f64());
1045 }
1046 },
1047 Some(KadQuery::GetValue(key, started)) => {
1048 log::debug!(
1049 target: LOG_TARGET,
1050 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1051 );
1052
1053 self.event_streams.send(Event::Dht(
1054 DhtEvent::ValueNotFound(key)
1055 ));
1056
1057 if let Some(ref metrics) = self.metrics {
1058 metrics
1059 .kademlia_query_duration
1060 .with_label_values(&["value-get-failed"])
1061 .observe(started.elapsed().as_secs_f64());
1062 }
1063 },
1064 Some(KadQuery::PutValue(key, started)) => {
1065 log::debug!(
1066 target: LOG_TARGET,
1067 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1068 );
1069
1070 self.event_streams.send(Event::Dht(
1071 DhtEvent::ValuePutFailed(key)
1072 ));
1073
1074 if let Some(ref metrics) = self.metrics {
1075 metrics
1076 .kademlia_query_duration
1077 .with_label_values(&["value-put-failed"])
1078 .observe(started.elapsed().as_secs_f64());
1079 }
1080 },
1081 Some(KadQuery::GetProviders(key, started)) => {
1082 log::debug!(
1083 target: LOG_TARGET,
1084 "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1085 );
1086
1087 self.event_streams.send(Event::Dht(
1088 DhtEvent::ProvidersNotFound(key)
1089 ));
1090
1091 if let Some(ref metrics) = self.metrics {
1092 metrics
1093 .kademlia_query_duration
1094 .with_label_values(&["providers-get-failed"])
1095 .observe(started.elapsed().as_secs_f64());
1096 }
1097 },
1098 Some(KadQuery::AddProvider(key, started)) => {
1099 log::debug!(
1100 target: LOG_TARGET,
1101 "`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1102 );
1103
1104 self.event_streams.send(Event::Dht(
1105 DhtEvent::StartProvidingFailed(key)
1106 ));
1107
1108 if let Some(ref metrics) = self.metrics {
1109 metrics
1110 .kademlia_query_duration
1111 .with_label_values(&["provider-add-failed"])
1112 .observe(started.elapsed().as_secs_f64());
1113 }
1114 },
1115 None => {
1116 log::debug!(
1117 target: LOG_TARGET,
1118 "non-existent query (likely republishing a provider) failed ({query_id:?})",
1119 );
1120 }
1121 }
1122 }
1123 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1124 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1125 }
1126 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1127 match self.litep2p.public_addresses().add_address(address.clone().into()) {
1128 Ok(inserted) => if inserted {
1129 log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
1130 },
1131 Err(err) => {
1132 log::warn!(
1133 target: LOG_TARGET,
1134 "🔍 Failed to add discovered external address {address:?}: {err:?}",
1135 );
1136 },
1137 }
1138 }
1139 Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1140 let local_peer_id = self.litep2p.local_peer_id();
1141
1142 let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1144 address.with(Protocol::P2p((*local_peer_id).into()))
1145 } else {
1146 address
1147 };
1148
1149 if self.litep2p.public_addresses().remove_address(&address) {
1150 log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
1151 } else {
1152 log::warn!(
1153 target: LOG_TARGET,
1154 "🔍 Failed to remove expired external address {address:?}"
1155 );
1156 }
1157 }
1158 Some(DiscoveryEvent::Ping { peer, rtt }) => {
1159 log::trace!(
1160 target: LOG_TARGET,
1161 "ping time with {peer:?}: {rtt:?}",
1162 );
1163 }
1164 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1165 self.event_streams.send(Event::Dht(
1166 DhtEvent::PutRecordRequest(
1167 key.into(),
1168 value,
1169 publisher.map(Into::into),
1170 expires,
1171 )
1172 ));
1173 },
1174
1175 Some(DiscoveryEvent::RandomKademliaStarted) => {
1176 if let Some(metrics) = self.metrics.as_ref() {
1177 metrics.kademlia_random_queries_total.inc();
1178 }
1179 }
1180 },
1181 event = self.litep2p.next_event() => match event {
1182 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1183 let Some(metrics) = &self.metrics else {
1184 continue;
1185 };
1186
1187 let direction = match endpoint {
1188 Endpoint::Dialer { .. } => "out",
1189 Endpoint::Listener { .. } => {
1190 metrics.incoming_connections_total.inc();
1195
1196 "in"
1197 },
1198 };
1199 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1200
1201 match self.peers.entry(peer) {
1202 Entry::Vacant(entry) => {
1203 entry.insert(ConnectionContext {
1204 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1205 num_connections: 1usize,
1206 });
1207 metrics.distinct_peers_connections_opened_total.inc();
1208 }
1209 Entry::Occupied(entry) => {
1210 let entry = entry.into_mut();
1211 entry.num_connections += 1;
1212 entry.endpoints.insert(endpoint.connection_id(), endpoint);
1213 }
1214 }
1215 }
1216 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1217 let Some(metrics) = &self.metrics else {
1218 continue;
1219 };
1220
1221 let Some(context) = self.peers.get_mut(&peer) else {
1222 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1223 continue
1224 };
1225
1226 let direction = match context.endpoints.remove(&connection_id) {
1227 None => {
1228 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1229 continue
1230 }
1231 Some(endpoint) => {
1232 context.num_connections -= 1;
1233
1234 match endpoint {
1235 Endpoint::Dialer { .. } => "out",
1236 Endpoint::Listener { .. } => "in",
1237 }
1238 }
1239 };
1240
1241 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1242
1243 if context.num_connections == 0 {
1244 self.peers.remove(&peer);
1245 metrics.distinct_peers_connections_closed_total.inc();
1246 }
1247 }
1248 Some(Litep2pEvent::DialFailure { address, error }) => {
1249 log::debug!(
1250 target: LOG_TARGET,
1251 "failed to dial peer at {address:?}: {error:?}",
1252 );
1253
1254 if let Some(metrics) = &self.metrics {
1255 let reason = match error {
1256 DialError::Timeout => "timeout",
1257 DialError::AddressError(_) => "invalid-address",
1258 DialError::DnsError(_) => "cannot-resolve-dns",
1259 DialError::NegotiationError(error) => match error {
1260 NegotiationError::Timeout => "timeout",
1261 NegotiationError::PeerIdMissing => "missing-peer-id",
1262 NegotiationError::StateMismatch => "state-mismatch",
1263 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1264 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1265 NegotiationError::SnowError(_) => "noise-error",
1266 NegotiationError::ParseError(_) => "parse-error",
1267 NegotiationError::IoError(_) => "io-error",
1268 NegotiationError::WebSocket(_) => "webscoket-error",
1269 NegotiationError::BadSignature => "bad-signature",
1270 }
1271 };
1272
1273 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1274 }
1275 }
1276 Some(Litep2pEvent::ListDialFailures { errors }) => {
1277 log::debug!(
1278 target: LOG_TARGET,
1279 "failed to dial peer on multiple addresses {errors:?}",
1280 );
1281
1282 if let Some(metrics) = &self.metrics {
1283 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1284 }
1285 }
1286 None => {
1287 log::error!(
1288 target: LOG_TARGET,
1289 "Litep2p backend terminated"
1290 );
1291 return
1292 }
1293 },
1294 }
1295 }
1296 }
1297}