1use crate::{
22 config::{
23 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24 SetConfig, TransportConfig,
25 },
26 error::Error,
27 event::{DhtEvent, Event},
28 litep2p::{
29 discovery::{Discovery, DiscoveryEvent},
30 peerstore::Peerstore,
31 service::{Litep2pNetworkService, NetworkServiceCommand},
32 shim::{
33 bitswap::BitswapServer,
34 notification::{
35 config::{NotificationProtocolConfig, ProtocolControlHandle},
36 peerset::PeersetCommand,
37 },
38 request_response::{RequestResponseConfig, RequestResponseProtocol},
39 },
40 },
41 peer_store::PeerStoreProvider,
42 protocol,
43 service::{
44 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
45 out_events,
46 traits::{BandwidthSink, NetworkBackend, NetworkService},
47 },
48 NetworkStatus, NotificationService, ProtocolName,
49};
50
51use codec::Encode;
52use futures::StreamExt;
53use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
54use litep2p::{
55 config::ConfigBuilder,
56 crypto::ed25519::Keypair,
57 error::{DialError, NegotiationError},
58 executor::Executor,
59 protocol::{
60 libp2p::{
61 bitswap::Config as BitswapConfig,
62 kademlia::{QueryId, Record, RecordsType},
63 },
64 request_response::ConfigBuilder as RequestResponseConfigBuilder,
65 },
66 transport::{
67 tcp::config::Config as TcpTransportConfig,
68 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
69 },
70 types::{
71 multiaddr::{Multiaddr, Protocol},
72 ConnectionId,
73 },
74 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
75};
76use prometheus_endpoint::Registry;
77
78use sc_client_api::BlockBackend;
79use sc_network_common::{role::Roles, ExHashT};
80use sc_network_types::PeerId;
81use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
82use sp_runtime::traits::Block as BlockT;
83
84use std::{
85 cmp,
86 collections::{hash_map::Entry, HashMap, HashSet},
87 fs,
88 future::Future,
89 iter,
90 pin::Pin,
91 sync::{
92 atomic::{AtomicUsize, Ordering},
93 Arc,
94 },
95 time::{Duration, Instant},
96};
97
98mod discovery;
99mod peerstore;
100mod service;
101mod shim;
102
103struct Litep2pBandwidthSink {
105 sink: litep2p::BandwidthSink,
106}
107
108impl BandwidthSink for Litep2pBandwidthSink {
109 fn total_inbound(&self) -> u64 {
110 self.sink.inbound() as u64
111 }
112
113 fn total_outbound(&self) -> u64 {
114 self.sink.outbound() as u64
115 }
116}
117
118struct Litep2pExecutor {
120 executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
122}
123
124impl Executor for Litep2pExecutor {
125 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
126 (self.executor)(future)
127 }
128
129 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
130 (self.executor)(future)
131 }
132}
133
134const LOG_TARGET: &str = "sub-libp2p";
136
137struct ConnectionContext {
139 endpoints: HashMap<ConnectionId, Endpoint>,
141
142 num_connections: usize,
144}
145
146pub struct Litep2pNetworkBackend {
148 litep2p: Litep2p,
150
151 network_service: Arc<dyn NetworkService>,
153
154 cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,
156
157 peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
159
160 pending_get_values: HashMap<QueryId, (RecordKey, Instant)>,
162
163 pending_put_values: HashMap<QueryId, (RecordKey, Instant)>,
165
166 discovery: Discovery,
168
169 num_connected: Arc<AtomicUsize>,
171
172 peers: HashMap<litep2p::PeerId, ConnectionContext>,
174
175 peerstore_handle: Arc<dyn PeerStoreProvider>,
177
178 block_announce_protocol: ProtocolName,
180
181 event_streams: out_events::OutChannels,
183
184 metrics: Option<Metrics>,
186}
187
188impl Litep2pNetworkBackend {
189 fn parse_addresses(
192 addresses: impl Iterator<Item = Multiaddr>,
193 ) -> HashMap<PeerId, Vec<Multiaddr>> {
194 addresses
195 .into_iter()
196 .filter_map(|address| match address.iter().next() {
197 Some(
198 Protocol::Dns(_) |
199 Protocol::Dns4(_) |
200 Protocol::Dns6(_) |
201 Protocol::Ip6(_) |
202 Protocol::Ip4(_),
203 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
204 {
205 Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
206 .map_or(None, |peer| Some((peer, Some(address)))),
207 _ => None,
208 },
209 Some(Protocol::P2p(multihash)) =>
210 PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
211 _ => None,
212 })
213 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
214 let entry = acc.entry(peer).or_default();
215 maybe_address.map(|address| entry.push(address));
216
217 acc
218 })
219 }
220
221 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
223 Self::parse_addresses(peers.into_iter())
224 .into_iter()
225 .filter_map(|(peer, addresses)| {
226 if addresses.is_empty() {
228 return Some(peer)
229 }
230
231 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
232 log::warn!(
233 target: LOG_TARGET,
234 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
235 );
236 return None
237 }
238
239 self.peerstore_handle.add_known_peer(peer);
240 Some(peer)
241 })
242 .collect()
243 }
244}
245
246impl Litep2pNetworkBackend {
247 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
249 let secret: litep2p::crypto::ed25519::SecretKey =
250 node_key.clone().into_keypair()?.secret().into();
251
252 let local_identity = Keypair::from(secret);
253 let local_public = local_identity.public();
254 let local_peer_id = local_public.to_peer_id();
255
256 Ok((local_identity, local_peer_id))
257 }
258
259 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
261 config: &FullNetworkConfiguration<B, H, Self>,
262 ) -> ConfigBuilder {
263 let _ = match config.network_config.transport {
264 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
265 TransportConfig::Normal { .. } => false,
266 };
267 let config_builder = ConfigBuilder::new();
268
269 let yamux_maximum_buffer_size = {
276 let requests_max = config
277 .request_response_protocols
278 .iter()
279 .map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::MAX));
280 let responses_max = config
281 .request_response_protocols
282 .iter()
283 .map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::MAX));
284 let notifs_max = config
285 .notification_protocols
286 .iter()
287 .map(|cfg| usize::try_from(cfg.max_notification_size()).unwrap_or(usize::MAX));
288
289 let default_max = cmp::max(
292 1024 * 1024,
293 usize::try_from(protocol::BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE)
294 .unwrap_or(usize::MAX),
295 );
296
297 iter::once(default_max)
298 .chain(requests_max)
299 .chain(responses_max)
300 .chain(notifs_max)
301 .max()
302 .expect("iterator known to always yield at least one element; qed")
303 .saturating_add(10)
304 };
305
306 let yamux_config = {
307 let mut yamux_config = litep2p::yamux::Config::default();
308 yamux_config.set_window_update_mode(litep2p::yamux::WindowUpdateMode::OnRead);
311 yamux_config.set_max_buffer_size(yamux_maximum_buffer_size);
312
313 if let Some(yamux_window_size) = config.network_config.yamux_window_size {
314 yamux_config.set_receive_window(yamux_window_size);
315 }
316
317 yamux_config
318 };
319
320 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
321 .network_config
322 .listen_addresses
323 .iter()
324 .filter_map(|address| {
325 use sc_network_types::multiaddr::Protocol;
326
327 let mut iter = address.iter();
328
329 match iter.next() {
330 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
331 protocol => {
332 log::error!(
333 target: LOG_TARGET,
334 "unknown protocol {protocol:?}, ignoring {address:?}",
335 );
336
337 return None
338 },
339 }
340
341 match iter.next() {
342 Some(Protocol::Tcp(_)) => match iter.next() {
343 Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
344 Some((None, Some(address.clone()))),
345 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
346 protocol => {
347 log::error!(
348 target: LOG_TARGET,
349 "unknown protocol {protocol:?}, ignoring {address:?}",
350 );
351 None
352 },
353 },
354 protocol => {
355 log::error!(
356 target: LOG_TARGET,
357 "unknown protocol {protocol:?}, ignoring {address:?}",
358 );
359 None
360 },
361 }
362 })
363 .unzip();
364
365 config_builder
366 .with_websocket(WebSocketTransportConfig {
367 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
368 yamux_config: yamux_config.clone(),
369 nodelay: true,
370 ..Default::default()
371 })
372 .with_tcp(TcpTransportConfig {
373 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
374 yamux_config,
375 nodelay: true,
376 ..Default::default()
377 })
378 }
379}
380
381#[async_trait::async_trait]
382impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
383 type NotificationProtocolConfig = NotificationProtocolConfig;
384 type RequestResponseProtocolConfig = RequestResponseConfig;
385 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
386 type PeerStore = Peerstore;
387 type BitswapConfig = BitswapConfig;
388
389 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
390 where
391 Self: Sized,
392 {
393 let (keypair, local_peer_id) =
394 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
395 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
396
397 params.network_config.network_config.boot_nodes = params
398 .network_config
399 .network_config
400 .boot_nodes
401 .into_iter()
402 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
403 .collect();
404 params.network_config.network_config.default_peers_set.reserved_nodes = params
405 .network_config
406 .network_config
407 .default_peers_set
408 .reserved_nodes
409 .into_iter()
410 .filter(|reserved_node| {
411 if reserved_node.peer_id == local_peer_id.into() {
412 log::warn!(
413 target: LOG_TARGET,
414 "Local peer ID used in reserved node, ignoring: {reserved_node}",
415 );
416 false
417 } else {
418 true
419 }
420 })
421 .collect();
422
423 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
424 fs::create_dir_all(path)?;
425 }
426
427 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
428 log::info!(target: LOG_TARGET, "Running litep2p network backend");
429
430 params.network_config.sanity_check_addresses()?;
431 params.network_config.sanity_check_bootnodes()?;
432
433 let mut config_builder =
434 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
435 let known_addresses = params.network_config.known_addresses();
436 let peer_store_handle = params.network_config.peer_store_handle();
437 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
438
439 let FullNetworkConfiguration {
440 notification_protocols,
441 request_response_protocols,
442 network_config,
443 ..
444 } = params.network_config;
445
446 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
452 let mut notif_protocols = HashMap::from_iter([(
453 params.block_announce_config.protocol_name().clone(),
454 params.block_announce_config.handle,
455 )]);
456
457 config_builder = notification_protocols
459 .into_iter()
460 .fold(config_builder, |config_builder, mut config| {
461 config.config.set_handshake(Roles::from(¶ms.role).encode());
462 notif_protocols.insert(config.protocol_name, config.handle);
463
464 config_builder.with_notification_protocol(config.config)
465 })
466 .with_notification_protocol(params.block_announce_config.config);
467
468 let metrics = match ¶ms.metrics_registry {
470 Some(registry) => Some(register_without_sources(registry)?),
471 None => None,
472 };
473
474 let (mut request_response_receivers, request_response_senders): (
480 HashMap<_, _>,
481 HashMap<_, _>,
482 ) = request_response_protocols
483 .iter()
484 .map(|config| {
485 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
486 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
487 })
488 .unzip();
489
490 config_builder = request_response_protocols.into_iter().fold(
491 config_builder,
492 |config_builder, config| {
493 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
494 Litep2pProtocolName::from(config.protocol_name.clone()),
495 )
496 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
497 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
498 .with_timeout(config.request_timeout)
499 .build();
500
501 let protocol = RequestResponseProtocol::new(
502 config.protocol_name.clone(),
503 handle,
504 Arc::clone(&peer_store_handle),
505 config.inbound_queue,
506 request_response_receivers
507 .remove(&config.protocol_name)
508 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
509 request_response_senders.clone(),
510 metrics.clone(),
511 );
512
513 executor.run(Box::pin(async move {
514 protocol.run().await;
515 }));
516
517 config_builder.with_request_response_protocol(protocol_config)
518 },
519 );
520
521 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
523 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
524 use sc_network_types::multiaddr::Protocol;
525
526 let address = match address.iter().last() {
527 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
528 address.with(Protocol::P2p(peer.into())),
529 Some(Protocol::P2p(_)) => address,
530 _ => return acc,
531 };
532
533 acc.entry(peer.into()).or_default().push(address.into());
534 peer_store_handle.add_known_peer(peer);
535
536 acc
537 });
538
539 let listen_addresses = Arc::new(Default::default());
541 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
542 Discovery::new(
543 &network_config,
544 params.genesis_hash,
545 params.fork_id.as_deref(),
546 ¶ms.protocol_id,
547 known_addresses.clone(),
548 Arc::clone(&listen_addresses),
549 Arc::clone(&peer_store_handle),
550 );
551
552 config_builder = config_builder
553 .with_known_addresses(known_addresses.clone().into_iter())
554 .with_libp2p_ping(ping_config)
555 .with_libp2p_identify(identify_config)
556 .with_libp2p_kademlia(kademlia_config)
557 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
558 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
559 ))
560 .with_executor(executor);
561
562 if let Some(config) = maybe_mdns_config {
563 config_builder = config_builder.with_mdns(config);
564 }
565
566 if let Some(config) = params.bitswap_config {
567 config_builder = config_builder.with_libp2p_bitswap(config);
568 }
569
570 let litep2p =
571 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
572
573 litep2p.listen_addresses().for_each(|address| {
574 log::debug!(target: LOG_TARGET, "listening on: {address}");
575
576 listen_addresses.write().insert(address.clone());
577 });
578
579 let public_addresses = litep2p.public_addresses();
580 for address in network_config.public_addresses.iter() {
581 if let Err(err) = public_addresses.add_address(address.clone().into()) {
582 log::warn!(
583 target: LOG_TARGET,
584 "failed to add public address {address:?}: {err:?}",
585 );
586 }
587 }
588
589 let network_service = Arc::new(Litep2pNetworkService::new(
590 local_peer_id,
591 keypair.clone(),
592 cmd_tx,
593 Arc::clone(&peer_store_handle),
594 notif_protocols.clone(),
595 block_announce_protocol.clone(),
596 request_response_senders,
597 Arc::clone(&listen_addresses),
598 public_addresses,
599 ));
600
601 let num_connected = Arc::new(Default::default());
603 let bandwidth: Arc<dyn BandwidthSink> =
604 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
605
606 if let Some(registry) = ¶ms.metrics_registry {
607 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
608 }
609
610 Ok(Self {
611 network_service,
612 cmd_rx,
613 metrics,
614 peerset_handles: notif_protocols,
615 num_connected,
616 discovery,
617 pending_put_values: HashMap::new(),
618 pending_get_values: HashMap::new(),
619 peerstore_handle: peer_store_handle,
620 block_announce_protocol,
621 event_streams: out_events::OutChannels::new(None)?,
622 peers: HashMap::new(),
623 litep2p,
624 })
625 }
626
627 fn network_service(&self) -> Arc<dyn NetworkService> {
628 Arc::clone(&self.network_service)
629 }
630
631 fn peer_store(
632 bootnodes: Vec<sc_network_types::PeerId>,
633 metrics_registry: Option<Registry>,
634 ) -> Self::PeerStore {
635 Peerstore::new(bootnodes, metrics_registry)
636 }
637
638 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
639 NotificationMetrics::new(registry)
640 }
641
642 fn bitswap_server(
644 client: Arc<dyn BlockBackend<B> + Send + Sync>,
645 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
646 BitswapServer::new(client)
647 }
648
649 fn notification_config(
651 protocol_name: ProtocolName,
652 fallback_names: Vec<ProtocolName>,
653 max_notification_size: u64,
654 handshake: Option<NotificationHandshake>,
655 set_config: SetConfig,
656 metrics: NotificationMetrics,
657 peerstore_handle: Arc<dyn PeerStoreProvider>,
658 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
659 Self::NotificationProtocolConfig::new(
660 protocol_name,
661 fallback_names,
662 max_notification_size as usize,
663 handshake,
664 set_config,
665 metrics,
666 peerstore_handle,
667 )
668 }
669
670 fn request_response_config(
672 protocol_name: ProtocolName,
673 fallback_names: Vec<ProtocolName>,
674 max_request_size: u64,
675 max_response_size: u64,
676 request_timeout: Duration,
677 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
678 ) -> Self::RequestResponseProtocolConfig {
679 Self::RequestResponseProtocolConfig::new(
680 protocol_name,
681 fallback_names,
682 max_request_size,
683 max_response_size,
684 request_timeout,
685 inbound_queue,
686 )
687 }
688
689 async fn run(mut self) {
691 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
692
693 loop {
694 let num_connected_peers = self
695 .peerset_handles
696 .get(&self.block_announce_protocol)
697 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
698 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
699
700 tokio::select! {
701 command = self.cmd_rx.next() => match command {
702 None => return,
703 Some(command) => match command {
704 NetworkServiceCommand::GetValue{ key } => {
705 let query_id = self.discovery.get_value(key.clone()).await;
706 self.pending_get_values.insert(query_id, (key, Instant::now()));
707 }
708 NetworkServiceCommand::PutValue { key, value } => {
709 let query_id = self.discovery.put_value(key.clone(), value).await;
710 self.pending_put_values.insert(query_id, (key, Instant::now()));
711 }
712 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
713 let kademlia_key = record.key.to_vec().into();
714 let query_id = self.discovery.put_value_to_peers(record, peers, update_local_storage).await;
715 self.pending_put_values.insert(query_id, (kademlia_key, Instant::now()));
716 }
717
718 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
719 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
720 }
721 NetworkServiceCommand::EventStream { tx } => {
722 self.event_streams.push(tx);
723 }
724 NetworkServiceCommand::Status { tx } => {
725 let _ = tx.send(NetworkStatus {
726 num_connected_peers: self
727 .peerset_handles
728 .get(&self.block_announce_protocol)
729 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
730 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
731 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
732 });
733 }
734 NetworkServiceCommand::AddPeersToReservedSet {
735 protocol,
736 peers,
737 } => {
738 let peers = self.add_addresses(peers.into_iter().map(Into::into));
739
740 match self.peerset_handles.get(&protocol) {
741 Some(handle) => {
742 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
743 }
744 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
745 };
746 }
747 NetworkServiceCommand::AddKnownAddress { peer, address } => {
748 let mut address: Multiaddr = address.into();
749
750 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
751 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
752 }
753
754 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) == 0usize {
755 log::warn!(
756 target: LOG_TARGET,
757 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
758 );
759 }
760 },
761 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
762 let peers = self.add_addresses(peers.into_iter().map(Into::into));
763
764 match self.peerset_handles.get(&protocol) {
765 Some(handle) => {
766 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
767 }
768 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
769 }
770
771 },
772 NetworkServiceCommand::DisconnectPeer {
773 protocol,
774 peer,
775 } => {
776 let Some(handle) = self.peerset_handles.get(&protocol) else {
777 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
778 continue
779 };
780
781 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
782 }
783 NetworkServiceCommand::SetReservedOnly {
784 protocol,
785 reserved_only,
786 } => {
787 let Some(handle) = self.peerset_handles.get(&protocol) else {
788 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
789 continue
790 };
791
792 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
793 }
794 NetworkServiceCommand::RemoveReservedPeers {
795 protocol,
796 peers,
797 } => {
798 let Some(handle) = self.peerset_handles.get(&protocol) else {
799 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
800 continue
801 };
802
803 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
804 }
805 }
806 },
807 event = self.discovery.next() => match event {
808 None => return,
809 Some(DiscoveryEvent::Discovered { addresses }) => {
810 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
812 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
813 self.peerstore_handle.add_known_peer(peer);
814 }
815 }
816 }
817 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
818 for peer in peers {
819 self.peerstore_handle.add_known_peer(peer.into());
820 }
821 }
822 Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => {
823 match self.pending_get_values.remove(&query_id) {
824 None => log::warn!(
825 target: LOG_TARGET,
826 "`GET_VALUE` succeeded for a non-existent query",
827 ),
828 Some((key, started)) => {
829 log::trace!(
830 target: LOG_TARGET,
831 "`GET_VALUE` for {:?} ({query_id:?}) succeeded",
832 key,
833 );
834 for record in litep2p_to_libp2p_peer_record(records) {
835 self.event_streams.send(
836 Event::Dht(
837 DhtEvent::ValueFound(
838 record
839 )
840 )
841 );
842 }
843
844 if let Some(ref metrics) = self.metrics {
845 metrics
846 .kademlia_query_duration
847 .with_label_values(&["value-get"])
848 .observe(started.elapsed().as_secs_f64());
849 }
850 }
851 }
852 }
853 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
854 match self.pending_put_values.remove(&query_id) {
855 None => log::warn!(
856 target: LOG_TARGET,
857 "`PUT_VALUE` succeeded for a non-existent query",
858 ),
859 Some((key, started)) => {
860 log::trace!(
861 target: LOG_TARGET,
862 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
863 );
864
865 self.event_streams.send(Event::Dht(
866 DhtEvent::ValuePut(libp2p::kad::RecordKey::new(&key))
867 ));
868
869 if let Some(ref metrics) = self.metrics {
870 metrics
871 .kademlia_query_duration
872 .with_label_values(&["value-put"])
873 .observe(started.elapsed().as_secs_f64());
874 }
875 }
876 }
877 }
878 Some(DiscoveryEvent::QueryFailed { query_id }) => {
879 match self.pending_get_values.remove(&query_id) {
880 None => match self.pending_put_values.remove(&query_id) {
881 None => log::warn!(
882 target: LOG_TARGET,
883 "non-existent query failed ({query_id:?})",
884 ),
885 Some((key, started)) => {
886 log::debug!(
887 target: LOG_TARGET,
888 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
889 );
890
891 self.event_streams.send(Event::Dht(
892 DhtEvent::ValuePutFailed(libp2p::kad::RecordKey::new(&key))
893 ));
894
895 if let Some(ref metrics) = self.metrics {
896 metrics
897 .kademlia_query_duration
898 .with_label_values(&["value-put-failed"])
899 .observe(started.elapsed().as_secs_f64());
900 }
901 }
902 }
903 Some((key, started)) => {
904 log::debug!(
905 target: LOG_TARGET,
906 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
907 );
908
909 self.event_streams.send(Event::Dht(
910 DhtEvent::ValueNotFound(libp2p::kad::RecordKey::new(&key))
911 ));
912
913 if let Some(ref metrics) = self.metrics {
914 metrics
915 .kademlia_query_duration
916 .with_label_values(&["value-get-failed"])
917 .observe(started.elapsed().as_secs_f64());
918 }
919 }
920 }
921 }
922 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
923 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
924 }
925 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
926 match self.litep2p.public_addresses().add_address(address.clone().into()) {
927 Ok(inserted) => if inserted {
928 log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
929 },
930 Err(err) => {
931 log::warn!(
932 target: LOG_TARGET,
933 "🔍 Failed to add discovered external address {address:?}: {err:?}",
934 );
935 },
936 }
937 }
938 Some(DiscoveryEvent::Ping { peer, rtt }) => {
939 log::trace!(
940 target: LOG_TARGET,
941 "ping time with {peer:?}: {rtt:?}",
942 );
943 }
944 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
945 self.event_streams.send(Event::Dht(
946 DhtEvent::PutRecordRequest(
947 libp2p::kad::RecordKey::new(&key),
948 value,
949 publisher.map(Into::into),
950 expires,
951 )
952 ));
953 },
954
955 Some(DiscoveryEvent::RandomKademliaStarted) => {
956 if let Some(metrics) = self.metrics.as_ref() {
957 metrics.kademlia_random_queries_total.inc();
958 }
959 }
960 },
961 event = self.litep2p.next_event() => match event {
962 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
963 let Some(metrics) = &self.metrics else {
964 continue;
965 };
966
967 let direction = match endpoint {
968 Endpoint::Dialer { .. } => "out",
969 Endpoint::Listener { .. } => "in",
970 };
971 metrics.connections_opened_total.with_label_values(&[direction]).inc();
972
973 match self.peers.entry(peer) {
974 Entry::Vacant(entry) => {
975 entry.insert(ConnectionContext {
976 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
977 num_connections: 1usize,
978 });
979 metrics.distinct_peers_connections_opened_total.inc();
980 }
981 Entry::Occupied(entry) => {
982 let entry = entry.into_mut();
983 entry.num_connections += 1;
984 entry.endpoints.insert(endpoint.connection_id(), endpoint);
985 }
986 }
987 }
988 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
989 let Some(metrics) = &self.metrics else {
990 continue;
991 };
992
993 let Some(context) = self.peers.get_mut(&peer) else {
994 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
995 continue
996 };
997
998 let direction = match context.endpoints.remove(&connection_id) {
999 None => {
1000 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1001 continue
1002 }
1003 Some(endpoint) => {
1004 context.num_connections -= 1;
1005
1006 match endpoint {
1007 Endpoint::Dialer { .. } => "out",
1008 Endpoint::Listener { .. } => "in",
1009 }
1010 }
1011 };
1012
1013 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1014
1015 if context.num_connections == 0 {
1016 self.peers.remove(&peer);
1017 metrics.distinct_peers_connections_closed_total.inc();
1018 }
1019 }
1020 Some(Litep2pEvent::DialFailure { address, error }) => {
1021 log::debug!(
1022 target: LOG_TARGET,
1023 "failed to dial peer at {address:?}: {error:?}",
1024 );
1025
1026 if let Some(metrics) = &self.metrics {
1027 let reason = match error {
1028 DialError::Timeout => "timeout",
1029 DialError::AddressError(_) => "invalid-address",
1030 DialError::DnsError(_) => "cannot-resolve-dns",
1031 DialError::NegotiationError(error) => match error {
1032 NegotiationError::Timeout => "timeout",
1033 NegotiationError::PeerIdMissing => "missing-peer-id",
1034 NegotiationError::StateMismatch => "state-mismatch",
1035 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1036 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1037 NegotiationError::SnowError(_) => "noise-error",
1038 NegotiationError::ParseError(_) => "parse-error",
1039 NegotiationError::IoError(_) => "io-error",
1040 NegotiationError::WebSocket(_) => "webscoket-error",
1041 }
1042 };
1043
1044 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1045 }
1046 }
1047 Some(Litep2pEvent::ListDialFailures { errors }) => {
1048 log::debug!(
1049 target: LOG_TARGET,
1050 "failed to dial peer on multiple addresses {errors:?}",
1051 );
1052
1053 if let Some(metrics) = &self.metrics {
1054 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1055 }
1056 }
1057 _ => {}
1058 },
1059 }
1060 }
1061 }
1062}
1063
1064fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec<PeerRecord> {
1066 match records {
1067 litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => {
1068 vec![PeerRecord {
1069 record: P2PRecord {
1070 key: record.key.to_vec().into(),
1071 value: record.value,
1072 publisher: record.publisher.map(|peer_id| {
1073 let peer_id: sc_network_types::PeerId = peer_id.into();
1074 peer_id.into()
1075 }),
1076 expires: record.expires,
1077 },
1078 peer: None,
1079 }]
1080 },
1081 litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records
1082 .into_iter()
1083 .map(|record| {
1084 let peer_id: sc_network_types::PeerId = record.peer.into();
1085
1086 PeerRecord {
1087 record: P2PRecord {
1088 key: record.record.key.to_vec().into(),
1089 value: record.record.value,
1090 publisher: record.record.publisher.map(|peer_id| {
1091 let peer_id: sc_network_types::PeerId = peer_id.into();
1092 peer_id.into()
1093 }),
1094 expires: record.record.expires,
1095 },
1096 peer: Some(peer_id.into()),
1097 }
1098 })
1099 .collect::<Vec<_>>(),
1100 }
1101}