sc_network/litep2p/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `NetworkBackend` implementation for `litep2p`.
20
21use crate::{
22	config::{
23		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24		SetConfig, TransportConfig,
25	},
26	error::Error,
27	event::{DhtEvent, Event},
28	litep2p::{
29		bitswap::BitswapService,
30		discovery::{Discovery, DiscoveryEvent},
31		ipfs_dht::IpfsDht,
32		peerstore::Peerstore,
33		service::{Litep2pNetworkService, NetworkServiceCommand},
34		shim::{
35			notification::{
36				config::{NotificationProtocolConfig, ProtocolControlHandle},
37				peerset::PeersetCommand,
38			},
39			request_response::{RequestResponseConfig, RequestResponseProtocol},
40		},
41	},
42	peer_store::PeerStoreProvider,
43	service::{
44		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
45		out_events,
46		traits::{BandwidthSink, NetworkBackend, NetworkService},
47	},
48	NetworkStatus, NotificationService, ProtocolName,
49};
50
51use codec::Encode;
52use futures::StreamExt;
53use litep2p::{
54	config::ConfigBuilder,
55	crypto::ed25519::Keypair,
56	error::{DialError, NegotiationError},
57	executor::Executor,
58	protocol::{
59		libp2p::kademlia::{QueryId, Record},
60		request_response::ConfigBuilder as RequestResponseConfigBuilder,
61	},
62	transport::{
63		tcp::config::Config as TcpTransportConfig,
64		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
65	},
66	types::{
67		multiaddr::{Multiaddr, Protocol},
68		ConnectionId,
69	},
70	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
71};
72use prometheus_endpoint::Registry;
73use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
74
75use sc_client_api::BlockBackend;
76use sc_network_common::{role::Roles, ExHashT};
77use sc_network_types::PeerId;
78use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
79use sp_runtime::traits::Block as BlockT;
80
81use std::{
82	cmp,
83	collections::{hash_map::Entry, HashMap, HashSet},
84	fs,
85	future::Future,
86	iter,
87	pin::Pin,
88	sync::{
89		atomic::{AtomicUsize, Ordering},
90		Arc,
91	},
92	time::{Duration, Instant},
93};
94
95mod bitswap;
96mod bitswap_metrics;
97mod discovery;
98mod ipfs_dht;
99mod peerstore;
100mod service;
101mod shim;
102
103/// Litep2p bandwidth sink.
104struct Litep2pBandwidthSink {
105	sink: litep2p::BandwidthSink,
106}
107
108impl BandwidthSink for Litep2pBandwidthSink {
109	fn total_inbound(&self) -> u64 {
110		self.sink.inbound() as u64
111	}
112
113	fn total_outbound(&self) -> u64 {
114		self.sink.outbound() as u64
115	}
116}
117
118/// Litep2p task executor.
119struct Litep2pExecutor {
120	/// Executor.
121	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
122}
123
124impl Executor for Litep2pExecutor {
125	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
126		(self.executor)(future)
127	}
128
129	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
130		(self.executor)(future)
131	}
132}
133
134/// Logging target for the file.
135const LOG_TARGET: &str = "sub-libp2p";
136
137/// Peer context.
138struct ConnectionContext {
139	/// Peer endpoints.
140	endpoints: HashMap<ConnectionId, Endpoint>,
141
142	/// Number of active connections.
143	num_connections: usize,
144}
145
146/// Kademlia query we are tracking.
147#[derive(Debug)]
148enum KadQuery {
149	/// `FIND_NODE` query for target and when it was initiated.
150	FindNode(PeerId, Instant),
151	/// `GET_VALUE` query for key and when it was initiated.
152	GetValue(RecordKey, Instant),
153	/// `PUT_VALUE` query for key and when it was initiated.
154	PutValue(RecordKey, Instant),
155	/// `GET_PROVIDERS` query for key and when it was initiated.
156	GetProviders(RecordKey, Instant),
157	/// `ADD_PROVIDER` query for key and when it was initiated.
158	AddProvider(RecordKey, Instant),
159}
160
161/// Networking backend for `litep2p`.
162pub struct Litep2pNetworkBackend {
163	/// Main `litep2p` object.
164	litep2p: Litep2p,
165
166	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
167	network_service: Arc<dyn NetworkService>,
168
169	/// RX channel for receiving commands from `Litep2pNetworkService`.
170	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,
171
172	/// `Peerset` handles to notification protocols.
173	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
174
175	/// Pending Kademlia queries.
176	pending_queries: HashMap<QueryId, KadQuery>,
177
178	/// Discovery.
179	discovery: Discovery,
180
181	/// Number of connected peers.
182	num_connected: Arc<AtomicUsize>,
183
184	/// Connected peers.
185	peers: HashMap<litep2p::PeerId, ConnectionContext>,
186
187	/// Peerstore.
188	peerstore_handle: Arc<dyn PeerStoreProvider>,
189
190	/// Block announce protocol name.
191	block_announce_protocol: ProtocolName,
192
193	/// Sender for DHT events.
194	event_streams: out_events::OutChannels,
195
196	/// Prometheus metrics.
197	metrics: Option<Metrics>,
198}
199
200impl Litep2pNetworkBackend {
201	/// From an iterator of multiaddress(es), parse and group all addresses of peers
202	/// so that litep2p can consume the information easily.
203	fn parse_addresses(
204		addresses: impl Iterator<Item = Multiaddr>,
205	) -> HashMap<PeerId, Vec<Multiaddr>> {
206		addresses
207			.into_iter()
208			.filter_map(|address| match address.iter().next() {
209				Some(
210					Protocol::Dns(_) |
211					Protocol::Dns4(_) |
212					Protocol::Dns6(_) |
213					Protocol::Ip6(_) |
214					Protocol::Ip4(_),
215				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
216				{
217					Some(Protocol::P2p(peer_id)) => Some((peer_id.into(), Some(address))),
218					_ => None,
219				},
220				Some(Protocol::P2p(peer_id)) => Some((peer_id.into(), None)),
221				_ => None,
222			})
223			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
224				let entry = acc.entry(peer).or_default();
225				maybe_address.map(|address| entry.push(address));
226
227				acc
228			})
229	}
230
231	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
232	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
233		Self::parse_addresses(peers.into_iter())
234			.into_iter()
235			.filter_map(|(peer, addresses)| {
236				// `peers` contained multiaddress in the form `/p2p/<peer ID>`
237				if addresses.is_empty() {
238					return Some(peer);
239				}
240
241				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
242					log::warn!(
243						target: LOG_TARGET,
244						"couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
245					);
246					return None;
247				}
248
249				self.peerstore_handle.add_known_peer(peer);
250				Some(peer)
251			})
252			.collect()
253	}
254}
255
256impl Litep2pNetworkBackend {
257	/// Get `litep2p` keypair from `NodeKeyConfig`.
258	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
259		let secret: litep2p::crypto::ed25519::SecretKey =
260			node_key.clone().into_keypair()?.secret().into();
261
262		let local_identity = Keypair::from(secret);
263		let local_public = local_identity.public();
264		let local_peer_id = local_public.to_peer_id();
265
266		Ok((local_identity, local_peer_id))
267	}
268
269	/// Configure transport protocols for `Litep2pNetworkBackend`.
270	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
271		config: &FullNetworkConfiguration<B, H, Self>,
272	) -> ConfigBuilder {
273		let _ = match config.network_config.transport {
274			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
275			TransportConfig::Normal { .. } => false,
276		};
277		let config_builder = ConfigBuilder::new();
278
279		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
280			.network_config
281			.listen_addresses
282			.iter()
283			.filter_map(|address| {
284				use sc_network_types::multiaddr::Protocol;
285
286				let mut iter = address.iter();
287
288				match iter.next() {
289					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
290					protocol => {
291						log::error!(
292							target: LOG_TARGET,
293							"unknown protocol {protocol:?}, ignoring {address:?}",
294						);
295
296						return None;
297					},
298				}
299
300				match iter.next() {
301					Some(Protocol::Tcp(_)) => match iter.next() {
302						Some(Protocol::Ws(_) | Protocol::Wss(_)) => {
303							Some((None, Some(address.clone())))
304						},
305						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
306						protocol => {
307							log::error!(
308								target: LOG_TARGET,
309								"unknown protocol {protocol:?}, ignoring {address:?}",
310							);
311							None
312						},
313					},
314					protocol => {
315						log::error!(
316							target: LOG_TARGET,
317							"unknown protocol {protocol:?}, ignoring {address:?}",
318						);
319						None
320					},
321				}
322			})
323			.unzip();
324
325		config_builder
326			.with_websocket(WebSocketTransportConfig {
327				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
328				yamux_config: litep2p::yamux::Config::default(),
329				nodelay: true,
330				..Default::default()
331			})
332			.with_tcp(TcpTransportConfig {
333				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
334				yamux_config: litep2p::yamux::Config::default(),
335				nodelay: true,
336				..Default::default()
337			})
338	}
339}
340
341#[async_trait::async_trait]
342impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
343	type NotificationProtocolConfig = NotificationProtocolConfig;
344	type RequestResponseProtocolConfig = RequestResponseConfig;
345	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
346	type PeerStore = Peerstore;
347	type BitswapConfig = bitswap::BitswapConfig;
348
349	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
350	where
351		Self: Sized,
352	{
353		let (keypair, local_peer_id) =
354			Self::get_keypair(&params.network_config.network_config.node_key)?;
355		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
356
357		params.network_config.network_config.boot_nodes = params
358			.network_config
359			.network_config
360			.boot_nodes
361			.into_iter()
362			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
363			.collect();
364		params.network_config.network_config.default_peers_set.reserved_nodes = params
365			.network_config
366			.network_config
367			.default_peers_set
368			.reserved_nodes
369			.into_iter()
370			.filter(|reserved_node| {
371				if reserved_node.peer_id == local_peer_id.into() {
372					log::warn!(
373						target: LOG_TARGET,
374						"Local peer ID used in reserved node, ignoring: {reserved_node}",
375					);
376					false
377				} else {
378					true
379				}
380			})
381			.collect();
382
383		if let Some(path) = &params.network_config.network_config.net_config_path {
384			fs::create_dir_all(path)?;
385		}
386
387		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
388		log::info!(target: LOG_TARGET, "Running litep2p network backend");
389
390		params.network_config.sanity_check_addresses()?;
391		params.network_config.sanity_check_bootnodes()?;
392
393		let mut config_builder =
394			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
395		let known_addresses = params.network_config.known_addresses();
396		let peer_store_handle = params.network_config.peer_store_handle();
397		let executor = Arc::new(Litep2pExecutor { executor: params.executor });
398
399		let FullNetworkConfiguration {
400			notification_protocols,
401			request_response_protocols,
402			network_config,
403			..
404		} = params.network_config;
405
406		// initialize notification protocols
407		//
408		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
409		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
410		// of Polkadot SDK to control connectivity of the notification protocol
411		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
412		let mut notif_protocols = HashMap::from_iter([(
413			params.block_announce_config.protocol_name().clone(),
414			params.block_announce_config.handle,
415		)]);
416
417		// handshake for all but the syncing protocol is set to node role
418		config_builder = notification_protocols
419			.into_iter()
420			.fold(config_builder, |config_builder, mut config| {
421				config.config.set_handshake(Roles::from(&params.role).encode());
422				notif_protocols.insert(config.protocol_name, config.handle);
423
424				config_builder.with_notification_protocol(config.config)
425			})
426			.with_notification_protocol(params.block_announce_config.config);
427
428		// initialize request-response protocols
429		let metrics = match &params.metrics_registry {
430			Some(registry) => Some(register_without_sources(registry)?),
431			None => None,
432		};
433
434		// create channels that are used to send request before initializing protocols so the
435		// senders can be passed onto all request-response protocols
436		//
437		// all protocols must have each others' senders so they can send the fallback request in
438		// case the main protocol is not supported by the remote peer and user specified a fallback
439		let (mut request_response_receivers, request_response_senders): (
440			HashMap<_, _>,
441			HashMap<_, _>,
442		) = request_response_protocols
443			.iter()
444			.map(|config| {
445				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
446				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
447			})
448			.unzip();
449
450		config_builder = request_response_protocols.into_iter().fold(
451			config_builder,
452			|config_builder, config| {
453				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
454					Litep2pProtocolName::from(config.protocol_name.clone()),
455				)
456				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
457				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
458				.with_timeout(config.request_timeout)
459				.build();
460
461				let protocol = RequestResponseProtocol::new(
462					config.protocol_name.clone(),
463					handle,
464					Arc::clone(&peer_store_handle),
465					config.inbound_queue,
466					request_response_receivers
467						.remove(&config.protocol_name)
468						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
469					request_response_senders.clone(),
470					metrics.clone(),
471				);
472
473				executor.run(Box::pin(async move {
474					protocol.run().await;
475				}));
476
477				config_builder.with_request_response_protocol(protocol_config)
478			},
479		);
480
481		// collect known addresses
482		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
483			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
484				use sc_network_types::multiaddr::Protocol;
485
486				let address = match address.iter().last() {
487					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) => {
488						address.with(Protocol::P2p(peer.into()))
489					},
490					Some(Protocol::P2p(_)) => address,
491					_ => return acc,
492				};
493
494				acc.entry(peer.into()).or_default().push(address.into());
495				peer_store_handle.add_known_peer(peer);
496
497				acc
498			});
499
500		// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
501		let listen_addresses = Arc::new(Default::default());
502		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
503			Discovery::new(
504				local_peer_id,
505				&network_config,
506				params.genesis_hash,
507				params.fork_id.as_deref(),
508				&params.protocol_id,
509				known_addresses.clone(),
510				Arc::clone(&listen_addresses),
511				Arc::clone(&peer_store_handle),
512			);
513
514		let bitswap_cmd_tx = params.ipfs_config.as_ref().map(|c| c.bitswap_config.cmd_tx.clone());
515
516		// enable Bitswap & IPFS DHT
517		if let Some(config) = params.ipfs_config {
518			config_builder =
519				config_builder.with_libp2p_bitswap(config.bitswap_config.litep2p_config);
520
521			if !config.bootnodes.is_empty() {
522				let (ipfs_dht, kad_config) = IpfsDht::new(config.bootnodes, config.block_provider);
523				config_builder = config_builder.with_libp2p_kademlia(kad_config);
524				executor.run(Box::pin(ipfs_dht.run()));
525			} else {
526				log::warn!(
527					target: LOG_TARGET,
528					"Not starting IPFS DHT publisher because no IPFS bootnodes are configured. \
529					 Only direct Bitswap requests will be handled.",
530				);
531			}
532		}
533
534		config_builder = config_builder
535			.with_known_addresses(known_addresses.clone().into_iter())
536			.with_libp2p_ping(ping_config)
537			.with_libp2p_identify(identify_config)
538			.with_libp2p_kademlia(kademlia_config)
539			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
540				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
541			))
542			.with_keep_alive_timeout(network_config.idle_connection_timeout)
543			// Use system DNS resolver to enable intranet domain resolution and administrator
544			// control over DNS lookup.
545			.with_system_resolver()
546			.with_executor(executor);
547
548		if let Some(config) = maybe_mdns_config {
549			config_builder = config_builder.with_mdns(config);
550		}
551
552		let litep2p =
553			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
554
555		litep2p.listen_addresses().for_each(|address| {
556			log::debug!(target: LOG_TARGET, "listening on: {address}");
557
558			listen_addresses.write().insert(address.clone());
559		});
560
561		let public_addresses = litep2p.public_addresses();
562		for address in network_config.public_addresses.iter() {
563			if let Err(err) = public_addresses.add_address(address.clone().into()) {
564				log::warn!(
565					target: LOG_TARGET,
566					"failed to add public address {address:?}: {err:?}",
567				);
568			}
569		}
570
571		let network_service = Arc::new(Litep2pNetworkService::new(
572			local_peer_id,
573			keypair.clone(),
574			cmd_tx,
575			Arc::clone(&peer_store_handle),
576			notif_protocols.clone(),
577			block_announce_protocol.clone(),
578			request_response_senders,
579			Arc::clone(&listen_addresses),
580			public_addresses,
581			bitswap_cmd_tx,
582		));
583
584		// register rest of the metrics now that `Litep2p` has been created
585		let num_connected = Arc::new(Default::default());
586		let bandwidth: Arc<dyn BandwidthSink> =
587			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
588
589		if let Some(registry) = &params.metrics_registry {
590			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
591		}
592
593		Ok(Self {
594			network_service,
595			cmd_rx,
596			metrics,
597			peerset_handles: notif_protocols,
598			num_connected,
599			discovery,
600			pending_queries: HashMap::new(),
601			peerstore_handle: peer_store_handle,
602			block_announce_protocol,
603			event_streams: out_events::OutChannels::new(None)?,
604			peers: HashMap::new(),
605			litep2p,
606		})
607	}
608
609	fn network_service(&self) -> Arc<dyn NetworkService> {
610		Arc::clone(&self.network_service)
611	}
612
613	fn peer_store(
614		bootnodes: Vec<sc_network_types::PeerId>,
615		metrics_registry: Option<Registry>,
616	) -> Self::PeerStore {
617		Peerstore::new(bootnodes, metrics_registry)
618	}
619
620	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
621		NotificationMetrics::new(registry)
622	}
623
624	/// Create Bitswap server.
625	fn bitswap_server(
626		client: Arc<dyn BlockBackend<B> + Send + Sync>,
627		metrics_registry: Option<Registry>,
628	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
629		BitswapService::new(client, metrics_registry.as_ref())
630	}
631
632	/// Create notification protocol configuration for `protocol`.
633	fn notification_config(
634		protocol_name: ProtocolName,
635		fallback_names: Vec<ProtocolName>,
636		max_notification_size: u64,
637		handshake: Option<NotificationHandshake>,
638		set_config: SetConfig,
639		metrics: NotificationMetrics,
640		peerstore_handle: Arc<dyn PeerStoreProvider>,
641	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
642		Self::NotificationProtocolConfig::new(
643			protocol_name,
644			fallback_names,
645			max_notification_size as usize,
646			handshake,
647			set_config,
648			metrics,
649			peerstore_handle,
650		)
651	}
652
653	/// Create request-response protocol configuration.
654	fn request_response_config(
655		protocol_name: ProtocolName,
656		fallback_names: Vec<ProtocolName>,
657		max_request_size: u64,
658		max_response_size: u64,
659		request_timeout: Duration,
660		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
661	) -> Self::RequestResponseProtocolConfig {
662		Self::RequestResponseProtocolConfig::new(
663			protocol_name,
664			fallback_names,
665			max_request_size,
666			max_response_size,
667			request_timeout,
668			inbound_queue,
669		)
670	}
671
672	/// Start [`Litep2pNetworkBackend`] event loop.
673	async fn run(mut self) {
674		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
675
676		loop {
677			let num_connected_peers = self
678				.peerset_handles
679				.get(&self.block_announce_protocol)
680				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
681			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
682
683			tokio::select! {
684				command = self.cmd_rx.next() => match command {
685					None => return,
686					Some(command) => match command {
687						NetworkServiceCommand::FindClosestPeers { target } => {
688							let query_id = self.discovery.find_node(target.into()).await;
689							self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
690						}
691						NetworkServiceCommand::GetValue{ key } => {
692							let query_id = self.discovery.get_value(key.clone()).await;
693							self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
694						}
695						NetworkServiceCommand::PutValue { key, value } => {
696							let query_id = self.discovery.put_value(key.clone(), value).await;
697							self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
698						}
699						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
700							let kademlia_key = record.key.clone();
701							let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
702							self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
703						}
704						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
705							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
706						}
707						NetworkServiceCommand::StartProviding { key } => {
708							let query_id = self.discovery.start_providing(key.clone()).await;
709							self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
710						}
711						NetworkServiceCommand::StopProviding { key } => {
712							self.discovery.stop_providing(key).await;
713						}
714						NetworkServiceCommand::GetProviders { key } => {
715							let query_id = self.discovery.get_providers(key.clone()).await;
716							self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
717						}
718						NetworkServiceCommand::EventStream { tx } => {
719							self.event_streams.push(tx);
720						}
721						NetworkServiceCommand::Status { tx } => {
722							let _ = tx.send(NetworkStatus {
723								num_connected_peers: self
724									.peerset_handles
725									.get(&self.block_announce_protocol)
726									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
727								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
728								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
729							});
730						}
731						NetworkServiceCommand::AddPeersToReservedSet {
732							protocol,
733							peers,
734						} => {
735							let peers = self.add_addresses(peers.into_iter().map(Into::into));
736
737							match self.peerset_handles.get(&protocol) {
738								Some(handle) => {
739									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
740								}
741								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
742							};
743						}
744						NetworkServiceCommand::AddKnownAddress { peer, address } => {
745							let mut address: Multiaddr = address.into();
746
747							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
748								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
749							}
750
751							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
752								// libp2p backend generates `DiscoveryOut::Discovered(peer_id)`
753								// event when a new address is added for a peer, which leads to the
754								// peer being added to peerstore. Do the same directly here.
755								self.peerstore_handle.add_known_peer(peer);
756							} else {
757								log::debug!(
758									target: LOG_TARGET,
759									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
760								);
761							}
762						},
763						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
764							let peers = self.add_addresses(peers.into_iter().map(Into::into));
765
766							match self.peerset_handles.get(&protocol) {
767								Some(handle) => {
768									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
769								}
770								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
771							}
772
773						},
774						NetworkServiceCommand::DisconnectPeer {
775							protocol,
776							peer,
777						} => {
778							let Some(handle) = self.peerset_handles.get(&protocol) else {
779								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
780								continue
781							};
782
783							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
784						}
785						NetworkServiceCommand::SetReservedOnly {
786							protocol,
787							reserved_only,
788						} => {
789							let Some(handle) = self.peerset_handles.get(&protocol) else {
790								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
791								continue
792							};
793
794							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
795						}
796						NetworkServiceCommand::RemoveReservedPeers {
797							protocol,
798							peers,
799						} => {
800							let Some(handle) = self.peerset_handles.get(&protocol) else {
801								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
802								continue
803							};
804
805							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
806						}
807					}
808				},
809				event = self.discovery.next() => match event {
810					None => return,
811					Some(DiscoveryEvent::Discovered { addresses }) => {
812						// if at least one address was added for the peer, report the peer to `Peerstore`
813						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
814							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
815								self.peerstore_handle.add_known_peer(peer);
816							}
817						}
818					}
819					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
820						for peer in peers {
821							self.peerstore_handle.add_known_peer(peer.into());
822						}
823					}
824					Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
825						match self.pending_queries.remove(&query_id) {
826							Some(KadQuery::FindNode(_, started)) => {
827								log::trace!(
828									target: LOG_TARGET,
829									"`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
830								);
831
832								self.event_streams.send(
833									Event::Dht(
834										DhtEvent::ClosestPeersFound(
835											target.into(),
836											peers
837												.into_iter()
838												.map(|(peer, addrs)| (
839													peer.into(),
840													addrs.into_iter().map(Into::into).collect(),
841												))
842												.collect(),
843										)
844									)
845								);
846
847								if let Some(ref metrics) = self.metrics {
848									metrics
849										.kademlia_query_duration
850										.with_label_values(&["node-find"])
851										.observe(started.elapsed().as_secs_f64());
852								}
853							},
854							query => {
855								log::error!(
856									target: LOG_TARGET,
857									"Missing/invalid pending query for `FIND_NODE`: {query:?}"
858								);
859								debug_assert!(false);
860							}
861						}
862					},
863					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
864						if !self.pending_queries.contains_key(&query_id) {
865							log::error!(
866								target: LOG_TARGET,
867								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
868							);
869
870							continue
871						}
872
873						let peer_id: sc_network_types::PeerId = record.peer.into();
874						let record = PeerRecord {
875							record: P2PRecord {
876								key: record.record.key.to_vec().into(),
877								value: record.record.value,
878								publisher: record.record.publisher.map(|peer_id| {
879									let peer_id: sc_network_types::PeerId = peer_id.into();
880									peer_id.into()
881								}),
882								expires: record.record.expires,
883							},
884							peer: Some(peer_id.into()),
885						};
886
887						self.event_streams.send(
888							Event::Dht(
889								DhtEvent::ValueFound(
890									record.into()
891								)
892							)
893						);
894					}
895					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
896						match self.pending_queries.remove(&query_id) {
897							Some(KadQuery::GetValue(key, started)) => {
898								log::trace!(
899									target: LOG_TARGET,
900									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
901								);
902
903								if let Some(ref metrics) = self.metrics {
904									metrics
905										.kademlia_query_duration
906										.with_label_values(&["value-get"])
907										.observe(started.elapsed().as_secs_f64());
908								}
909							},
910							query => {
911								log::error!(
912									target: LOG_TARGET,
913									"Missing/invalid pending query for `GET_VALUE`: {query:?}"
914								);
915								debug_assert!(false);
916							},
917						}
918					}
919					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
920						match self.pending_queries.remove(&query_id) {
921							Some(KadQuery::PutValue(key, started)) => {
922								log::trace!(
923									target: LOG_TARGET,
924									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
925								);
926
927								self.event_streams.send(Event::Dht(
928									DhtEvent::ValuePut(key)
929								));
930
931								if let Some(ref metrics) = self.metrics {
932									metrics
933										.kademlia_query_duration
934										.with_label_values(&["value-put"])
935										.observe(started.elapsed().as_secs_f64());
936								}
937							},
938							query => {
939								log::error!(
940									target: LOG_TARGET,
941									"Missing/invalid pending query for `PUT_VALUE`: {query:?}"
942								);
943								debug_assert!(false);
944							}
945						}
946					}
947					Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
948						match self.pending_queries.remove(&query_id) {
949							Some(KadQuery::GetProviders(key, started)) => {
950								log::trace!(
951									target: LOG_TARGET,
952									"`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
953								);
954
955								// We likely requested providers to connect to them,
956								// so let's add their addresses to litep2p's transport manager.
957								// Consider also looking the addresses of providers up with `FIND_NODE`
958								// query, as it can yield more up to date addresses.
959								providers.iter().for_each(|p| {
960									self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
961								});
962
963								self.event_streams.send(Event::Dht(
964									DhtEvent::ProvidersFound(
965										key.clone().into(),
966										providers.into_iter().map(|p| p.peer.into()).collect()
967									)
968								));
969
970								// litep2p returns all providers in a single event, so we let
971								// subscribers know no more providers will be yielded.
972								self.event_streams.send(Event::Dht(
973									DhtEvent::NoMoreProviders(key.into())
974								));
975
976								if let Some(ref metrics) = self.metrics {
977									metrics
978										.kademlia_query_duration
979										.with_label_values(&["providers-get"])
980										.observe(started.elapsed().as_secs_f64());
981								}
982							},
983							query => {
984								log::error!(
985									target: LOG_TARGET,
986									"Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
987								);
988								debug_assert!(false);
989							}
990						}
991					}
992					Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
993						match self.pending_queries.remove(&query_id) {
994							Some(KadQuery::AddProvider(key, started)) => {
995								debug_assert_eq!(key, provided_key.into());
996
997								log::trace!(
998									target: LOG_TARGET,
999									"`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
1000								);
1001
1002								self.event_streams.send(Event::Dht(
1003									DhtEvent::StartedProviding(key.into())
1004								));
1005
1006								if let Some(ref metrics) = self.metrics {
1007									metrics
1008										.kademlia_query_duration
1009										.with_label_values(&["provider-add"])
1010										.observe(started.elapsed().as_secs_f64());
1011								}
1012							}
1013							Some(_) => {
1014								log::error!(
1015									target: LOG_TARGET,
1016									"Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
1017								);
1018								debug_assert!(false);
1019							}
1020							None => {
1021								log::trace!(
1022									target: LOG_TARGET,
1023									"`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
1024								);
1025							}
1026						}
1027					}
1028					Some(DiscoveryEvent::QueryFailed { query_id }) => {
1029						match self.pending_queries.remove(&query_id) {
1030							Some(KadQuery::FindNode(peer_id, started)) => {
1031								log::debug!(
1032									target: LOG_TARGET,
1033									"`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1034								);
1035
1036								self.event_streams.send(Event::Dht(
1037									DhtEvent::ClosestPeersNotFound(peer_id.into())
1038								));
1039
1040								if let Some(ref metrics) = self.metrics {
1041									metrics
1042										.kademlia_query_duration
1043										.with_label_values(&["node-find-failed"])
1044										.observe(started.elapsed().as_secs_f64());
1045								}
1046							},
1047							Some(KadQuery::GetValue(key, started)) => {
1048								log::debug!(
1049									target: LOG_TARGET,
1050									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1051								);
1052
1053								self.event_streams.send(Event::Dht(
1054									DhtEvent::ValueNotFound(key)
1055								));
1056
1057								if let Some(ref metrics) = self.metrics {
1058									metrics
1059										.kademlia_query_duration
1060										.with_label_values(&["value-get-failed"])
1061										.observe(started.elapsed().as_secs_f64());
1062								}
1063							},
1064							Some(KadQuery::PutValue(key, started)) => {
1065								log::debug!(
1066									target: LOG_TARGET,
1067									"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1068								);
1069
1070								self.event_streams.send(Event::Dht(
1071									DhtEvent::ValuePutFailed(key)
1072								));
1073
1074								if let Some(ref metrics) = self.metrics {
1075									metrics
1076										.kademlia_query_duration
1077										.with_label_values(&["value-put-failed"])
1078										.observe(started.elapsed().as_secs_f64());
1079								}
1080							},
1081							Some(KadQuery::GetProviders(key, started)) => {
1082								log::debug!(
1083									target: LOG_TARGET,
1084									"`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1085								);
1086
1087								self.event_streams.send(Event::Dht(
1088									DhtEvent::ProvidersNotFound(key)
1089								));
1090
1091								if let Some(ref metrics) = self.metrics {
1092									metrics
1093										.kademlia_query_duration
1094										.with_label_values(&["providers-get-failed"])
1095										.observe(started.elapsed().as_secs_f64());
1096								}
1097							},
1098							Some(KadQuery::AddProvider(key, started)) => {
1099								log::debug!(
1100									target: LOG_TARGET,
1101									"`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1102								);
1103
1104								self.event_streams.send(Event::Dht(
1105									DhtEvent::StartProvidingFailed(key)
1106								));
1107
1108								if let Some(ref metrics) = self.metrics {
1109									metrics
1110										.kademlia_query_duration
1111										.with_label_values(&["provider-add-failed"])
1112										.observe(started.elapsed().as_secs_f64());
1113								}
1114							},
1115							None => {
1116								log::debug!(
1117									target: LOG_TARGET,
1118									"non-existent query (likely republishing a provider) failed ({query_id:?})",
1119								);
1120							}
1121						}
1122					}
1123					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1124						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1125					}
1126					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1127						match self.litep2p.public_addresses().add_address(address.clone().into()) {
1128							Ok(inserted) => if inserted {
1129								log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
1130							},
1131							Err(err) => {
1132								log::warn!(
1133									target: LOG_TARGET,
1134									"🔍 Failed to add discovered external address {address:?}: {err:?}",
1135								);
1136							},
1137						}
1138					}
1139					Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1140						let local_peer_id = self.litep2p.local_peer_id();
1141
1142						// Litep2p requires the peer ID to be present in the address.
1143						let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1144							address.with(Protocol::P2p((*local_peer_id).into()))
1145						} else {
1146							address
1147						};
1148
1149						if self.litep2p.public_addresses().remove_address(&address) {
1150							log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
1151						} else {
1152							log::warn!(
1153								target: LOG_TARGET,
1154								"🔍 Failed to remove expired external address {address:?}"
1155							);
1156						}
1157					}
1158					Some(DiscoveryEvent::Ping { peer, rtt }) => {
1159						log::trace!(
1160							target: LOG_TARGET,
1161							"ping time with {peer:?}: {rtt:?}",
1162						);
1163					}
1164					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1165						self.event_streams.send(Event::Dht(
1166							DhtEvent::PutRecordRequest(
1167								key.into(),
1168								value,
1169								publisher.map(Into::into),
1170								expires,
1171							)
1172						));
1173					},
1174
1175					Some(DiscoveryEvent::RandomKademliaStarted) => {
1176						if let Some(metrics) = self.metrics.as_ref() {
1177							metrics.kademlia_random_queries_total.inc();
1178						}
1179					}
1180				},
1181				event = self.litep2p.next_event() => match event {
1182					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1183						let Some(metrics) = &self.metrics else {
1184							continue;
1185						};
1186
1187						let direction = match endpoint {
1188							Endpoint::Dialer { .. } => "out",
1189							Endpoint::Listener { .. } => {
1190								// Increment incoming connections counter.
1191								//
1192								// Note: For litep2p these are represented by established negotiated connections,
1193								// while for libp2p (legacy) these represent not-yet-negotiated connections.
1194								metrics.incoming_connections_total.inc();
1195
1196								"in"
1197							},
1198						};
1199						metrics.connections_opened_total.with_label_values(&[direction]).inc();
1200
1201						match self.peers.entry(peer) {
1202							Entry::Vacant(entry) => {
1203								entry.insert(ConnectionContext {
1204									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1205									num_connections: 1usize,
1206								});
1207								metrics.distinct_peers_connections_opened_total.inc();
1208							}
1209							Entry::Occupied(entry) => {
1210								let entry = entry.into_mut();
1211								entry.num_connections += 1;
1212								entry.endpoints.insert(endpoint.connection_id(), endpoint);
1213							}
1214						}
1215					}
1216					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1217						let Some(metrics) = &self.metrics else {
1218							continue;
1219						};
1220
1221						let Some(context) = self.peers.get_mut(&peer) else {
1222							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1223							continue
1224						};
1225
1226						let direction = match context.endpoints.remove(&connection_id) {
1227							None => {
1228								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1229								continue
1230							}
1231							Some(endpoint) => {
1232								context.num_connections -= 1;
1233
1234								match endpoint {
1235									Endpoint::Dialer { .. } => "out",
1236									Endpoint::Listener { .. } => "in",
1237								}
1238							}
1239						};
1240
1241						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1242
1243						if context.num_connections == 0 {
1244							self.peers.remove(&peer);
1245							metrics.distinct_peers_connections_closed_total.inc();
1246						}
1247					}
1248					Some(Litep2pEvent::DialFailure { address, error }) => {
1249						log::debug!(
1250							target: LOG_TARGET,
1251							"failed to dial peer at {address:?}: {error:?}",
1252						);
1253
1254						if let Some(metrics) = &self.metrics {
1255							let reason = match error {
1256								DialError::Timeout => "timeout",
1257								DialError::AddressError(_) => "invalid-address",
1258								DialError::DnsError(_) => "cannot-resolve-dns",
1259								DialError::NegotiationError(error) => match error {
1260									NegotiationError::Timeout => "timeout",
1261									NegotiationError::PeerIdMissing => "missing-peer-id",
1262									NegotiationError::StateMismatch => "state-mismatch",
1263									NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1264									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1265									NegotiationError::SnowError(_) => "noise-error",
1266									NegotiationError::ParseError(_) => "parse-error",
1267									NegotiationError::IoError(_) => "io-error",
1268									NegotiationError::WebSocket(_) => "webscoket-error",
1269									NegotiationError::BadSignature => "bad-signature",
1270								}
1271							};
1272
1273							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1274						}
1275					}
1276					Some(Litep2pEvent::ListDialFailures { errors }) => {
1277						log::debug!(
1278							target: LOG_TARGET,
1279							"failed to dial peer on multiple addresses {errors:?}",
1280						);
1281
1282						if let Some(metrics) = &self.metrics {
1283							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1284						}
1285					}
1286					None => {
1287						log::error!(
1288								target: LOG_TARGET,
1289								"Litep2p backend terminated"
1290						);
1291						return
1292					}
1293				},
1294			}
1295		}
1296	}
1297}