// sc_network/litep2p/mod.rs
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! `NetworkBackend` implementation for `litep2p`.

21use crate::{
22	config::{
23		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24		SetConfig, TransportConfig,
25	},
26	error::Error,
27	event::{DhtEvent, Event},
28	litep2p::{
29		discovery::{Discovery, DiscoveryEvent},
30		peerstore::Peerstore,
31		service::{Litep2pNetworkService, NetworkServiceCommand},
32		shim::{
33			bitswap::BitswapServer,
34			notification::{
35				config::{NotificationProtocolConfig, ProtocolControlHandle},
36				peerset::PeersetCommand,
37			},
38			request_response::{RequestResponseConfig, RequestResponseProtocol},
39		},
40	},
41	peer_store::PeerStoreProvider,
42	service::{
43		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44		out_events,
45		traits::{BandwidthSink, NetworkBackend, NetworkService},
46	},
47	NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53	config::ConfigBuilder,
54	crypto::ed25519::Keypair,
55	error::{DialError, NegotiationError},
56	executor::Executor,
57	protocol::{
58		libp2p::{
59			bitswap::Config as BitswapConfig,
60			kademlia::{QueryId, Record},
61		},
62		request_response::ConfigBuilder as RequestResponseConfigBuilder,
63	},
64	transport::{
65		tcp::config::Config as TcpTransportConfig,
66		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67	},
68	types::{
69		multiaddr::{Multiaddr, Protocol},
70		ConnectionId,
71	},
72	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84	cmp,
85	collections::{hash_map::Entry, HashMap, HashSet},
86	fs,
87	future::Future,
88	iter,
89	pin::Pin,
90	sync::{
91		atomic::{AtomicUsize, Ordering},
92		Arc,
93	},
94	time::{Duration, Instant},
95};
96
mod discovery;
mod peerstore;
mod service;
mod shim;

/// How long a connection is kept alive while waiting for new substreams before it is
/// closed (same effect as libp2p's 10 second idle-connection timeout; see the
/// `with_keep_alive_timeout` call site below).
const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
104
105/// Litep2p bandwidth sink.
106struct Litep2pBandwidthSink {
107	sink: litep2p::BandwidthSink,
108}
109
110impl BandwidthSink for Litep2pBandwidthSink {
111	fn total_inbound(&self) -> u64 {
112		self.sink.inbound() as u64
113	}
114
115	fn total_outbound(&self) -> u64 {
116		self.sink.outbound() as u64
117	}
118}
119
/// Litep2p task executor.
///
/// Wraps the executor closure supplied in [`Params`] and is handed to `litep2p`
/// via `with_executor` so the library can spawn its background futures.
struct Litep2pExecutor {
	/// Executor closure used to spawn futures.
	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}

impl Executor for Litep2pExecutor {
	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
		(self.executor)(future)
	}

	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
		// The task name is ignored; the future is spawned like any other.
		(self.executor)(future)
	}
}
135
/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p";

/// Peer context.
struct ConnectionContext {
	/// Endpoints of the peer's open connections, keyed by connection ID.
	endpoints: HashMap<ConnectionId, Endpoint>,

	/// Number of active connections.
	num_connections: usize,
}

/// Kademlia query we are tracking.
///
/// Each variant stores the [`Instant`] at which the query was started so the query
/// duration can be reported to the Prometheus metrics when it completes.
#[derive(Debug)]
enum KadQuery {
	/// `FIND_NODE` query for target and when it was initiated.
	FindNode(PeerId, Instant),
	/// `GET_VALUE` query for key and when it was initiated.
	GetValue(RecordKey, Instant),
	/// `PUT_VALUE` query for key and when it was initiated.
	PutValue(RecordKey, Instant),
	/// `GET_PROVIDERS` query for key and when it was initiated.
	GetProviders(RecordKey, Instant),
}
160
/// Networking backend for `litep2p`.
pub struct Litep2pNetworkBackend {
	/// Main `litep2p` object.
	litep2p: Litep2p,

	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
	network_service: Arc<dyn NetworkService>,

	/// RX channel for receiving commands from `Litep2pNetworkService`.
	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

	/// `Peerset` handles to notification protocols.
	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

	/// Pending Kademlia queries, used to match query results to their initiators.
	pending_queries: HashMap<QueryId, KadQuery>,

	/// Discovery.
	discovery: Discovery,

	/// Number of connected peers, mirrored from the block announce protocol's
	/// peer count on every event-loop iteration.
	num_connected: Arc<AtomicUsize>,

	/// Connected peers.
	peers: HashMap<litep2p::PeerId, ConnectionContext>,

	/// Peerstore.
	peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Block announce protocol name.
	block_announce_protocol: ProtocolName,

	/// Sender for DHT events.
	event_streams: out_events::OutChannels,

	/// Prometheus metrics.
	metrics: Option<Metrics>,
}
199
200impl Litep2pNetworkBackend {
201	/// From an iterator of multiaddress(es), parse and group all addresses of peers
202	/// so that litep2p can consume the information easily.
203	fn parse_addresses(
204		addresses: impl Iterator<Item = Multiaddr>,
205	) -> HashMap<PeerId, Vec<Multiaddr>> {
206		addresses
207			.into_iter()
208			.filter_map(|address| match address.iter().next() {
209				Some(
210					Protocol::Dns(_) |
211					Protocol::Dns4(_) |
212					Protocol::Dns6(_) |
213					Protocol::Ip6(_) |
214					Protocol::Ip4(_),
215				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
216				{
217					Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
218						.map_or(None, |peer| Some((peer, Some(address)))),
219					_ => None,
220				},
221				Some(Protocol::P2p(multihash)) =>
222					PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
223				_ => None,
224			})
225			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
226				let entry = acc.entry(peer).or_default();
227				maybe_address.map(|address| entry.push(address));
228
229				acc
230			})
231	}
232
233	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
234	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
235		Self::parse_addresses(peers.into_iter())
236			.into_iter()
237			.filter_map(|(peer, addresses)| {
238				// `peers` contained multiaddress in the form `/p2p/<peer ID>`
239				if addresses.is_empty() {
240					return Some(peer)
241				}
242
243				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
244					log::warn!(
245						target: LOG_TARGET,
246						"couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
247					);
248					return None
249				}
250
251				self.peerstore_handle.add_known_peer(peer);
252				Some(peer)
253			})
254			.collect()
255	}
256}
257
258impl Litep2pNetworkBackend {
259	/// Get `litep2p` keypair from `NodeKeyConfig`.
260	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
261		let secret: litep2p::crypto::ed25519::SecretKey =
262			node_key.clone().into_keypair()?.secret().into();
263
264		let local_identity = Keypair::from(secret);
265		let local_public = local_identity.public();
266		let local_peer_id = local_public.to_peer_id();
267
268		Ok((local_identity, local_peer_id))
269	}
270
271	/// Configure transport protocols for `Litep2pNetworkBackend`.
272	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
273		config: &FullNetworkConfiguration<B, H, Self>,
274	) -> ConfigBuilder {
275		let _ = match config.network_config.transport {
276			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
277			TransportConfig::Normal { .. } => false,
278		};
279		let config_builder = ConfigBuilder::new();
280
281		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
282			.network_config
283			.listen_addresses
284			.iter()
285			.filter_map(|address| {
286				use sc_network_types::multiaddr::Protocol;
287
288				let mut iter = address.iter();
289
290				match iter.next() {
291					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
292					protocol => {
293						log::error!(
294							target: LOG_TARGET,
295							"unknown protocol {protocol:?}, ignoring {address:?}",
296						);
297
298						return None
299					},
300				}
301
302				match iter.next() {
303					Some(Protocol::Tcp(_)) => match iter.next() {
304						Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
305							Some((None, Some(address.clone()))),
306						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
307						protocol => {
308							log::error!(
309								target: LOG_TARGET,
310								"unknown protocol {protocol:?}, ignoring {address:?}",
311							);
312							None
313						},
314					},
315					protocol => {
316						log::error!(
317							target: LOG_TARGET,
318							"unknown protocol {protocol:?}, ignoring {address:?}",
319						);
320						None
321					},
322				}
323			})
324			.unzip();
325
326		config_builder
327			.with_websocket(WebSocketTransportConfig {
328				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
329				yamux_config: litep2p::yamux::Config::default(),
330				nodelay: true,
331				..Default::default()
332			})
333			.with_tcp(TcpTransportConfig {
334				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
335				yamux_config: litep2p::yamux::Config::default(),
336				nodelay: true,
337				..Default::default()
338			})
339	}
340}
341
342#[async_trait::async_trait]
343impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
344	type NotificationProtocolConfig = NotificationProtocolConfig;
345	type RequestResponseProtocolConfig = RequestResponseConfig;
346	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
347	type PeerStore = Peerstore;
348	type BitswapConfig = BitswapConfig;
349
	/// Create a new [`Litep2pNetworkBackend`].
	///
	/// Builds the full `litep2p` configuration from `params` (transports, notification
	/// and request-response protocols, discovery), spawns the request-response event
	/// loops on the provided executor, and returns the backend together with its
	/// `NetworkService` handle stored in `self`.
	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
	where
		Self: Sized,
	{
		let (keypair, local_peer_id) =
			Self::get_keypair(&params.network_config.network_config.node_key)?;
		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);

		// the local node must not dial itself: drop it from boot nodes and
		// reserved nodes before the configuration is consumed below
		params.network_config.network_config.boot_nodes = params
			.network_config
			.network_config
			.boot_nodes
			.into_iter()
			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
			.collect();
		params.network_config.network_config.default_peers_set.reserved_nodes = params
			.network_config
			.network_config
			.default_peers_set
			.reserved_nodes
			.into_iter()
			.filter(|reserved_node| {
				if reserved_node.peer_id == local_peer_id.into() {
					log::warn!(
						target: LOG_TARGET,
						"Local peer ID used in reserved node, ignoring: {reserved_node}",
					);
					false
				} else {
					true
				}
			})
			.collect();

		// make sure the network config directory exists before anything is written there
		if let Some(path) = &params.network_config.network_config.net_config_path {
			fs::create_dir_all(path)?;
		}

		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
		log::info!(target: LOG_TARGET, "Running litep2p network backend");

		params.network_config.sanity_check_addresses()?;
		params.network_config.sanity_check_bootnodes()?;

		let mut config_builder =
			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
		let known_addresses = params.network_config.known_addresses();
		let peer_store_handle = params.network_config.peer_store_handle();
		let executor = Arc::new(Litep2pExecutor { executor: params.executor });

		let FullNetworkConfiguration {
			notification_protocols,
			request_response_protocols,
			network_config,
			..
		} = params.network_config;

		// initialize notification protocols
		//
		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
		// of Polkadot SDK to control connectivity of the notification protocol
		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
		let mut notif_protocols = HashMap::from_iter([(
			params.block_announce_config.protocol_name().clone(),
			params.block_announce_config.handle,
		)]);

		// handshake for all but the syncing protocol is set to node role
		config_builder = notification_protocols
			.into_iter()
			.fold(config_builder, |config_builder, mut config| {
				config.config.set_handshake(Roles::from(&params.role).encode());
				notif_protocols.insert(config.protocol_name, config.handle);

				config_builder.with_notification_protocol(config.config)
			})
			.with_notification_protocol(params.block_announce_config.config);

		// initialize request-response protocols
		let metrics = match &params.metrics_registry {
			Some(registry) => Some(register_without_sources(registry)?),
			None => None,
		};

		// create channels that are used to send request before initializing protocols so the
		// senders can be passed onto all request-response protocols
		//
		// all protocols must have each others' senders so they can send the fallback request in
		// case the main protocol is not supported by the remote peer and user specified a fallback
		let (mut request_response_receivers, request_response_senders): (
			HashMap<_, _>,
			HashMap<_, _>,
		) = request_response_protocols
			.iter()
			.map(|config| {
				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
			})
			.unzip();

		config_builder = request_response_protocols.into_iter().fold(
			config_builder,
			|config_builder, config| {
				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
					Litep2pProtocolName::from(config.protocol_name.clone()),
				)
				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
				.with_timeout(config.request_timeout)
				.build();

				let protocol = RequestResponseProtocol::new(
					config.protocol_name.clone(),
					handle,
					Arc::clone(&peer_store_handle),
					config.inbound_queue,
					request_response_receivers
						.remove(&config.protocol_name)
						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
					request_response_senders.clone(),
					metrics.clone(),
				);

				// spawn the protocol's event loop in the background
				executor.run(Box::pin(async move {
					protocol.run().await;
				}));

				config_builder.with_request_response_protocol(protocol_config)
			},
		);

		// collect known addresses
		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
				use sc_network_types::multiaddr::Protocol;

				// append `/p2p/<peer>` to transport addresses that lack it, keep
				// addresses already ending in `/p2p/...`, and drop everything else
				let address = match address.iter().last() {
					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
						address.with(Protocol::P2p(peer.into())),
					Some(Protocol::P2p(_)) => address,
					_ => return acc,
				};

				acc.entry(peer.into()).or_default().push(address.into());
				peer_store_handle.add_known_peer(peer);

				acc
			});

		// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
		//
		// `listen_addresses` is shared with `Discovery` and the `NetworkService` and is
		// populated below once `litep2p` has been created
		let listen_addresses = Arc::new(Default::default());
		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
			Discovery::new(
				local_peer_id,
				&network_config,
				params.genesis_hash,
				params.fork_id.as_deref(),
				&params.protocol_id,
				known_addresses.clone(),
				Arc::clone(&listen_addresses),
				Arc::clone(&peer_store_handle),
			);

		config_builder = config_builder
			.with_known_addresses(known_addresses.clone().into_iter())
			.with_libp2p_ping(ping_config)
			.with_libp2p_identify(identify_config)
			.with_libp2p_kademlia(kademlia_config)
			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
			))
			// This has the same effect as `libp2p::Swarm::with_idle_connection_timeout` which is
			// set to 10 seconds as well.
			.with_keep_alive_timeout(KEEP_ALIVE_TIMEOUT)
			.with_executor(executor);

		if let Some(config) = maybe_mdns_config {
			config_builder = config_builder.with_mdns(config);
		}

		if let Some(config) = params.bitswap_config {
			config_builder = config_builder.with_libp2p_bitswap(config);
		}

		let litep2p =
			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;

		litep2p.listen_addresses().for_each(|address| {
			log::debug!(target: LOG_TARGET, "listening on: {address}");

			listen_addresses.write().insert(address.clone());
		});

		let public_addresses = litep2p.public_addresses();
		for address in network_config.public_addresses.iter() {
			if let Err(err) = public_addresses.add_address(address.clone().into()) {
				log::warn!(
					target: LOG_TARGET,
					"failed to add public address {address:?}: {err:?}",
				);
			}
		}

		let network_service = Arc::new(Litep2pNetworkService::new(
			local_peer_id,
			keypair.clone(),
			cmd_tx,
			Arc::clone(&peer_store_handle),
			notif_protocols.clone(),
			block_announce_protocol.clone(),
			request_response_senders,
			Arc::clone(&listen_addresses),
			public_addresses,
		));

		// register rest of the metrics now that `Litep2p` has been created
		let num_connected = Arc::new(Default::default());
		let bandwidth: Arc<dyn BandwidthSink> =
			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });

		if let Some(registry) = &params.metrics_registry {
			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
		}

		Ok(Self {
			network_service,
			cmd_rx,
			metrics,
			peerset_handles: notif_protocols,
			num_connected,
			discovery,
			pending_queries: HashMap::new(),
			peerstore_handle: peer_store_handle,
			block_announce_protocol,
			event_streams: out_events::OutChannels::new(None)?,
			peers: HashMap::new(),
			litep2p,
		})
	}
590
591	fn network_service(&self) -> Arc<dyn NetworkService> {
592		Arc::clone(&self.network_service)
593	}
594
	/// Create the [`Peerstore`] from the given boot nodes and optional metrics registry.
	fn peer_store(
		bootnodes: Vec<sc_network_types::PeerId>,
		metrics_registry: Option<Registry>,
	) -> Self::PeerStore {
		Peerstore::new(bootnodes, metrics_registry)
	}
601
	/// Create [`NotificationMetrics`] backed by the optional Prometheus `registry`.
	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
		NotificationMetrics::new(registry)
	}
605
	/// Create Bitswap server.
	///
	/// Returns a future driving the server together with the configuration that is
	/// later passed to `with_libp2p_bitswap` when the backend is built.
	fn bitswap_server(
		client: Arc<dyn BlockBackend<B> + Send + Sync>,
	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
		BitswapServer::new(client)
	}
612
	/// Create notification protocol configuration for `protocol`.
	///
	/// All arguments are forwarded verbatim to the protocol config constructor. The
	/// returned [`NotificationService`] is the handle through which the protocol's
	/// owner interacts with the protocol.
	fn notification_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_notification_size: u64,
		handshake: Option<NotificationHandshake>,
		set_config: SetConfig,
		metrics: NotificationMetrics,
		peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
		Self::NotificationProtocolConfig::new(
			protocol_name,
			fallback_names,
			max_notification_size as usize,
			handshake,
			set_config,
			metrics,
			peerstore_handle,
		)
	}
633
	/// Create request-response protocol configuration.
	///
	/// All arguments are forwarded verbatim to the protocol config constructor.
	fn request_response_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_request_size: u64,
		max_response_size: u64,
		request_timeout: Duration,
		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	) -> Self::RequestResponseProtocolConfig {
		Self::RequestResponseProtocolConfig::new(
			protocol_name,
			fallback_names,
			max_request_size,
			max_response_size,
			request_timeout,
			inbound_queue,
		)
	}
652
653	/// Start [`Litep2pNetworkBackend`] event loop.
654	async fn run(mut self) {
655		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
656
657		loop {
658			let num_connected_peers = self
659				.peerset_handles
660				.get(&self.block_announce_protocol)
661				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
662			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
663
664			tokio::select! {
665				command = self.cmd_rx.next() => match command {
666					None => return,
667					Some(command) => match command {
668						NetworkServiceCommand::FindClosestPeers { target } => {
669							let query_id = self.discovery.find_node(target.into()).await;
670							self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
671						}
672						NetworkServiceCommand::GetValue{ key } => {
673							let query_id = self.discovery.get_value(key.clone()).await;
674							self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
675						}
676						NetworkServiceCommand::PutValue { key, value } => {
677							let query_id = self.discovery.put_value(key.clone(), value).await;
678							self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
679						}
680						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
681							let kademlia_key = record.key.clone();
682							let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
683							self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
684						}
685						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
686							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
687						}
688						NetworkServiceCommand::StartProviding { key } => {
689							self.discovery.start_providing(key).await;
690						}
691						NetworkServiceCommand::StopProviding { key } => {
692							self.discovery.stop_providing(key).await;
693						}
694						NetworkServiceCommand::GetProviders { key } => {
695							let query_id = self.discovery.get_providers(key.clone()).await;
696							self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
697						}
698						NetworkServiceCommand::EventStream { tx } => {
699							self.event_streams.push(tx);
700						}
701						NetworkServiceCommand::Status { tx } => {
702							let _ = tx.send(NetworkStatus {
703								num_connected_peers: self
704									.peerset_handles
705									.get(&self.block_announce_protocol)
706									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
707								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
708								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
709							});
710						}
711						NetworkServiceCommand::AddPeersToReservedSet {
712							protocol,
713							peers,
714						} => {
715							let peers = self.add_addresses(peers.into_iter().map(Into::into));
716
717							match self.peerset_handles.get(&protocol) {
718								Some(handle) => {
719									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
720								}
721								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
722							};
723						}
724						NetworkServiceCommand::AddKnownAddress { peer, address } => {
725							let mut address: Multiaddr = address.into();
726
727							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
728								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
729							}
730
731							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
732								// libp2p backend generates `DiscoveryOut::Discovered(peer_id)`
733								// event when a new address is added for a peer, which leads to the
734								// peer being added to peerstore. Do the same directly here.
735								self.peerstore_handle.add_known_peer(peer);
736							} else {
737								log::debug!(
738									target: LOG_TARGET,
739									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
740								);
741							}
742						},
743						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
744							let peers = self.add_addresses(peers.into_iter().map(Into::into));
745
746							match self.peerset_handles.get(&protocol) {
747								Some(handle) => {
748									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
749								}
750								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
751							}
752
753						},
754						NetworkServiceCommand::DisconnectPeer {
755							protocol,
756							peer,
757						} => {
758							let Some(handle) = self.peerset_handles.get(&protocol) else {
759								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
760								continue
761							};
762
763							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
764						}
765						NetworkServiceCommand::SetReservedOnly {
766							protocol,
767							reserved_only,
768						} => {
769							let Some(handle) = self.peerset_handles.get(&protocol) else {
770								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
771								continue
772							};
773
774							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
775						}
776						NetworkServiceCommand::RemoveReservedPeers {
777							protocol,
778							peers,
779						} => {
780							let Some(handle) = self.peerset_handles.get(&protocol) else {
781								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
782								continue
783							};
784
785							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
786						}
787					}
788				},
789				event = self.discovery.next() => match event {
790					None => return,
791					Some(DiscoveryEvent::Discovered { addresses }) => {
792						// if at least one address was added for the peer, report the peer to `Peerstore`
793						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
794							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
795								self.peerstore_handle.add_known_peer(peer);
796							}
797						}
798					}
799					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
800						for peer in peers {
801							self.peerstore_handle.add_known_peer(peer.into());
802						}
803					}
804					Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
805						match self.pending_queries.remove(&query_id) {
806							Some(KadQuery::FindNode(_, started)) => {
807								log::trace!(
808									target: LOG_TARGET,
809									"`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
810								);
811
812								self.event_streams.send(
813									Event::Dht(
814										DhtEvent::ClosestPeersFound(
815											target.into(),
816											peers
817												.into_iter()
818												.map(|(peer, addrs)| (
819													peer.into(),
820													addrs.into_iter().map(Into::into).collect(),
821												))
822												.collect(),
823										)
824									)
825								);
826
827								if let Some(ref metrics) = self.metrics {
828									metrics
829										.kademlia_query_duration
830										.with_label_values(&["node-find"])
831										.observe(started.elapsed().as_secs_f64());
832								}
833							},
834							query => {
835								log::error!(
836									target: LOG_TARGET,
837									"Missing/invalid pending query for `FIND_NODE`: {query:?}"
838								);
839								debug_assert!(false);
840							}
841						}
842					},
843					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
844						if !self.pending_queries.contains_key(&query_id) {
845							log::error!(
846								target: LOG_TARGET,
847								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
848							);
849
850							continue
851						}
852
853						let peer_id: sc_network_types::PeerId = record.peer.into();
854						let record = PeerRecord {
855							record: P2PRecord {
856								key: record.record.key.to_vec().into(),
857								value: record.record.value,
858								publisher: record.record.publisher.map(|peer_id| {
859									let peer_id: sc_network_types::PeerId = peer_id.into();
860									peer_id.into()
861								}),
862								expires: record.record.expires,
863							},
864							peer: Some(peer_id.into()),
865						};
866
867						self.event_streams.send(
868							Event::Dht(
869								DhtEvent::ValueFound(
870									record.into()
871								)
872							)
873						);
874					}
875					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
876						match self.pending_queries.remove(&query_id) {
877							Some(KadQuery::GetValue(key, started)) => {
878								log::trace!(
879									target: LOG_TARGET,
880									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
881								);
882
883								if let Some(ref metrics) = self.metrics {
884									metrics
885										.kademlia_query_duration
886										.with_label_values(&["value-get"])
887										.observe(started.elapsed().as_secs_f64());
888								}
889							},
890							query => {
891								log::error!(
892									target: LOG_TARGET,
893									"Missing/invalid pending query for `GET_VALUE`: {query:?}"
894								);
895								debug_assert!(false);
896							},
897						}
898					}
899					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
900						match self.pending_queries.remove(&query_id) {
901							Some(KadQuery::PutValue(key, started)) => {
902								log::trace!(
903									target: LOG_TARGET,
904									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
905								);
906
907								self.event_streams.send(Event::Dht(
908									DhtEvent::ValuePut(key)
909								));
910
911								if let Some(ref metrics) = self.metrics {
912									metrics
913										.kademlia_query_duration
914										.with_label_values(&["value-put"])
915										.observe(started.elapsed().as_secs_f64());
916								}
917							},
918							query => {
919								log::error!(
920									target: LOG_TARGET,
921									"Missing/invalid pending query for `PUT_VALUE`: {query:?}"
922								);
923								debug_assert!(false);
924							}
925						}
926					}
927					Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
928						match self.pending_queries.remove(&query_id) {
929							Some(KadQuery::GetProviders(key, started)) => {
930								log::trace!(
931									target: LOG_TARGET,
932									"`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
933								);
934
935								// We likely requested providers to connect to them,
936								// so let's add their addresses to litep2p's transport manager.
								// Consider also looking up the addresses of providers with a
								// `FIND_NODE` query, as it can yield more up-to-date addresses.
939								providers.iter().for_each(|p| {
940									self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
941								});
942
943								self.event_streams.send(Event::Dht(
944									DhtEvent::ProvidersFound(
945										key.clone().into(),
946										providers.into_iter().map(|p| p.peer.into()).collect()
947									)
948								));
949
950								// litep2p returns all providers in a single event, so we let
951								// subscribers know no more providers will be yielded.
952								self.event_streams.send(Event::Dht(
953									DhtEvent::NoMoreProviders(key.into())
954								));
955
956								if let Some(ref metrics) = self.metrics {
957									metrics
958										.kademlia_query_duration
959										.with_label_values(&["providers-get"])
960										.observe(started.elapsed().as_secs_f64());
961								}
962							},
963							query => {
964								log::error!(
965									target: LOG_TARGET,
966									"Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
967								);
968								debug_assert!(false);
969							}
970						}
971					}
972					Some(DiscoveryEvent::QueryFailed { query_id }) => {
973						match self.pending_queries.remove(&query_id) {
974							Some(KadQuery::FindNode(peer_id, started)) => {
975								log::debug!(
976									target: LOG_TARGET,
977									"`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
978								);
979
980								self.event_streams.send(Event::Dht(
981									DhtEvent::ClosestPeersNotFound(peer_id.into())
982								));
983
984								if let Some(ref metrics) = self.metrics {
985									metrics
986										.kademlia_query_duration
987										.with_label_values(&["node-find-failed"])
988										.observe(started.elapsed().as_secs_f64());
989								}
990							},
991							Some(KadQuery::GetValue(key, started)) => {
992								log::debug!(
993									target: LOG_TARGET,
994									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
995								);
996
997								self.event_streams.send(Event::Dht(
998									DhtEvent::ValueNotFound(key)
999								));
1000
1001								if let Some(ref metrics) = self.metrics {
1002									metrics
1003										.kademlia_query_duration
1004										.with_label_values(&["value-get-failed"])
1005										.observe(started.elapsed().as_secs_f64());
1006								}
1007							},
1008							Some(KadQuery::PutValue(key, started)) => {
1009								log::debug!(
1010									target: LOG_TARGET,
1011									"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1012								);
1013
1014								self.event_streams.send(Event::Dht(
1015									DhtEvent::ValuePutFailed(key)
1016								));
1017
1018								if let Some(ref metrics) = self.metrics {
1019									metrics
1020										.kademlia_query_duration
1021										.with_label_values(&["value-put-failed"])
1022										.observe(started.elapsed().as_secs_f64());
1023								}
1024							},
1025							Some(KadQuery::GetProviders(key, started)) => {
1026								log::debug!(
1027									target: LOG_TARGET,
1028									"`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1029								);
1030
1031								self.event_streams.send(Event::Dht(
1032									DhtEvent::ProvidersNotFound(key)
1033								));
1034
1035								if let Some(ref metrics) = self.metrics {
1036									metrics
1037										.kademlia_query_duration
1038										.with_label_values(&["providers-get-failed"])
1039										.observe(started.elapsed().as_secs_f64());
1040								}
1041							},
1042							None => {
1043								log::warn!(
1044									target: LOG_TARGET,
1045									"non-existent query failed ({query_id:?})",
1046								);
1047							}
1048						}
1049					}
1050					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1051						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1052					}
1053					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1054						match self.litep2p.public_addresses().add_address(address.clone().into()) {
1055							Ok(inserted) => if inserted {
								log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
1057							},
1058							Err(err) => {
1059								log::warn!(
1060									target: LOG_TARGET,
1061									"๐Ÿ” Failed to add discovered external address {address:?}: {err:?}",
1062								);
1063							},
1064						}
1065					}
1066					Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1067						let local_peer_id = self.litep2p.local_peer_id();
1068
1069						// Litep2p requires the peer ID to be present in the address.
1070						let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1071							address.with(Protocol::P2p(*local_peer_id.as_ref()))
1072						} else {
1073							address
1074						};
1075
1076						if self.litep2p.public_addresses().remove_address(&address) {
							log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
1078						} else {
1079							log::warn!(
1080								target: LOG_TARGET,
1081								"๐Ÿ” Failed to remove expired external address {address:?}"
1082							);
1083						}
1084					}
1085					Some(DiscoveryEvent::Ping { peer, rtt }) => {
1086						log::trace!(
1087							target: LOG_TARGET,
1088							"ping time with {peer:?}: {rtt:?}",
1089						);
1090					}
1091					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1092						self.event_streams.send(Event::Dht(
1093							DhtEvent::PutRecordRequest(
1094								key.into(),
1095								value,
1096								publisher.map(Into::into),
1097								expires,
1098							)
1099						));
1100					},
1101
1102					Some(DiscoveryEvent::RandomKademliaStarted) => {
1103						if let Some(metrics) = self.metrics.as_ref() {
1104							metrics.kademlia_random_queries_total.inc();
1105						}
1106					}
1107				},
1108				event = self.litep2p.next_event() => match event {
1109					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1110						let Some(metrics) = &self.metrics else {
1111							continue;
1112						};
1113
1114						let direction = match endpoint {
1115							Endpoint::Dialer { .. } => "out",
1116							Endpoint::Listener { .. } => {
1117								// Increment incoming connections counter.
1118								//
1119								// Note: For litep2p these are represented by established negotiated connections,
1120								// while for libp2p (legacy) these represent not-yet-negotiated connections.
1121								metrics.incoming_connections_total.inc();
1122
1123								"in"
1124							},
1125						};
1126						metrics.connections_opened_total.with_label_values(&[direction]).inc();
1127
1128						match self.peers.entry(peer) {
1129							Entry::Vacant(entry) => {
1130								entry.insert(ConnectionContext {
1131									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1132									num_connections: 1usize,
1133								});
1134								metrics.distinct_peers_connections_opened_total.inc();
1135							}
1136							Entry::Occupied(entry) => {
1137								let entry = entry.into_mut();
1138								entry.num_connections += 1;
1139								entry.endpoints.insert(endpoint.connection_id(), endpoint);
1140							}
1141						}
1142					}
1143					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1144						let Some(metrics) = &self.metrics else {
1145							continue;
1146						};
1147
1148						let Some(context) = self.peers.get_mut(&peer) else {
1149							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1150							continue
1151						};
1152
1153						let direction = match context.endpoints.remove(&connection_id) {
1154							None => {
1155								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1156								continue
1157							}
1158							Some(endpoint) => {
1159								context.num_connections -= 1;
1160
1161								match endpoint {
1162									Endpoint::Dialer { .. } => "out",
1163									Endpoint::Listener { .. } => "in",
1164								}
1165							}
1166						};
1167
1168						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1169
1170						if context.num_connections == 0 {
1171							self.peers.remove(&peer);
1172							metrics.distinct_peers_connections_closed_total.inc();
1173						}
1174					}
1175					Some(Litep2pEvent::DialFailure { address, error }) => {
1176						log::debug!(
1177							target: LOG_TARGET,
1178							"failed to dial peer at {address:?}: {error:?}",
1179						);
1180
1181						if let Some(metrics) = &self.metrics {
1182							let reason = match error {
1183								DialError::Timeout => "timeout",
1184								DialError::AddressError(_) => "invalid-address",
1185								DialError::DnsError(_) => "cannot-resolve-dns",
1186								DialError::NegotiationError(error) => match error {
1187									NegotiationError::Timeout => "timeout",
1188									NegotiationError::PeerIdMissing => "missing-peer-id",
1189									NegotiationError::StateMismatch => "state-mismatch",
1190									NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1191									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1192									NegotiationError::SnowError(_) => "noise-error",
1193									NegotiationError::ParseError(_) => "parse-error",
1194									NegotiationError::IoError(_) => "io-error",
1195									NegotiationError::WebSocket(_) => "webscoket-error",
1196									NegotiationError::BadSignature => "bad-signature",
1197								}
1198							};
1199
1200							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1201						}
1202					}
1203					Some(Litep2pEvent::ListDialFailures { errors }) => {
1204						log::debug!(
1205							target: LOG_TARGET,
1206							"failed to dial peer on multiple addresses {errors:?}",
1207						);
1208
1209						if let Some(metrics) = &self.metrics {
1210							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1211						}
1212					}
1213					None => {
1214						log::error!(
1215								target: LOG_TARGET,
1216								"Litep2p backend terminated"
1217						);
1218						return
1219					}
1220				},
1221			}
1222		}
1223	}
1224}