1use crate::{
22 config::{
23 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24 SetConfig, TransportConfig,
25 },
26 error::Error,
27 event::{DhtEvent, Event},
28 litep2p::{
29 discovery::{Discovery, DiscoveryEvent},
30 peerstore::Peerstore,
31 service::{Litep2pNetworkService, NetworkServiceCommand},
32 shim::{
33 bitswap::BitswapServer,
34 notification::{
35 config::{NotificationProtocolConfig, ProtocolControlHandle},
36 peerset::PeersetCommand,
37 },
38 request_response::{RequestResponseConfig, RequestResponseProtocol},
39 },
40 },
41 peer_store::PeerStoreProvider,
42 service::{
43 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44 out_events,
45 traits::{BandwidthSink, NetworkBackend, NetworkService},
46 },
47 NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53 config::ConfigBuilder,
54 crypto::ed25519::Keypair,
55 error::{DialError, NegotiationError},
56 executor::Executor,
57 protocol::{
58 libp2p::{
59 bitswap::Config as BitswapConfig,
60 kademlia::{QueryId, Record},
61 },
62 request_response::ConfigBuilder as RequestResponseConfigBuilder,
63 },
64 transport::{
65 tcp::config::Config as TcpTransportConfig,
66 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67 },
68 types::{
69 multiaddr::{Multiaddr, Protocol},
70 ConnectionId,
71 },
72 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84 cmp,
85 collections::{hash_map::Entry, HashMap, HashSet},
86 fs,
87 future::Future,
88 iter,
89 pin::Pin,
90 sync::{
91 atomic::{AtomicUsize, Ordering},
92 Arc,
93 },
94 time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
102struct Litep2pBandwidthSink {
104 sink: litep2p::BandwidthSink,
105}
106
107impl BandwidthSink for Litep2pBandwidthSink {
108 fn total_inbound(&self) -> u64 {
109 self.sink.inbound() as u64
110 }
111
112 fn total_outbound(&self) -> u64 {
113 self.sink.outbound() as u64
114 }
115}
116
117struct Litep2pExecutor {
119 executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
121}
122
123impl Executor for Litep2pExecutor {
124 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
125 (self.executor)(future)
126 }
127
128 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
129 (self.executor)(future)
130 }
131}
132
133const LOG_TARGET: &str = "sub-libp2p";
135
136struct ConnectionContext {
138 endpoints: HashMap<ConnectionId, Endpoint>,
140
141 num_connections: usize,
143}
144
145#[derive(Debug)]
147enum KadQuery {
148 FindNode(PeerId, Instant),
150 GetValue(RecordKey, Instant),
152 PutValue(RecordKey, Instant),
154 GetProviders(RecordKey, Instant),
156}
157
158pub struct Litep2pNetworkBackend {
160 litep2p: Litep2p,
162
163 network_service: Arc<dyn NetworkService>,
165
166 cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,
168
169 peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
171
172 pending_queries: HashMap<QueryId, KadQuery>,
174
175 discovery: Discovery,
177
178 num_connected: Arc<AtomicUsize>,
180
181 peers: HashMap<litep2p::PeerId, ConnectionContext>,
183
184 peerstore_handle: Arc<dyn PeerStoreProvider>,
186
187 block_announce_protocol: ProtocolName,
189
190 event_streams: out_events::OutChannels,
192
193 metrics: Option<Metrics>,
195}
196
197impl Litep2pNetworkBackend {
198 fn parse_addresses(
201 addresses: impl Iterator<Item = Multiaddr>,
202 ) -> HashMap<PeerId, Vec<Multiaddr>> {
203 addresses
204 .into_iter()
205 .filter_map(|address| match address.iter().next() {
206 Some(
207 Protocol::Dns(_) |
208 Protocol::Dns4(_) |
209 Protocol::Dns6(_) |
210 Protocol::Ip6(_) |
211 Protocol::Ip4(_),
212 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
213 {
214 Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
215 .map_or(None, |peer| Some((peer, Some(address)))),
216 _ => None,
217 },
218 Some(Protocol::P2p(multihash)) =>
219 PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
220 _ => None,
221 })
222 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
223 let entry = acc.entry(peer).or_default();
224 maybe_address.map(|address| entry.push(address));
225
226 acc
227 })
228 }
229
230 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
232 Self::parse_addresses(peers.into_iter())
233 .into_iter()
234 .filter_map(|(peer, addresses)| {
235 if addresses.is_empty() {
237 return Some(peer)
238 }
239
240 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
241 log::warn!(
242 target: LOG_TARGET,
243 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
244 );
245 return None
246 }
247
248 self.peerstore_handle.add_known_peer(peer);
249 Some(peer)
250 })
251 .collect()
252 }
253}
254
255impl Litep2pNetworkBackend {
256 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
258 let secret: litep2p::crypto::ed25519::SecretKey =
259 node_key.clone().into_keypair()?.secret().into();
260
261 let local_identity = Keypair::from(secret);
262 let local_public = local_identity.public();
263 let local_peer_id = local_public.to_peer_id();
264
265 Ok((local_identity, local_peer_id))
266 }
267
268 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
270 config: &FullNetworkConfiguration<B, H, Self>,
271 ) -> ConfigBuilder {
272 let _ = match config.network_config.transport {
273 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
274 TransportConfig::Normal { .. } => false,
275 };
276 let config_builder = ConfigBuilder::new();
277
278 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
279 .network_config
280 .listen_addresses
281 .iter()
282 .filter_map(|address| {
283 use sc_network_types::multiaddr::Protocol;
284
285 let mut iter = address.iter();
286
287 match iter.next() {
288 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
289 protocol => {
290 log::error!(
291 target: LOG_TARGET,
292 "unknown protocol {protocol:?}, ignoring {address:?}",
293 );
294
295 return None
296 },
297 }
298
299 match iter.next() {
300 Some(Protocol::Tcp(_)) => match iter.next() {
301 Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
302 Some((None, Some(address.clone()))),
303 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
304 protocol => {
305 log::error!(
306 target: LOG_TARGET,
307 "unknown protocol {protocol:?}, ignoring {address:?}",
308 );
309 None
310 },
311 },
312 protocol => {
313 log::error!(
314 target: LOG_TARGET,
315 "unknown protocol {protocol:?}, ignoring {address:?}",
316 );
317 None
318 },
319 }
320 })
321 .unzip();
322
323 config_builder
324 .with_websocket(WebSocketTransportConfig {
325 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
326 yamux_config: litep2p::yamux::Config::default(),
327 nodelay: true,
328 ..Default::default()
329 })
330 .with_tcp(TcpTransportConfig {
331 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
332 yamux_config: litep2p::yamux::Config::default(),
333 nodelay: true,
334 ..Default::default()
335 })
336 }
337}
338
339#[async_trait::async_trait]
340impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
341 type NotificationProtocolConfig = NotificationProtocolConfig;
342 type RequestResponseProtocolConfig = RequestResponseConfig;
343 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
344 type PeerStore = Peerstore;
345 type BitswapConfig = BitswapConfig;
346
347 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
348 where
349 Self: Sized,
350 {
351 let (keypair, local_peer_id) =
352 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
353 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
354
355 params.network_config.network_config.boot_nodes = params
356 .network_config
357 .network_config
358 .boot_nodes
359 .into_iter()
360 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
361 .collect();
362 params.network_config.network_config.default_peers_set.reserved_nodes = params
363 .network_config
364 .network_config
365 .default_peers_set
366 .reserved_nodes
367 .into_iter()
368 .filter(|reserved_node| {
369 if reserved_node.peer_id == local_peer_id.into() {
370 log::warn!(
371 target: LOG_TARGET,
372 "Local peer ID used in reserved node, ignoring: {reserved_node}",
373 );
374 false
375 } else {
376 true
377 }
378 })
379 .collect();
380
381 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
382 fs::create_dir_all(path)?;
383 }
384
385 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
386 log::info!(target: LOG_TARGET, "Running litep2p network backend");
387
388 params.network_config.sanity_check_addresses()?;
389 params.network_config.sanity_check_bootnodes()?;
390
391 let mut config_builder =
392 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
393 let known_addresses = params.network_config.known_addresses();
394 let peer_store_handle = params.network_config.peer_store_handle();
395 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
396
397 let FullNetworkConfiguration {
398 notification_protocols,
399 request_response_protocols,
400 network_config,
401 ..
402 } = params.network_config;
403
404 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
410 let mut notif_protocols = HashMap::from_iter([(
411 params.block_announce_config.protocol_name().clone(),
412 params.block_announce_config.handle,
413 )]);
414
415 config_builder = notification_protocols
417 .into_iter()
418 .fold(config_builder, |config_builder, mut config| {
419 config.config.set_handshake(Roles::from(¶ms.role).encode());
420 notif_protocols.insert(config.protocol_name, config.handle);
421
422 config_builder.with_notification_protocol(config.config)
423 })
424 .with_notification_protocol(params.block_announce_config.config);
425
426 let metrics = match ¶ms.metrics_registry {
428 Some(registry) => Some(register_without_sources(registry)?),
429 None => None,
430 };
431
432 let (mut request_response_receivers, request_response_senders): (
438 HashMap<_, _>,
439 HashMap<_, _>,
440 ) = request_response_protocols
441 .iter()
442 .map(|config| {
443 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
444 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
445 })
446 .unzip();
447
448 config_builder = request_response_protocols.into_iter().fold(
449 config_builder,
450 |config_builder, config| {
451 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
452 Litep2pProtocolName::from(config.protocol_name.clone()),
453 )
454 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
455 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
456 .with_timeout(config.request_timeout)
457 .build();
458
459 let protocol = RequestResponseProtocol::new(
460 config.protocol_name.clone(),
461 handle,
462 Arc::clone(&peer_store_handle),
463 config.inbound_queue,
464 request_response_receivers
465 .remove(&config.protocol_name)
466 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
467 request_response_senders.clone(),
468 metrics.clone(),
469 );
470
471 executor.run(Box::pin(async move {
472 protocol.run().await;
473 }));
474
475 config_builder.with_request_response_protocol(protocol_config)
476 },
477 );
478
479 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
481 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
482 use sc_network_types::multiaddr::Protocol;
483
484 let address = match address.iter().last() {
485 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
486 address.with(Protocol::P2p(peer.into())),
487 Some(Protocol::P2p(_)) => address,
488 _ => return acc,
489 };
490
491 acc.entry(peer.into()).or_default().push(address.into());
492 peer_store_handle.add_known_peer(peer);
493
494 acc
495 });
496
497 let listen_addresses = Arc::new(Default::default());
499 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
500 Discovery::new(
501 local_peer_id,
502 &network_config,
503 params.genesis_hash,
504 params.fork_id.as_deref(),
505 ¶ms.protocol_id,
506 known_addresses.clone(),
507 Arc::clone(&listen_addresses),
508 Arc::clone(&peer_store_handle),
509 );
510
511 config_builder = config_builder
512 .with_known_addresses(known_addresses.clone().into_iter())
513 .with_libp2p_ping(ping_config)
514 .with_libp2p_identify(identify_config)
515 .with_libp2p_kademlia(kademlia_config)
516 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
517 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
518 ))
519 .with_keep_alive_timeout(network_config.idle_connection_timeout)
520 .with_system_resolver()
523 .with_executor(executor);
524
525 if let Some(config) = maybe_mdns_config {
526 config_builder = config_builder.with_mdns(config);
527 }
528
529 if let Some(config) = params.bitswap_config {
530 config_builder = config_builder.with_libp2p_bitswap(config);
531 }
532
533 let litep2p =
534 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
535
536 litep2p.listen_addresses().for_each(|address| {
537 log::debug!(target: LOG_TARGET, "listening on: {address}");
538
539 listen_addresses.write().insert(address.clone());
540 });
541
542 let public_addresses = litep2p.public_addresses();
543 for address in network_config.public_addresses.iter() {
544 if let Err(err) = public_addresses.add_address(address.clone().into()) {
545 log::warn!(
546 target: LOG_TARGET,
547 "failed to add public address {address:?}: {err:?}",
548 );
549 }
550 }
551
552 let network_service = Arc::new(Litep2pNetworkService::new(
553 local_peer_id,
554 keypair.clone(),
555 cmd_tx,
556 Arc::clone(&peer_store_handle),
557 notif_protocols.clone(),
558 block_announce_protocol.clone(),
559 request_response_senders,
560 Arc::clone(&listen_addresses),
561 public_addresses,
562 ));
563
564 let num_connected = Arc::new(Default::default());
566 let bandwidth: Arc<dyn BandwidthSink> =
567 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
568
569 if let Some(registry) = ¶ms.metrics_registry {
570 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
571 }
572
573 Ok(Self {
574 network_service,
575 cmd_rx,
576 metrics,
577 peerset_handles: notif_protocols,
578 num_connected,
579 discovery,
580 pending_queries: HashMap::new(),
581 peerstore_handle: peer_store_handle,
582 block_announce_protocol,
583 event_streams: out_events::OutChannels::new(None)?,
584 peers: HashMap::new(),
585 litep2p,
586 })
587 }
588
589 fn network_service(&self) -> Arc<dyn NetworkService> {
590 Arc::clone(&self.network_service)
591 }
592
593 fn peer_store(
594 bootnodes: Vec<sc_network_types::PeerId>,
595 metrics_registry: Option<Registry>,
596 ) -> Self::PeerStore {
597 Peerstore::new(bootnodes, metrics_registry)
598 }
599
600 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
601 NotificationMetrics::new(registry)
602 }
603
604 fn bitswap_server(
606 client: Arc<dyn BlockBackend<B> + Send + Sync>,
607 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
608 BitswapServer::new(client)
609 }
610
611 fn notification_config(
613 protocol_name: ProtocolName,
614 fallback_names: Vec<ProtocolName>,
615 max_notification_size: u64,
616 handshake: Option<NotificationHandshake>,
617 set_config: SetConfig,
618 metrics: NotificationMetrics,
619 peerstore_handle: Arc<dyn PeerStoreProvider>,
620 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
621 Self::NotificationProtocolConfig::new(
622 protocol_name,
623 fallback_names,
624 max_notification_size as usize,
625 handshake,
626 set_config,
627 metrics,
628 peerstore_handle,
629 )
630 }
631
632 fn request_response_config(
634 protocol_name: ProtocolName,
635 fallback_names: Vec<ProtocolName>,
636 max_request_size: u64,
637 max_response_size: u64,
638 request_timeout: Duration,
639 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
640 ) -> Self::RequestResponseProtocolConfig {
641 Self::RequestResponseProtocolConfig::new(
642 protocol_name,
643 fallback_names,
644 max_request_size,
645 max_response_size,
646 request_timeout,
647 inbound_queue,
648 )
649 }
650
651 async fn run(mut self) {
653 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
654
655 loop {
656 let num_connected_peers = self
657 .peerset_handles
658 .get(&self.block_announce_protocol)
659 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
660 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
661
662 tokio::select! {
663 command = self.cmd_rx.next() => match command {
664 None => return,
665 Some(command) => match command {
666 NetworkServiceCommand::FindClosestPeers { target } => {
667 let query_id = self.discovery.find_node(target.into()).await;
668 self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
669 }
670 NetworkServiceCommand::GetValue{ key } => {
671 let query_id = self.discovery.get_value(key.clone()).await;
672 self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
673 }
674 NetworkServiceCommand::PutValue { key, value } => {
675 let query_id = self.discovery.put_value(key.clone(), value).await;
676 self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
677 }
678 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
679 let kademlia_key = record.key.clone();
680 let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
681 self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
682 }
683 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
684 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
685 }
686 NetworkServiceCommand::StartProviding { key } => {
687 self.discovery.start_providing(key).await;
688 }
689 NetworkServiceCommand::StopProviding { key } => {
690 self.discovery.stop_providing(key).await;
691 }
692 NetworkServiceCommand::GetProviders { key } => {
693 let query_id = self.discovery.get_providers(key.clone()).await;
694 self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
695 }
696 NetworkServiceCommand::EventStream { tx } => {
697 self.event_streams.push(tx);
698 }
699 NetworkServiceCommand::Status { tx } => {
700 let _ = tx.send(NetworkStatus {
701 num_connected_peers: self
702 .peerset_handles
703 .get(&self.block_announce_protocol)
704 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
705 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
706 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
707 });
708 }
709 NetworkServiceCommand::AddPeersToReservedSet {
710 protocol,
711 peers,
712 } => {
713 let peers = self.add_addresses(peers.into_iter().map(Into::into));
714
715 match self.peerset_handles.get(&protocol) {
716 Some(handle) => {
717 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
718 }
719 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
720 };
721 }
722 NetworkServiceCommand::AddKnownAddress { peer, address } => {
723 let mut address: Multiaddr = address.into();
724
725 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
726 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
727 }
728
729 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
730 self.peerstore_handle.add_known_peer(peer);
734 } else {
735 log::debug!(
736 target: LOG_TARGET,
737 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
738 );
739 }
740 },
741 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
742 let peers = self.add_addresses(peers.into_iter().map(Into::into));
743
744 match self.peerset_handles.get(&protocol) {
745 Some(handle) => {
746 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
747 }
748 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
749 }
750
751 },
752 NetworkServiceCommand::DisconnectPeer {
753 protocol,
754 peer,
755 } => {
756 let Some(handle) = self.peerset_handles.get(&protocol) else {
757 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
758 continue
759 };
760
761 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
762 }
763 NetworkServiceCommand::SetReservedOnly {
764 protocol,
765 reserved_only,
766 } => {
767 let Some(handle) = self.peerset_handles.get(&protocol) else {
768 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
769 continue
770 };
771
772 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
773 }
774 NetworkServiceCommand::RemoveReservedPeers {
775 protocol,
776 peers,
777 } => {
778 let Some(handle) = self.peerset_handles.get(&protocol) else {
779 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
780 continue
781 };
782
783 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
784 }
785 }
786 },
787 event = self.discovery.next() => match event {
788 None => return,
789 Some(DiscoveryEvent::Discovered { addresses }) => {
790 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
792 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
793 self.peerstore_handle.add_known_peer(peer);
794 }
795 }
796 }
797 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
798 for peer in peers {
799 self.peerstore_handle.add_known_peer(peer.into());
800 }
801 }
802 Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
803 match self.pending_queries.remove(&query_id) {
804 Some(KadQuery::FindNode(_, started)) => {
805 log::trace!(
806 target: LOG_TARGET,
807 "`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
808 );
809
810 self.event_streams.send(
811 Event::Dht(
812 DhtEvent::ClosestPeersFound(
813 target.into(),
814 peers
815 .into_iter()
816 .map(|(peer, addrs)| (
817 peer.into(),
818 addrs.into_iter().map(Into::into).collect(),
819 ))
820 .collect(),
821 )
822 )
823 );
824
825 if let Some(ref metrics) = self.metrics {
826 metrics
827 .kademlia_query_duration
828 .with_label_values(&["node-find"])
829 .observe(started.elapsed().as_secs_f64());
830 }
831 },
832 query => {
833 log::error!(
834 target: LOG_TARGET,
835 "Missing/invalid pending query for `FIND_NODE`: {query:?}"
836 );
837 debug_assert!(false);
838 }
839 }
840 },
841 Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
842 if !self.pending_queries.contains_key(&query_id) {
843 log::error!(
844 target: LOG_TARGET,
845 "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
846 );
847
848 continue
849 }
850
851 let peer_id: sc_network_types::PeerId = record.peer.into();
852 let record = PeerRecord {
853 record: P2PRecord {
854 key: record.record.key.to_vec().into(),
855 value: record.record.value,
856 publisher: record.record.publisher.map(|peer_id| {
857 let peer_id: sc_network_types::PeerId = peer_id.into();
858 peer_id.into()
859 }),
860 expires: record.record.expires,
861 },
862 peer: Some(peer_id.into()),
863 };
864
865 self.event_streams.send(
866 Event::Dht(
867 DhtEvent::ValueFound(
868 record.into()
869 )
870 )
871 );
872 }
873 Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
874 match self.pending_queries.remove(&query_id) {
875 Some(KadQuery::GetValue(key, started)) => {
876 log::trace!(
877 target: LOG_TARGET,
878 "`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
879 );
880
881 if let Some(ref metrics) = self.metrics {
882 metrics
883 .kademlia_query_duration
884 .with_label_values(&["value-get"])
885 .observe(started.elapsed().as_secs_f64());
886 }
887 },
888 query => {
889 log::error!(
890 target: LOG_TARGET,
891 "Missing/invalid pending query for `GET_VALUE`: {query:?}"
892 );
893 debug_assert!(false);
894 },
895 }
896 }
897 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
898 match self.pending_queries.remove(&query_id) {
899 Some(KadQuery::PutValue(key, started)) => {
900 log::trace!(
901 target: LOG_TARGET,
902 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
903 );
904
905 self.event_streams.send(Event::Dht(
906 DhtEvent::ValuePut(key)
907 ));
908
909 if let Some(ref metrics) = self.metrics {
910 metrics
911 .kademlia_query_duration
912 .with_label_values(&["value-put"])
913 .observe(started.elapsed().as_secs_f64());
914 }
915 },
916 query => {
917 log::error!(
918 target: LOG_TARGET,
919 "Missing/invalid pending query for `PUT_VALUE`: {query:?}"
920 );
921 debug_assert!(false);
922 }
923 }
924 }
925 Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
926 match self.pending_queries.remove(&query_id) {
927 Some(KadQuery::GetProviders(key, started)) => {
928 log::trace!(
929 target: LOG_TARGET,
930 "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
931 );
932
933 providers.iter().for_each(|p| {
938 self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
939 });
940
941 self.event_streams.send(Event::Dht(
942 DhtEvent::ProvidersFound(
943 key.clone().into(),
944 providers.into_iter().map(|p| p.peer.into()).collect()
945 )
946 ));
947
948 self.event_streams.send(Event::Dht(
951 DhtEvent::NoMoreProviders(key.into())
952 ));
953
954 if let Some(ref metrics) = self.metrics {
955 metrics
956 .kademlia_query_duration
957 .with_label_values(&["providers-get"])
958 .observe(started.elapsed().as_secs_f64());
959 }
960 },
961 query => {
962 log::error!(
963 target: LOG_TARGET,
964 "Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
965 );
966 debug_assert!(false);
967 }
968 }
969 }
970 Some(DiscoveryEvent::QueryFailed { query_id }) => {
971 match self.pending_queries.remove(&query_id) {
972 Some(KadQuery::FindNode(peer_id, started)) => {
973 log::debug!(
974 target: LOG_TARGET,
975 "`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
976 );
977
978 self.event_streams.send(Event::Dht(
979 DhtEvent::ClosestPeersNotFound(peer_id.into())
980 ));
981
982 if let Some(ref metrics) = self.metrics {
983 metrics
984 .kademlia_query_duration
985 .with_label_values(&["node-find-failed"])
986 .observe(started.elapsed().as_secs_f64());
987 }
988 },
989 Some(KadQuery::GetValue(key, started)) => {
990 log::debug!(
991 target: LOG_TARGET,
992 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
993 );
994
995 self.event_streams.send(Event::Dht(
996 DhtEvent::ValueNotFound(key)
997 ));
998
999 if let Some(ref metrics) = self.metrics {
1000 metrics
1001 .kademlia_query_duration
1002 .with_label_values(&["value-get-failed"])
1003 .observe(started.elapsed().as_secs_f64());
1004 }
1005 },
1006 Some(KadQuery::PutValue(key, started)) => {
1007 log::debug!(
1008 target: LOG_TARGET,
1009 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1010 );
1011
1012 self.event_streams.send(Event::Dht(
1013 DhtEvent::ValuePutFailed(key)
1014 ));
1015
1016 if let Some(ref metrics) = self.metrics {
1017 metrics
1018 .kademlia_query_duration
1019 .with_label_values(&["value-put-failed"])
1020 .observe(started.elapsed().as_secs_f64());
1021 }
1022 },
1023 Some(KadQuery::GetProviders(key, started)) => {
1024 log::debug!(
1025 target: LOG_TARGET,
1026 "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1027 );
1028
1029 self.event_streams.send(Event::Dht(
1030 DhtEvent::ProvidersNotFound(key)
1031 ));
1032
1033 if let Some(ref metrics) = self.metrics {
1034 metrics
1035 .kademlia_query_duration
1036 .with_label_values(&["providers-get-failed"])
1037 .observe(started.elapsed().as_secs_f64());
1038 }
1039 },
1040 None => {
1041 log::warn!(
1042 target: LOG_TARGET,
1043 "non-existent query failed ({query_id:?})",
1044 );
1045 }
1046 }
1047 }
1048 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1049 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1050 }
1051 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1052 match self.litep2p.public_addresses().add_address(address.clone().into()) {
1053 Ok(inserted) => if inserted {
1054 log::info!(target: LOG_TARGET, "๐ Discovered new external address for our node: {address}");
1055 },
1056 Err(err) => {
1057 log::warn!(
1058 target: LOG_TARGET,
1059 "๐ Failed to add discovered external address {address:?}: {err:?}",
1060 );
1061 },
1062 }
1063 }
1064 Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1065 let local_peer_id = self.litep2p.local_peer_id();
1066
1067 let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1069 address.with(Protocol::P2p(*local_peer_id.as_ref()))
1070 } else {
1071 address
1072 };
1073
1074 if self.litep2p.public_addresses().remove_address(&address) {
1075 log::info!(target: LOG_TARGET, "๐ Expired external address for our node: {address}");
1076 } else {
1077 log::warn!(
1078 target: LOG_TARGET,
1079 "๐ Failed to remove expired external address {address:?}"
1080 );
1081 }
1082 }
1083 Some(DiscoveryEvent::Ping { peer, rtt }) => {
1084 log::trace!(
1085 target: LOG_TARGET,
1086 "ping time with {peer:?}: {rtt:?}",
1087 );
1088 }
1089 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1090 self.event_streams.send(Event::Dht(
1091 DhtEvent::PutRecordRequest(
1092 key.into(),
1093 value,
1094 publisher.map(Into::into),
1095 expires,
1096 )
1097 ));
1098 },
1099
1100 Some(DiscoveryEvent::RandomKademliaStarted) => {
1101 if let Some(metrics) = self.metrics.as_ref() {
1102 metrics.kademlia_random_queries_total.inc();
1103 }
1104 }
1105 },
1106 event = self.litep2p.next_event() => match event {
1107 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1108 let Some(metrics) = &self.metrics else {
1109 continue;
1110 };
1111
1112 let direction = match endpoint {
1113 Endpoint::Dialer { .. } => "out",
1114 Endpoint::Listener { .. } => {
1115 metrics.incoming_connections_total.inc();
1120
1121 "in"
1122 },
1123 };
1124 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1125
1126 match self.peers.entry(peer) {
1127 Entry::Vacant(entry) => {
1128 entry.insert(ConnectionContext {
1129 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1130 num_connections: 1usize,
1131 });
1132 metrics.distinct_peers_connections_opened_total.inc();
1133 }
1134 Entry::Occupied(entry) => {
1135 let entry = entry.into_mut();
1136 entry.num_connections += 1;
1137 entry.endpoints.insert(endpoint.connection_id(), endpoint);
1138 }
1139 }
1140 }
1141 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1142 let Some(metrics) = &self.metrics else {
1143 continue;
1144 };
1145
1146 let Some(context) = self.peers.get_mut(&peer) else {
1147 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1148 continue
1149 };
1150
1151 let direction = match context.endpoints.remove(&connection_id) {
1152 None => {
1153 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1154 continue
1155 }
1156 Some(endpoint) => {
1157 context.num_connections -= 1;
1158
1159 match endpoint {
1160 Endpoint::Dialer { .. } => "out",
1161 Endpoint::Listener { .. } => "in",
1162 }
1163 }
1164 };
1165
1166 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1167
1168 if context.num_connections == 0 {
1169 self.peers.remove(&peer);
1170 metrics.distinct_peers_connections_closed_total.inc();
1171 }
1172 }
1173 Some(Litep2pEvent::DialFailure { address, error }) => {
1174 log::debug!(
1175 target: LOG_TARGET,
1176 "failed to dial peer at {address:?}: {error:?}",
1177 );
1178
1179 if let Some(metrics) = &self.metrics {
1180 let reason = match error {
1181 DialError::Timeout => "timeout",
1182 DialError::AddressError(_) => "invalid-address",
1183 DialError::DnsError(_) => "cannot-resolve-dns",
1184 DialError::NegotiationError(error) => match error {
1185 NegotiationError::Timeout => "timeout",
1186 NegotiationError::PeerIdMissing => "missing-peer-id",
1187 NegotiationError::StateMismatch => "state-mismatch",
1188 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1189 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1190 NegotiationError::SnowError(_) => "noise-error",
1191 NegotiationError::ParseError(_) => "parse-error",
1192 NegotiationError::IoError(_) => "io-error",
1193 NegotiationError::WebSocket(_) => "webscoket-error",
1194 NegotiationError::BadSignature => "bad-signature",
1195 }
1196 };
1197
1198 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1199 }
1200 }
1201 Some(Litep2pEvent::ListDialFailures { errors }) => {
1202 log::debug!(
1203 target: LOG_TARGET,
1204 "failed to dial peer on multiple addresses {errors:?}",
1205 );
1206
1207 if let Some(metrics) = &self.metrics {
1208 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1209 }
1210 }
1211 None => {
1212 log::error!(
1213 target: LOG_TARGET,
1214 "Litep2p backend terminated"
1215 );
1216 return
1217 }
1218 },
1219 }
1220 }
1221 }
1222}