1use crate::{
22 config::{
23 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24 SetConfig, TransportConfig,
25 },
26 error::Error,
27 event::{DhtEvent, Event},
28 litep2p::{
29 discovery::{Discovery, DiscoveryEvent},
30 peerstore::Peerstore,
31 service::{Litep2pNetworkService, NetworkServiceCommand},
32 shim::{
33 bitswap::BitswapServer,
34 notification::{
35 config::{NotificationProtocolConfig, ProtocolControlHandle},
36 peerset::PeersetCommand,
37 },
38 request_response::{RequestResponseConfig, RequestResponseProtocol},
39 },
40 },
41 peer_store::PeerStoreProvider,
42 service::{
43 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44 out_events,
45 traits::{BandwidthSink, NetworkBackend, NetworkService},
46 },
47 NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53 config::ConfigBuilder,
54 crypto::ed25519::Keypair,
55 error::{DialError, NegotiationError},
56 executor::Executor,
57 protocol::{
58 libp2p::{
59 bitswap::Config as BitswapConfig,
60 kademlia::{QueryId, Record},
61 },
62 request_response::ConfigBuilder as RequestResponseConfigBuilder,
63 },
64 transport::{
65 tcp::config::Config as TcpTransportConfig,
66 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67 },
68 types::{
69 multiaddr::{Multiaddr, Protocol},
70 ConnectionId,
71 },
72 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84 cmp,
85 collections::{hash_map::Entry, HashMap, HashSet},
86 fs,
87 future::Future,
88 iter,
89 pin::Pin,
90 sync::{
91 atomic::{AtomicUsize, Ordering},
92 Arc,
93 },
94 time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
102const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
104
105struct Litep2pBandwidthSink {
107 sink: litep2p::BandwidthSink,
108}
109
110impl BandwidthSink for Litep2pBandwidthSink {
111 fn total_inbound(&self) -> u64 {
112 self.sink.inbound() as u64
113 }
114
115 fn total_outbound(&self) -> u64 {
116 self.sink.outbound() as u64
117 }
118}
119
120struct Litep2pExecutor {
122 executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
124}
125
126impl Executor for Litep2pExecutor {
127 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
128 (self.executor)(future)
129 }
130
131 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
132 (self.executor)(future)
133 }
134}
135
136const LOG_TARGET: &str = "sub-libp2p";
138
139struct ConnectionContext {
141 endpoints: HashMap<ConnectionId, Endpoint>,
143
144 num_connections: usize,
146}
147
148#[derive(Debug)]
150enum KadQuery {
151 FindNode(PeerId, Instant),
153 GetValue(RecordKey, Instant),
155 PutValue(RecordKey, Instant),
157 GetProviders(RecordKey, Instant),
159}
160
161pub struct Litep2pNetworkBackend {
163 litep2p: Litep2p,
165
166 network_service: Arc<dyn NetworkService>,
168
169 cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,
171
172 peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
174
175 pending_queries: HashMap<QueryId, KadQuery>,
177
178 discovery: Discovery,
180
181 num_connected: Arc<AtomicUsize>,
183
184 peers: HashMap<litep2p::PeerId, ConnectionContext>,
186
187 peerstore_handle: Arc<dyn PeerStoreProvider>,
189
190 block_announce_protocol: ProtocolName,
192
193 event_streams: out_events::OutChannels,
195
196 metrics: Option<Metrics>,
198}
199
200impl Litep2pNetworkBackend {
201 fn parse_addresses(
204 addresses: impl Iterator<Item = Multiaddr>,
205 ) -> HashMap<PeerId, Vec<Multiaddr>> {
206 addresses
207 .into_iter()
208 .filter_map(|address| match address.iter().next() {
209 Some(
210 Protocol::Dns(_) |
211 Protocol::Dns4(_) |
212 Protocol::Dns6(_) |
213 Protocol::Ip6(_) |
214 Protocol::Ip4(_),
215 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
216 {
217 Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
218 .map_or(None, |peer| Some((peer, Some(address)))),
219 _ => None,
220 },
221 Some(Protocol::P2p(multihash)) =>
222 PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
223 _ => None,
224 })
225 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
226 let entry = acc.entry(peer).or_default();
227 maybe_address.map(|address| entry.push(address));
228
229 acc
230 })
231 }
232
233 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
235 Self::parse_addresses(peers.into_iter())
236 .into_iter()
237 .filter_map(|(peer, addresses)| {
238 if addresses.is_empty() {
240 return Some(peer)
241 }
242
243 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
244 log::warn!(
245 target: LOG_TARGET,
246 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
247 );
248 return None
249 }
250
251 self.peerstore_handle.add_known_peer(peer);
252 Some(peer)
253 })
254 .collect()
255 }
256}
257
258impl Litep2pNetworkBackend {
259 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
261 let secret: litep2p::crypto::ed25519::SecretKey =
262 node_key.clone().into_keypair()?.secret().into();
263
264 let local_identity = Keypair::from(secret);
265 let local_public = local_identity.public();
266 let local_peer_id = local_public.to_peer_id();
267
268 Ok((local_identity, local_peer_id))
269 }
270
271 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
273 config: &FullNetworkConfiguration<B, H, Self>,
274 ) -> ConfigBuilder {
275 let _ = match config.network_config.transport {
276 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
277 TransportConfig::Normal { .. } => false,
278 };
279 let config_builder = ConfigBuilder::new();
280
281 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
282 .network_config
283 .listen_addresses
284 .iter()
285 .filter_map(|address| {
286 use sc_network_types::multiaddr::Protocol;
287
288 let mut iter = address.iter();
289
290 match iter.next() {
291 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
292 protocol => {
293 log::error!(
294 target: LOG_TARGET,
295 "unknown protocol {protocol:?}, ignoring {address:?}",
296 );
297
298 return None
299 },
300 }
301
302 match iter.next() {
303 Some(Protocol::Tcp(_)) => match iter.next() {
304 Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
305 Some((None, Some(address.clone()))),
306 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
307 protocol => {
308 log::error!(
309 target: LOG_TARGET,
310 "unknown protocol {protocol:?}, ignoring {address:?}",
311 );
312 None
313 },
314 },
315 protocol => {
316 log::error!(
317 target: LOG_TARGET,
318 "unknown protocol {protocol:?}, ignoring {address:?}",
319 );
320 None
321 },
322 }
323 })
324 .unzip();
325
326 config_builder
327 .with_websocket(WebSocketTransportConfig {
328 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
329 yamux_config: litep2p::yamux::Config::default(),
330 nodelay: true,
331 ..Default::default()
332 })
333 .with_tcp(TcpTransportConfig {
334 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
335 yamux_config: litep2p::yamux::Config::default(),
336 nodelay: true,
337 ..Default::default()
338 })
339 }
340}
341
342#[async_trait::async_trait]
343impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
344 type NotificationProtocolConfig = NotificationProtocolConfig;
345 type RequestResponseProtocolConfig = RequestResponseConfig;
346 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
347 type PeerStore = Peerstore;
348 type BitswapConfig = BitswapConfig;
349
350 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
351 where
352 Self: Sized,
353 {
354 let (keypair, local_peer_id) =
355 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
356 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
357
358 params.network_config.network_config.boot_nodes = params
359 .network_config
360 .network_config
361 .boot_nodes
362 .into_iter()
363 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
364 .collect();
365 params.network_config.network_config.default_peers_set.reserved_nodes = params
366 .network_config
367 .network_config
368 .default_peers_set
369 .reserved_nodes
370 .into_iter()
371 .filter(|reserved_node| {
372 if reserved_node.peer_id == local_peer_id.into() {
373 log::warn!(
374 target: LOG_TARGET,
375 "Local peer ID used in reserved node, ignoring: {reserved_node}",
376 );
377 false
378 } else {
379 true
380 }
381 })
382 .collect();
383
384 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
385 fs::create_dir_all(path)?;
386 }
387
388 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
389 log::info!(target: LOG_TARGET, "Running litep2p network backend");
390
391 params.network_config.sanity_check_addresses()?;
392 params.network_config.sanity_check_bootnodes()?;
393
394 let mut config_builder =
395 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
396 let known_addresses = params.network_config.known_addresses();
397 let peer_store_handle = params.network_config.peer_store_handle();
398 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
399
400 let FullNetworkConfiguration {
401 notification_protocols,
402 request_response_protocols,
403 network_config,
404 ..
405 } = params.network_config;
406
407 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
413 let mut notif_protocols = HashMap::from_iter([(
414 params.block_announce_config.protocol_name().clone(),
415 params.block_announce_config.handle,
416 )]);
417
418 config_builder = notification_protocols
420 .into_iter()
421 .fold(config_builder, |config_builder, mut config| {
422 config.config.set_handshake(Roles::from(¶ms.role).encode());
423 notif_protocols.insert(config.protocol_name, config.handle);
424
425 config_builder.with_notification_protocol(config.config)
426 })
427 .with_notification_protocol(params.block_announce_config.config);
428
429 let metrics = match ¶ms.metrics_registry {
431 Some(registry) => Some(register_without_sources(registry)?),
432 None => None,
433 };
434
435 let (mut request_response_receivers, request_response_senders): (
441 HashMap<_, _>,
442 HashMap<_, _>,
443 ) = request_response_protocols
444 .iter()
445 .map(|config| {
446 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
447 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
448 })
449 .unzip();
450
451 config_builder = request_response_protocols.into_iter().fold(
452 config_builder,
453 |config_builder, config| {
454 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
455 Litep2pProtocolName::from(config.protocol_name.clone()),
456 )
457 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
458 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
459 .with_timeout(config.request_timeout)
460 .build();
461
462 let protocol = RequestResponseProtocol::new(
463 config.protocol_name.clone(),
464 handle,
465 Arc::clone(&peer_store_handle),
466 config.inbound_queue,
467 request_response_receivers
468 .remove(&config.protocol_name)
469 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
470 request_response_senders.clone(),
471 metrics.clone(),
472 );
473
474 executor.run(Box::pin(async move {
475 protocol.run().await;
476 }));
477
478 config_builder.with_request_response_protocol(protocol_config)
479 },
480 );
481
482 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
484 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
485 use sc_network_types::multiaddr::Protocol;
486
487 let address = match address.iter().last() {
488 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
489 address.with(Protocol::P2p(peer.into())),
490 Some(Protocol::P2p(_)) => address,
491 _ => return acc,
492 };
493
494 acc.entry(peer.into()).or_default().push(address.into());
495 peer_store_handle.add_known_peer(peer);
496
497 acc
498 });
499
500 let listen_addresses = Arc::new(Default::default());
502 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
503 Discovery::new(
504 local_peer_id,
505 &network_config,
506 params.genesis_hash,
507 params.fork_id.as_deref(),
508 ¶ms.protocol_id,
509 known_addresses.clone(),
510 Arc::clone(&listen_addresses),
511 Arc::clone(&peer_store_handle),
512 );
513
514 config_builder = config_builder
515 .with_known_addresses(known_addresses.clone().into_iter())
516 .with_libp2p_ping(ping_config)
517 .with_libp2p_identify(identify_config)
518 .with_libp2p_kademlia(kademlia_config)
519 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
520 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
521 ))
522 .with_keep_alive_timeout(KEEP_ALIVE_TIMEOUT)
525 .with_executor(executor);
526
527 if let Some(config) = maybe_mdns_config {
528 config_builder = config_builder.with_mdns(config);
529 }
530
531 if let Some(config) = params.bitswap_config {
532 config_builder = config_builder.with_libp2p_bitswap(config);
533 }
534
535 let litep2p =
536 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
537
538 litep2p.listen_addresses().for_each(|address| {
539 log::debug!(target: LOG_TARGET, "listening on: {address}");
540
541 listen_addresses.write().insert(address.clone());
542 });
543
544 let public_addresses = litep2p.public_addresses();
545 for address in network_config.public_addresses.iter() {
546 if let Err(err) = public_addresses.add_address(address.clone().into()) {
547 log::warn!(
548 target: LOG_TARGET,
549 "failed to add public address {address:?}: {err:?}",
550 );
551 }
552 }
553
554 let network_service = Arc::new(Litep2pNetworkService::new(
555 local_peer_id,
556 keypair.clone(),
557 cmd_tx,
558 Arc::clone(&peer_store_handle),
559 notif_protocols.clone(),
560 block_announce_protocol.clone(),
561 request_response_senders,
562 Arc::clone(&listen_addresses),
563 public_addresses,
564 ));
565
566 let num_connected = Arc::new(Default::default());
568 let bandwidth: Arc<dyn BandwidthSink> =
569 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
570
571 if let Some(registry) = ¶ms.metrics_registry {
572 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
573 }
574
575 Ok(Self {
576 network_service,
577 cmd_rx,
578 metrics,
579 peerset_handles: notif_protocols,
580 num_connected,
581 discovery,
582 pending_queries: HashMap::new(),
583 peerstore_handle: peer_store_handle,
584 block_announce_protocol,
585 event_streams: out_events::OutChannels::new(None)?,
586 peers: HashMap::new(),
587 litep2p,
588 })
589 }
590
591 fn network_service(&self) -> Arc<dyn NetworkService> {
592 Arc::clone(&self.network_service)
593 }
594
595 fn peer_store(
596 bootnodes: Vec<sc_network_types::PeerId>,
597 metrics_registry: Option<Registry>,
598 ) -> Self::PeerStore {
599 Peerstore::new(bootnodes, metrics_registry)
600 }
601
602 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
603 NotificationMetrics::new(registry)
604 }
605
606 fn bitswap_server(
608 client: Arc<dyn BlockBackend<B> + Send + Sync>,
609 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
610 BitswapServer::new(client)
611 }
612
613 fn notification_config(
615 protocol_name: ProtocolName,
616 fallback_names: Vec<ProtocolName>,
617 max_notification_size: u64,
618 handshake: Option<NotificationHandshake>,
619 set_config: SetConfig,
620 metrics: NotificationMetrics,
621 peerstore_handle: Arc<dyn PeerStoreProvider>,
622 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
623 Self::NotificationProtocolConfig::new(
624 protocol_name,
625 fallback_names,
626 max_notification_size as usize,
627 handshake,
628 set_config,
629 metrics,
630 peerstore_handle,
631 )
632 }
633
634 fn request_response_config(
636 protocol_name: ProtocolName,
637 fallback_names: Vec<ProtocolName>,
638 max_request_size: u64,
639 max_response_size: u64,
640 request_timeout: Duration,
641 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
642 ) -> Self::RequestResponseProtocolConfig {
643 Self::RequestResponseProtocolConfig::new(
644 protocol_name,
645 fallback_names,
646 max_request_size,
647 max_response_size,
648 request_timeout,
649 inbound_queue,
650 )
651 }
652
653 async fn run(mut self) {
655 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
656
657 loop {
658 let num_connected_peers = self
659 .peerset_handles
660 .get(&self.block_announce_protocol)
661 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
662 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
663
664 tokio::select! {
665 command = self.cmd_rx.next() => match command {
666 None => return,
667 Some(command) => match command {
668 NetworkServiceCommand::FindClosestPeers { target } => {
669 let query_id = self.discovery.find_node(target.into()).await;
670 self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
671 }
672 NetworkServiceCommand::GetValue{ key } => {
673 let query_id = self.discovery.get_value(key.clone()).await;
674 self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
675 }
676 NetworkServiceCommand::PutValue { key, value } => {
677 let query_id = self.discovery.put_value(key.clone(), value).await;
678 self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
679 }
680 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
681 let kademlia_key = record.key.clone();
682 let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
683 self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
684 }
685 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
686 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
687 }
688 NetworkServiceCommand::StartProviding { key } => {
689 self.discovery.start_providing(key).await;
690 }
691 NetworkServiceCommand::StopProviding { key } => {
692 self.discovery.stop_providing(key).await;
693 }
694 NetworkServiceCommand::GetProviders { key } => {
695 let query_id = self.discovery.get_providers(key.clone()).await;
696 self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
697 }
698 NetworkServiceCommand::EventStream { tx } => {
699 self.event_streams.push(tx);
700 }
701 NetworkServiceCommand::Status { tx } => {
702 let _ = tx.send(NetworkStatus {
703 num_connected_peers: self
704 .peerset_handles
705 .get(&self.block_announce_protocol)
706 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
707 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
708 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
709 });
710 }
711 NetworkServiceCommand::AddPeersToReservedSet {
712 protocol,
713 peers,
714 } => {
715 let peers = self.add_addresses(peers.into_iter().map(Into::into));
716
717 match self.peerset_handles.get(&protocol) {
718 Some(handle) => {
719 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
720 }
721 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
722 };
723 }
724 NetworkServiceCommand::AddKnownAddress { peer, address } => {
725 let mut address: Multiaddr = address.into();
726
727 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
728 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
729 }
730
731 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
732 self.peerstore_handle.add_known_peer(peer);
736 } else {
737 log::debug!(
738 target: LOG_TARGET,
739 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
740 );
741 }
742 },
743 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
744 let peers = self.add_addresses(peers.into_iter().map(Into::into));
745
746 match self.peerset_handles.get(&protocol) {
747 Some(handle) => {
748 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
749 }
750 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
751 }
752
753 },
754 NetworkServiceCommand::DisconnectPeer {
755 protocol,
756 peer,
757 } => {
758 let Some(handle) = self.peerset_handles.get(&protocol) else {
759 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
760 continue
761 };
762
763 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
764 }
765 NetworkServiceCommand::SetReservedOnly {
766 protocol,
767 reserved_only,
768 } => {
769 let Some(handle) = self.peerset_handles.get(&protocol) else {
770 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
771 continue
772 };
773
774 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
775 }
776 NetworkServiceCommand::RemoveReservedPeers {
777 protocol,
778 peers,
779 } => {
780 let Some(handle) = self.peerset_handles.get(&protocol) else {
781 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
782 continue
783 };
784
785 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
786 }
787 }
788 },
789 event = self.discovery.next() => match event {
790 None => return,
791 Some(DiscoveryEvent::Discovered { addresses }) => {
792 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
794 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
795 self.peerstore_handle.add_known_peer(peer);
796 }
797 }
798 }
799 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
800 for peer in peers {
801 self.peerstore_handle.add_known_peer(peer.into());
802 }
803 }
804 Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
805 match self.pending_queries.remove(&query_id) {
806 Some(KadQuery::FindNode(_, started)) => {
807 log::trace!(
808 target: LOG_TARGET,
809 "`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
810 );
811
812 self.event_streams.send(
813 Event::Dht(
814 DhtEvent::ClosestPeersFound(
815 target.into(),
816 peers
817 .into_iter()
818 .map(|(peer, addrs)| (
819 peer.into(),
820 addrs.into_iter().map(Into::into).collect(),
821 ))
822 .collect(),
823 )
824 )
825 );
826
827 if let Some(ref metrics) = self.metrics {
828 metrics
829 .kademlia_query_duration
830 .with_label_values(&["node-find"])
831 .observe(started.elapsed().as_secs_f64());
832 }
833 },
834 query => {
835 log::error!(
836 target: LOG_TARGET,
837 "Missing/invalid pending query for `FIND_NODE`: {query:?}"
838 );
839 debug_assert!(false);
840 }
841 }
842 },
843 Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
844 if !self.pending_queries.contains_key(&query_id) {
845 log::error!(
846 target: LOG_TARGET,
847 "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
848 );
849
850 continue
851 }
852
853 let peer_id: sc_network_types::PeerId = record.peer.into();
854 let record = PeerRecord {
855 record: P2PRecord {
856 key: record.record.key.to_vec().into(),
857 value: record.record.value,
858 publisher: record.record.publisher.map(|peer_id| {
859 let peer_id: sc_network_types::PeerId = peer_id.into();
860 peer_id.into()
861 }),
862 expires: record.record.expires,
863 },
864 peer: Some(peer_id.into()),
865 };
866
867 self.event_streams.send(
868 Event::Dht(
869 DhtEvent::ValueFound(
870 record.into()
871 )
872 )
873 );
874 }
875 Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
876 match self.pending_queries.remove(&query_id) {
877 Some(KadQuery::GetValue(key, started)) => {
878 log::trace!(
879 target: LOG_TARGET,
880 "`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
881 );
882
883 if let Some(ref metrics) = self.metrics {
884 metrics
885 .kademlia_query_duration
886 .with_label_values(&["value-get"])
887 .observe(started.elapsed().as_secs_f64());
888 }
889 },
890 query => {
891 log::error!(
892 target: LOG_TARGET,
893 "Missing/invalid pending query for `GET_VALUE`: {query:?}"
894 );
895 debug_assert!(false);
896 },
897 }
898 }
899 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
900 match self.pending_queries.remove(&query_id) {
901 Some(KadQuery::PutValue(key, started)) => {
902 log::trace!(
903 target: LOG_TARGET,
904 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
905 );
906
907 self.event_streams.send(Event::Dht(
908 DhtEvent::ValuePut(key)
909 ));
910
911 if let Some(ref metrics) = self.metrics {
912 metrics
913 .kademlia_query_duration
914 .with_label_values(&["value-put"])
915 .observe(started.elapsed().as_secs_f64());
916 }
917 },
918 query => {
919 log::error!(
920 target: LOG_TARGET,
921 "Missing/invalid pending query for `PUT_VALUE`: {query:?}"
922 );
923 debug_assert!(false);
924 }
925 }
926 }
927 Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
928 match self.pending_queries.remove(&query_id) {
929 Some(KadQuery::GetProviders(key, started)) => {
930 log::trace!(
931 target: LOG_TARGET,
932 "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
933 );
934
935 providers.iter().for_each(|p| {
940 self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
941 });
942
943 self.event_streams.send(Event::Dht(
944 DhtEvent::ProvidersFound(
945 key.clone().into(),
946 providers.into_iter().map(|p| p.peer.into()).collect()
947 )
948 ));
949
950 self.event_streams.send(Event::Dht(
953 DhtEvent::NoMoreProviders(key.into())
954 ));
955
956 if let Some(ref metrics) = self.metrics {
957 metrics
958 .kademlia_query_duration
959 .with_label_values(&["providers-get"])
960 .observe(started.elapsed().as_secs_f64());
961 }
962 },
963 query => {
964 log::error!(
965 target: LOG_TARGET,
966 "Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
967 );
968 debug_assert!(false);
969 }
970 }
971 }
972 Some(DiscoveryEvent::QueryFailed { query_id }) => {
973 match self.pending_queries.remove(&query_id) {
974 Some(KadQuery::FindNode(peer_id, started)) => {
975 log::debug!(
976 target: LOG_TARGET,
977 "`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
978 );
979
980 self.event_streams.send(Event::Dht(
981 DhtEvent::ClosestPeersNotFound(peer_id.into())
982 ));
983
984 if let Some(ref metrics) = self.metrics {
985 metrics
986 .kademlia_query_duration
987 .with_label_values(&["node-find-failed"])
988 .observe(started.elapsed().as_secs_f64());
989 }
990 },
991 Some(KadQuery::GetValue(key, started)) => {
992 log::debug!(
993 target: LOG_TARGET,
994 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
995 );
996
997 self.event_streams.send(Event::Dht(
998 DhtEvent::ValueNotFound(key)
999 ));
1000
1001 if let Some(ref metrics) = self.metrics {
1002 metrics
1003 .kademlia_query_duration
1004 .with_label_values(&["value-get-failed"])
1005 .observe(started.elapsed().as_secs_f64());
1006 }
1007 },
1008 Some(KadQuery::PutValue(key, started)) => {
1009 log::debug!(
1010 target: LOG_TARGET,
1011 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1012 );
1013
1014 self.event_streams.send(Event::Dht(
1015 DhtEvent::ValuePutFailed(key)
1016 ));
1017
1018 if let Some(ref metrics) = self.metrics {
1019 metrics
1020 .kademlia_query_duration
1021 .with_label_values(&["value-put-failed"])
1022 .observe(started.elapsed().as_secs_f64());
1023 }
1024 },
1025 Some(KadQuery::GetProviders(key, started)) => {
1026 log::debug!(
1027 target: LOG_TARGET,
1028 "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1029 );
1030
1031 self.event_streams.send(Event::Dht(
1032 DhtEvent::ProvidersNotFound(key)
1033 ));
1034
1035 if let Some(ref metrics) = self.metrics {
1036 metrics
1037 .kademlia_query_duration
1038 .with_label_values(&["providers-get-failed"])
1039 .observe(started.elapsed().as_secs_f64());
1040 }
1041 },
1042 None => {
1043 log::warn!(
1044 target: LOG_TARGET,
1045 "non-existent query failed ({query_id:?})",
1046 );
1047 }
1048 }
1049 }
1050 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1051 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1052 }
1053 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1054 match self.litep2p.public_addresses().add_address(address.clone().into()) {
1055 Ok(inserted) => if inserted {
1056 log::info!(target: LOG_TARGET, "๐ Discovered new external address for our node: {address}");
1057 },
1058 Err(err) => {
1059 log::warn!(
1060 target: LOG_TARGET,
1061 "๐ Failed to add discovered external address {address:?}: {err:?}",
1062 );
1063 },
1064 }
1065 }
1066 Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1067 let local_peer_id = self.litep2p.local_peer_id();
1068
1069 let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1071 address.with(Protocol::P2p(*local_peer_id.as_ref()))
1072 } else {
1073 address
1074 };
1075
1076 if self.litep2p.public_addresses().remove_address(&address) {
1077 log::info!(target: LOG_TARGET, "๐ Expired external address for our node: {address}");
1078 } else {
1079 log::warn!(
1080 target: LOG_TARGET,
1081 "๐ Failed to remove expired external address {address:?}"
1082 );
1083 }
1084 }
1085 Some(DiscoveryEvent::Ping { peer, rtt }) => {
1086 log::trace!(
1087 target: LOG_TARGET,
1088 "ping time with {peer:?}: {rtt:?}",
1089 );
1090 }
1091 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1092 self.event_streams.send(Event::Dht(
1093 DhtEvent::PutRecordRequest(
1094 key.into(),
1095 value,
1096 publisher.map(Into::into),
1097 expires,
1098 )
1099 ));
1100 },
1101
1102 Some(DiscoveryEvent::RandomKademliaStarted) => {
1103 if let Some(metrics) = self.metrics.as_ref() {
1104 metrics.kademlia_random_queries_total.inc();
1105 }
1106 }
1107 },
1108 event = self.litep2p.next_event() => match event {
1109 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1110 let Some(metrics) = &self.metrics else {
1111 continue;
1112 };
1113
1114 let direction = match endpoint {
1115 Endpoint::Dialer { .. } => "out",
1116 Endpoint::Listener { .. } => {
1117 metrics.incoming_connections_total.inc();
1122
1123 "in"
1124 },
1125 };
1126 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1127
1128 match self.peers.entry(peer) {
1129 Entry::Vacant(entry) => {
1130 entry.insert(ConnectionContext {
1131 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1132 num_connections: 1usize,
1133 });
1134 metrics.distinct_peers_connections_opened_total.inc();
1135 }
1136 Entry::Occupied(entry) => {
1137 let entry = entry.into_mut();
1138 entry.num_connections += 1;
1139 entry.endpoints.insert(endpoint.connection_id(), endpoint);
1140 }
1141 }
1142 }
1143 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1144 let Some(metrics) = &self.metrics else {
1145 continue;
1146 };
1147
1148 let Some(context) = self.peers.get_mut(&peer) else {
1149 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1150 continue
1151 };
1152
1153 let direction = match context.endpoints.remove(&connection_id) {
1154 None => {
1155 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1156 continue
1157 }
1158 Some(endpoint) => {
1159 context.num_connections -= 1;
1160
1161 match endpoint {
1162 Endpoint::Dialer { .. } => "out",
1163 Endpoint::Listener { .. } => "in",
1164 }
1165 }
1166 };
1167
1168 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1169
1170 if context.num_connections == 0 {
1171 self.peers.remove(&peer);
1172 metrics.distinct_peers_connections_closed_total.inc();
1173 }
1174 }
1175 Some(Litep2pEvent::DialFailure { address, error }) => {
1176 log::debug!(
1177 target: LOG_TARGET,
1178 "failed to dial peer at {address:?}: {error:?}",
1179 );
1180
1181 if let Some(metrics) = &self.metrics {
1182 let reason = match error {
1183 DialError::Timeout => "timeout",
1184 DialError::AddressError(_) => "invalid-address",
1185 DialError::DnsError(_) => "cannot-resolve-dns",
1186 DialError::NegotiationError(error) => match error {
1187 NegotiationError::Timeout => "timeout",
1188 NegotiationError::PeerIdMissing => "missing-peer-id",
1189 NegotiationError::StateMismatch => "state-mismatch",
1190 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1191 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1192 NegotiationError::SnowError(_) => "noise-error",
1193 NegotiationError::ParseError(_) => "parse-error",
1194 NegotiationError::IoError(_) => "io-error",
1195 NegotiationError::WebSocket(_) => "webscoket-error",
1196 NegotiationError::BadSignature => "bad-signature",
1197 }
1198 };
1199
1200 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1201 }
1202 }
1203 Some(Litep2pEvent::ListDialFailures { errors }) => {
1204 log::debug!(
1205 target: LOG_TARGET,
1206 "failed to dial peer on multiple addresses {errors:?}",
1207 );
1208
1209 if let Some(metrics) = &self.metrics {
1210 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1211 }
1212 }
1213 None => {
1214 log::error!(
1215 target: LOG_TARGET,
1216 "Litep2p backend terminated"
1217 );
1218 return
1219 }
1220 },
1221 }
1222 }
1223 }
1224}