sc_network/litep2p/mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `NetworkBackend` implementation for `litep2p`.
20
21use crate::{
22	config::{
23		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24		SetConfig, TransportConfig,
25	},
26	error::Error,
27	event::{DhtEvent, Event},
28	litep2p::{
29		discovery::{Discovery, DiscoveryEvent},
30		peerstore::Peerstore,
31		service::{Litep2pNetworkService, NetworkServiceCommand},
32		shim::{
33			bitswap::BitswapServer,
34			notification::{
35				config::{NotificationProtocolConfig, ProtocolControlHandle},
36				peerset::PeersetCommand,
37			},
38			request_response::{RequestResponseConfig, RequestResponseProtocol},
39		},
40	},
41	peer_store::PeerStoreProvider,
42	service::{
43		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44		out_events,
45		traits::{BandwidthSink, NetworkBackend, NetworkService},
46	},
47	NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53	config::ConfigBuilder,
54	crypto::ed25519::Keypair,
55	error::{DialError, NegotiationError},
56	executor::Executor,
57	protocol::{
58		libp2p::{
59			bitswap::Config as BitswapConfig,
60			kademlia::{QueryId, Record},
61		},
62		request_response::ConfigBuilder as RequestResponseConfigBuilder,
63	},
64	transport::{
65		tcp::config::Config as TcpTransportConfig,
66		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67	},
68	types::{
69		multiaddr::{Multiaddr, Protocol},
70		ConnectionId,
71	},
72	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84	cmp,
85	collections::{hash_map::Entry, HashMap, HashSet},
86	fs,
87	future::Future,
88	iter,
89	pin::Pin,
90	sync::{
91		atomic::{AtomicUsize, Ordering},
92		Arc,
93	},
94	time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
/// Litep2p bandwidth sink.
///
/// Thin wrapper exposing [`litep2p::BandwidthSink`] through the backend-agnostic
/// [`BandwidthSink`] trait so the metrics machinery can consume it.
struct Litep2pBandwidthSink {
	// Underlying `litep2p` bandwidth sink obtained from `Litep2p::bandwidth_sink()`.
	sink: litep2p::BandwidthSink,
}
106
107impl BandwidthSink for Litep2pBandwidthSink {
108	fn total_inbound(&self) -> u64 {
109		self.sink.inbound() as u64
110	}
111
112	fn total_outbound(&self) -> u64 {
113		self.sink.outbound() as u64
114	}
115}
116
/// Litep2p task executor.
///
/// Adapts a boxed spawner closure to the [`Executor`] trait expected by `litep2p`
/// (see the `impl Executor` below); the closure is supplied via `Params::executor`.
struct Litep2pExecutor {
	/// Executor.
	///
	/// Closure that spawns the given boxed future.
	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}
122
123impl Executor for Litep2pExecutor {
124	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
125		(self.executor)(future)
126	}
127
128	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
129		(self.executor)(future)
130	}
131}
132
/// Logging target for the file.
///
/// NOTE(review): kept as `sub-libp2p` (not `sub-litep2p`) — presumably so existing
/// log filters work across both network backends; confirm before changing.
const LOG_TARGET: &str = "sub-libp2p";
135
/// Peer context.
///
/// Per-peer connection book-keeping.
struct ConnectionContext {
	/// Peer endpoints, keyed by connection ID.
	endpoints: HashMap<ConnectionId, Endpoint>,

	/// Number of active connections.
	///
	/// NOTE(review): tracked separately from `endpoints` — they may be allowed to
	/// diverge; confirm in the event-handling code before substituting
	/// `endpoints.len()` for this counter.
	num_connections: usize,
}
144
/// Kademlia query we are tracking.
///
/// Each variant stores the [`Instant`] at which the query was initiated so that,
/// on completion, its duration can be observed into the `kademlia_query_duration`
/// metric (see the event loop in `run`).
#[derive(Debug)]
enum KadQuery {
	/// `FIND_NODE` query for target and when it was initiated.
	FindNode(PeerId, Instant),
	/// `GET_VALUE` query for key and when it was initiated.
	GetValue(RecordKey, Instant),
	/// `PUT_VALUE` query for key and when it was initiated.
	PutValue(RecordKey, Instant),
	/// `GET_PROVIDERS` query for key and when it was initiated.
	GetProviders(RecordKey, Instant),
}
157
/// Networking backend for `litep2p`.
///
/// Constructed via [`NetworkBackend::new`] and driven by [`NetworkBackend::run`],
/// which services commands arriving on `cmd_rx` and events from `discovery`.
pub struct Litep2pNetworkBackend {
	/// Main `litep2p` object.
	litep2p: Litep2p,

	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
	network_service: Arc<dyn NetworkService>,

	/// RX channel for receiving commands from `Litep2pNetworkService`.
	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

	/// `Peerset` handles to notification protocols, keyed by protocol name.
	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

	/// Pending Kademlia queries, keyed by the query ID returned by `discovery`.
	pending_queries: HashMap<QueryId, KadQuery>,

	/// Discovery.
	discovery: Discovery,

	/// Number of connected peers (also registered as a metrics source in `new`).
	num_connected: Arc<AtomicUsize>,

	/// Connected peers.
	peers: HashMap<litep2p::PeerId, ConnectionContext>,

	/// Peerstore.
	peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Block announce protocol name.
	block_announce_protocol: ProtocolName,

	/// Sender for DHT events.
	event_streams: out_events::OutChannels,

	/// Prometheus metrics.
	metrics: Option<Metrics>,
}
196
197impl Litep2pNetworkBackend {
198	/// From an iterator of multiaddress(es), parse and group all addresses of peers
199	/// so that litep2p can consume the information easily.
200	fn parse_addresses(
201		addresses: impl Iterator<Item = Multiaddr>,
202	) -> HashMap<PeerId, Vec<Multiaddr>> {
203		addresses
204			.into_iter()
205			.filter_map(|address| match address.iter().next() {
206				Some(
207					Protocol::Dns(_) |
208					Protocol::Dns4(_) |
209					Protocol::Dns6(_) |
210					Protocol::Ip6(_) |
211					Protocol::Ip4(_),
212				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
213				{
214					Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
215						.map_or(None, |peer| Some((peer, Some(address)))),
216					_ => None,
217				},
218				Some(Protocol::P2p(multihash)) =>
219					PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
220				_ => None,
221			})
222			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
223				let entry = acc.entry(peer).or_default();
224				maybe_address.map(|address| entry.push(address));
225
226				acc
227			})
228	}
229
230	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
231	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
232		Self::parse_addresses(peers.into_iter())
233			.into_iter()
234			.filter_map(|(peer, addresses)| {
235				// `peers` contained multiaddress in the form `/p2p/<peer ID>`
236				if addresses.is_empty() {
237					return Some(peer)
238				}
239
240				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
241					log::warn!(
242						target: LOG_TARGET,
243						"couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
244					);
245					return None
246				}
247
248				self.peerstore_handle.add_known_peer(peer);
249				Some(peer)
250			})
251			.collect()
252	}
253}
254
255impl Litep2pNetworkBackend {
256	/// Get `litep2p` keypair from `NodeKeyConfig`.
257	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
258		let secret: litep2p::crypto::ed25519::SecretKey =
259			node_key.clone().into_keypair()?.secret().into();
260
261		let local_identity = Keypair::from(secret);
262		let local_public = local_identity.public();
263		let local_peer_id = local_public.to_peer_id();
264
265		Ok((local_identity, local_peer_id))
266	}
267
268	/// Configure transport protocols for `Litep2pNetworkBackend`.
269	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
270		config: &FullNetworkConfiguration<B, H, Self>,
271	) -> ConfigBuilder {
272		let _ = match config.network_config.transport {
273			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
274			TransportConfig::Normal { .. } => false,
275		};
276		let config_builder = ConfigBuilder::new();
277
278		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
279			.network_config
280			.listen_addresses
281			.iter()
282			.filter_map(|address| {
283				use sc_network_types::multiaddr::Protocol;
284
285				let mut iter = address.iter();
286
287				match iter.next() {
288					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
289					protocol => {
290						log::error!(
291							target: LOG_TARGET,
292							"unknown protocol {protocol:?}, ignoring {address:?}",
293						);
294
295						return None
296					},
297				}
298
299				match iter.next() {
300					Some(Protocol::Tcp(_)) => match iter.next() {
301						Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
302							Some((None, Some(address.clone()))),
303						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
304						protocol => {
305							log::error!(
306								target: LOG_TARGET,
307								"unknown protocol {protocol:?}, ignoring {address:?}",
308							);
309							None
310						},
311					},
312					protocol => {
313						log::error!(
314							target: LOG_TARGET,
315							"unknown protocol {protocol:?}, ignoring {address:?}",
316						);
317						None
318					},
319				}
320			})
321			.unzip();
322
323		config_builder
324			.with_websocket(WebSocketTransportConfig {
325				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
326				yamux_config: litep2p::yamux::Config::default(),
327				nodelay: true,
328				..Default::default()
329			})
330			.with_tcp(TcpTransportConfig {
331				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
332				yamux_config: litep2p::yamux::Config::default(),
333				nodelay: true,
334				..Default::default()
335			})
336	}
337}
338
339#[async_trait::async_trait]
340impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
341	type NotificationProtocolConfig = NotificationProtocolConfig;
342	type RequestResponseProtocolConfig = RequestResponseConfig;
343	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
344	type PeerStore = Peerstore;
345	type BitswapConfig = BitswapConfig;
346
347	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
348	where
349		Self: Sized,
350	{
351		let (keypair, local_peer_id) =
352			Self::get_keypair(&params.network_config.network_config.node_key)?;
353		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
354
355		params.network_config.network_config.boot_nodes = params
356			.network_config
357			.network_config
358			.boot_nodes
359			.into_iter()
360			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
361			.collect();
362		params.network_config.network_config.default_peers_set.reserved_nodes = params
363			.network_config
364			.network_config
365			.default_peers_set
366			.reserved_nodes
367			.into_iter()
368			.filter(|reserved_node| {
369				if reserved_node.peer_id == local_peer_id.into() {
370					log::warn!(
371						target: LOG_TARGET,
372						"Local peer ID used in reserved node, ignoring: {reserved_node}",
373					);
374					false
375				} else {
376					true
377				}
378			})
379			.collect();
380
381		if let Some(path) = &params.network_config.network_config.net_config_path {
382			fs::create_dir_all(path)?;
383		}
384
385		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
386		log::info!(target: LOG_TARGET, "Running litep2p network backend");
387
388		params.network_config.sanity_check_addresses()?;
389		params.network_config.sanity_check_bootnodes()?;
390
391		let mut config_builder =
392			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
393		let known_addresses = params.network_config.known_addresses();
394		let peer_store_handle = params.network_config.peer_store_handle();
395		let executor = Arc::new(Litep2pExecutor { executor: params.executor });
396
397		let FullNetworkConfiguration {
398			notification_protocols,
399			request_response_protocols,
400			network_config,
401			..
402		} = params.network_config;
403
404		// initialize notification protocols
405		//
406		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
407		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
408		// of Polkadot SDK to control connectivity of the notification protocol
409		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
410		let mut notif_protocols = HashMap::from_iter([(
411			params.block_announce_config.protocol_name().clone(),
412			params.block_announce_config.handle,
413		)]);
414
415		// handshake for all but the syncing protocol is set to node role
416		config_builder = notification_protocols
417			.into_iter()
418			.fold(config_builder, |config_builder, mut config| {
419				config.config.set_handshake(Roles::from(&params.role).encode());
420				notif_protocols.insert(config.protocol_name, config.handle);
421
422				config_builder.with_notification_protocol(config.config)
423			})
424			.with_notification_protocol(params.block_announce_config.config);
425
426		// initialize request-response protocols
427		let metrics = match &params.metrics_registry {
428			Some(registry) => Some(register_without_sources(registry)?),
429			None => None,
430		};
431
432		// create channels that are used to send request before initializing protocols so the
433		// senders can be passed onto all request-response protocols
434		//
435		// all protocols must have each others' senders so they can send the fallback request in
436		// case the main protocol is not supported by the remote peer and user specified a fallback
437		let (mut request_response_receivers, request_response_senders): (
438			HashMap<_, _>,
439			HashMap<_, _>,
440		) = request_response_protocols
441			.iter()
442			.map(|config| {
443				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
444				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
445			})
446			.unzip();
447
448		config_builder = request_response_protocols.into_iter().fold(
449			config_builder,
450			|config_builder, config| {
451				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
452					Litep2pProtocolName::from(config.protocol_name.clone()),
453				)
454				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
455				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
456				.with_timeout(config.request_timeout)
457				.build();
458
459				let protocol = RequestResponseProtocol::new(
460					config.protocol_name.clone(),
461					handle,
462					Arc::clone(&peer_store_handle),
463					config.inbound_queue,
464					request_response_receivers
465						.remove(&config.protocol_name)
466						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
467					request_response_senders.clone(),
468					metrics.clone(),
469				);
470
471				executor.run(Box::pin(async move {
472					protocol.run().await;
473				}));
474
475				config_builder.with_request_response_protocol(protocol_config)
476			},
477		);
478
479		// collect known addresses
480		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
481			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
482				use sc_network_types::multiaddr::Protocol;
483
484				let address = match address.iter().last() {
485					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
486						address.with(Protocol::P2p(peer.into())),
487					Some(Protocol::P2p(_)) => address,
488					_ => return acc,
489				};
490
491				acc.entry(peer.into()).or_default().push(address.into());
492				peer_store_handle.add_known_peer(peer);
493
494				acc
495			});
496
497		// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
498		let listen_addresses = Arc::new(Default::default());
499		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
500			Discovery::new(
501				local_peer_id,
502				&network_config,
503				params.genesis_hash,
504				params.fork_id.as_deref(),
505				&params.protocol_id,
506				known_addresses.clone(),
507				Arc::clone(&listen_addresses),
508				Arc::clone(&peer_store_handle),
509			);
510
511		config_builder = config_builder
512			.with_known_addresses(known_addresses.clone().into_iter())
513			.with_libp2p_ping(ping_config)
514			.with_libp2p_identify(identify_config)
515			.with_libp2p_kademlia(kademlia_config)
516			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
517				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
518			))
519			.with_keep_alive_timeout(network_config.idle_connection_timeout)
520			// Use system DNS resolver to enable intranet domain resolution and administrator
521			// control over DNS lookup.
522			.with_system_resolver()
523			.with_executor(executor);
524
525		if let Some(config) = maybe_mdns_config {
526			config_builder = config_builder.with_mdns(config);
527		}
528
529		if let Some(config) = params.bitswap_config {
530			config_builder = config_builder.with_libp2p_bitswap(config);
531		}
532
533		let litep2p =
534			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
535
536		litep2p.listen_addresses().for_each(|address| {
537			log::debug!(target: LOG_TARGET, "listening on: {address}");
538
539			listen_addresses.write().insert(address.clone());
540		});
541
542		let public_addresses = litep2p.public_addresses();
543		for address in network_config.public_addresses.iter() {
544			if let Err(err) = public_addresses.add_address(address.clone().into()) {
545				log::warn!(
546					target: LOG_TARGET,
547					"failed to add public address {address:?}: {err:?}",
548				);
549			}
550		}
551
552		let network_service = Arc::new(Litep2pNetworkService::new(
553			local_peer_id,
554			keypair.clone(),
555			cmd_tx,
556			Arc::clone(&peer_store_handle),
557			notif_protocols.clone(),
558			block_announce_protocol.clone(),
559			request_response_senders,
560			Arc::clone(&listen_addresses),
561			public_addresses,
562		));
563
564		// register rest of the metrics now that `Litep2p` has been created
565		let num_connected = Arc::new(Default::default());
566		let bandwidth: Arc<dyn BandwidthSink> =
567			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
568
569		if let Some(registry) = &params.metrics_registry {
570			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
571		}
572
573		Ok(Self {
574			network_service,
575			cmd_rx,
576			metrics,
577			peerset_handles: notif_protocols,
578			num_connected,
579			discovery,
580			pending_queries: HashMap::new(),
581			peerstore_handle: peer_store_handle,
582			block_announce_protocol,
583			event_streams: out_events::OutChannels::new(None)?,
584			peers: HashMap::new(),
585			litep2p,
586		})
587	}
588
	/// Get a handle to the backend-agnostic `NetworkService` (cheap `Arc` clone).
	fn network_service(&self) -> Arc<dyn NetworkService> {
		Arc::clone(&self.network_service)
	}
592
	/// Create a [`Peerstore`] pre-populated with the given bootnodes, optionally
	/// registering its metrics in `metrics_registry`.
	fn peer_store(
		bootnodes: Vec<sc_network_types::PeerId>,
		metrics_registry: Option<Registry>,
	) -> Self::PeerStore {
		Peerstore::new(bootnodes, metrics_registry)
	}
599
	/// Register notification-protocol metrics; with `None`, metrics are disabled.
	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
		NotificationMetrics::new(registry)
	}
603
	/// Create Bitswap server.
	///
	/// Returns the future driving the server and the litep2p Bitswap configuration,
	/// which is installed in [`Litep2pNetworkBackend::new`] via `with_libp2p_bitswap`.
	fn bitswap_server(
		client: Arc<dyn BlockBackend<B> + Send + Sync>,
	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
		BitswapServer::new(client)
	}
610
	/// Create notification protocol configuration for `protocol`.
	///
	/// Delegates to [`NotificationProtocolConfig::new`]; `max_notification_size` is
	/// narrowed from `u64` to `usize` (lossless on 64-bit targets).
	fn notification_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_notification_size: u64,
		handshake: Option<NotificationHandshake>,
		set_config: SetConfig,
		metrics: NotificationMetrics,
		peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
		Self::NotificationProtocolConfig::new(
			protocol_name,
			fallback_names,
			max_notification_size as usize,
			handshake,
			set_config,
			metrics,
			peerstore_handle,
		)
	}
631
	/// Create request-response protocol configuration.
	///
	/// Pure delegation to [`RequestResponseConfig::new`]; with `inbound_queue` set
	/// to `None`, inbound requests for the protocol are not served.
	fn request_response_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_request_size: u64,
		max_response_size: u64,
		request_timeout: Duration,
		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	) -> Self::RequestResponseProtocolConfig {
		Self::RequestResponseProtocolConfig::new(
			protocol_name,
			fallback_names,
			max_request_size,
			max_response_size,
			request_timeout,
			inbound_queue,
		)
	}
650
651	/// Start [`Litep2pNetworkBackend`] event loop.
652	async fn run(mut self) {
653		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
654
655		loop {
656			let num_connected_peers = self
657				.peerset_handles
658				.get(&self.block_announce_protocol)
659				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
660			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
661
662			tokio::select! {
663				command = self.cmd_rx.next() => match command {
664					None => return,
665					Some(command) => match command {
666						NetworkServiceCommand::FindClosestPeers { target } => {
667							let query_id = self.discovery.find_node(target.into()).await;
668							self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
669						}
670						NetworkServiceCommand::GetValue{ key } => {
671							let query_id = self.discovery.get_value(key.clone()).await;
672							self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
673						}
674						NetworkServiceCommand::PutValue { key, value } => {
675							let query_id = self.discovery.put_value(key.clone(), value).await;
676							self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
677						}
678						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
679							let kademlia_key = record.key.clone();
680							let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
681							self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
682						}
683						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
684							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
685						}
686						NetworkServiceCommand::StartProviding { key } => {
687							self.discovery.start_providing(key).await;
688						}
689						NetworkServiceCommand::StopProviding { key } => {
690							self.discovery.stop_providing(key).await;
691						}
692						NetworkServiceCommand::GetProviders { key } => {
693							let query_id = self.discovery.get_providers(key.clone()).await;
694							self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
695						}
696						NetworkServiceCommand::EventStream { tx } => {
697							self.event_streams.push(tx);
698						}
699						NetworkServiceCommand::Status { tx } => {
700							let _ = tx.send(NetworkStatus {
701								num_connected_peers: self
702									.peerset_handles
703									.get(&self.block_announce_protocol)
704									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
705								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
706								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
707							});
708						}
709						NetworkServiceCommand::AddPeersToReservedSet {
710							protocol,
711							peers,
712						} => {
713							let peers = self.add_addresses(peers.into_iter().map(Into::into));
714
715							match self.peerset_handles.get(&protocol) {
716								Some(handle) => {
717									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
718								}
719								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
720							};
721						}
722						NetworkServiceCommand::AddKnownAddress { peer, address } => {
723							let mut address: Multiaddr = address.into();
724
725							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
726								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
727							}
728
729							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
730								// libp2p backend generates `DiscoveryOut::Discovered(peer_id)`
731								// event when a new address is added for a peer, which leads to the
732								// peer being added to peerstore. Do the same directly here.
733								self.peerstore_handle.add_known_peer(peer);
734							} else {
735								log::debug!(
736									target: LOG_TARGET,
737									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
738								);
739							}
740						},
741						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
742							let peers = self.add_addresses(peers.into_iter().map(Into::into));
743
744							match self.peerset_handles.get(&protocol) {
745								Some(handle) => {
746									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
747								}
748								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
749							}
750
751						},
752						NetworkServiceCommand::DisconnectPeer {
753							protocol,
754							peer,
755						} => {
756							let Some(handle) = self.peerset_handles.get(&protocol) else {
757								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
758								continue
759							};
760
761							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
762						}
763						NetworkServiceCommand::SetReservedOnly {
764							protocol,
765							reserved_only,
766						} => {
767							let Some(handle) = self.peerset_handles.get(&protocol) else {
768								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
769								continue
770							};
771
772							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
773						}
774						NetworkServiceCommand::RemoveReservedPeers {
775							protocol,
776							peers,
777						} => {
778							let Some(handle) = self.peerset_handles.get(&protocol) else {
779								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
780								continue
781							};
782
783							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
784						}
785					}
786				},
787				event = self.discovery.next() => match event {
788					None => return,
789					Some(DiscoveryEvent::Discovered { addresses }) => {
790						// if at least one address was added for the peer, report the peer to `Peerstore`
791						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
792							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
793								self.peerstore_handle.add_known_peer(peer);
794							}
795						}
796					}
797					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
798						for peer in peers {
799							self.peerstore_handle.add_known_peer(peer.into());
800						}
801					}
802					Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
803						match self.pending_queries.remove(&query_id) {
804							Some(KadQuery::FindNode(_, started)) => {
805								log::trace!(
806									target: LOG_TARGET,
807									"`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
808								);
809
810								self.event_streams.send(
811									Event::Dht(
812										DhtEvent::ClosestPeersFound(
813											target.into(),
814											peers
815												.into_iter()
816												.map(|(peer, addrs)| (
817													peer.into(),
818													addrs.into_iter().map(Into::into).collect(),
819												))
820												.collect(),
821										)
822									)
823								);
824
825								if let Some(ref metrics) = self.metrics {
826									metrics
827										.kademlia_query_duration
828										.with_label_values(&["node-find"])
829										.observe(started.elapsed().as_secs_f64());
830								}
831							},
832							query => {
833								log::error!(
834									target: LOG_TARGET,
835									"Missing/invalid pending query for `FIND_NODE`: {query:?}"
836								);
837								debug_assert!(false);
838							}
839						}
840					},
841					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
842						if !self.pending_queries.contains_key(&query_id) {
843							log::error!(
844								target: LOG_TARGET,
845								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
846							);
847
848							continue
849						}
850
851						let peer_id: sc_network_types::PeerId = record.peer.into();
852						let record = PeerRecord {
853							record: P2PRecord {
854								key: record.record.key.to_vec().into(),
855								value: record.record.value,
856								publisher: record.record.publisher.map(|peer_id| {
									let peer_id: sc_network_types::PeerId = peer_id.into();
									peer_id.into()
								}),
								expires: record.record.expires,
							},
							peer: Some(peer_id.into()),
						};

						// Deliver the found record to all event-stream subscribers.
						// Query-completion bookkeeping (duration metric, pending-query
						// removal) happens separately in the `GetRecordSuccess` arm.
						self.event_streams.send(
							Event::Dht(
								DhtEvent::ValueFound(
									record.into()
								)
							)
						);
					}
					// `GET_VALUE` completed: drop the pending-query entry and record
					// the query duration. The record itself was already delivered to
					// subscribers by the per-record event handled above.
					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
						match self.pending_queries.remove(&query_id) {
							Some(KadQuery::GetValue(key, started)) => {
								log::trace!(
									target: LOG_TARGET,
									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
								);

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["value-get"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							// Unknown query id, or a different query kind registered
							// under this id — both indicate a local logic error, hence
							// the debug assertion.
							query => {
								log::error!(
									target: LOG_TARGET,
									"Missing/invalid pending query for `GET_VALUE`: {query:?}"
								);
								debug_assert!(false);
							},
						}
					}
					// `PUT_VALUE` completed: notify subscribers, drop the pending-query
					// entry and record the query duration.
					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
						match self.pending_queries.remove(&query_id) {
							Some(KadQuery::PutValue(key, started)) => {
								log::trace!(
									target: LOG_TARGET,
									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
								);

								self.event_streams.send(Event::Dht(
									DhtEvent::ValuePut(key)
								));

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["value-put"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							query => {
								log::error!(
									target: LOG_TARGET,
									"Missing/invalid pending query for `PUT_VALUE`: {query:?}"
								);
								debug_assert!(false);
							}
						}
					}
					// `GET_PROVIDERS` completed: litep2p delivers all providers in this
					// single event (unlike the per-record `GET_VALUE` flow above).
					Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
						match self.pending_queries.remove(&query_id) {
							Some(KadQuery::GetProviders(key, started)) => {
								log::trace!(
									target: LOG_TARGET,
									"`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
								);

								// We likely requested providers to connect to them, so
								// add their addresses to litep2p's transport manager.
								// Consider also looking up the providers' addresses with
								// a `FIND_NODE` query, as it can yield more up-to-date
								// addresses.
								providers.iter().for_each(|p| {
									self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
								});

								self.event_streams.send(Event::Dht(
									DhtEvent::ProvidersFound(
										key.clone().into(),
										providers.into_iter().map(|p| p.peer.into()).collect()
									)
								));

								// litep2p returns all providers in a single event, so we let
								// subscribers know no more providers will be yielded.
								self.event_streams.send(Event::Dht(
									DhtEvent::NoMoreProviders(key.into())
								));

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["providers-get"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							query => {
								log::error!(
									target: LOG_TARGET,
									"Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
								);
								debug_assert!(false);
							}
						}
					}
					// A Kademlia query failed. The pending-query entry tells us which
					// kind of query it was, so the matching failure event is emitted
					// to subscribers and the per-kind failure duration is recorded.
					Some(DiscoveryEvent::QueryFailed { query_id }) => {
						match self.pending_queries.remove(&query_id) {
							Some(KadQuery::FindNode(peer_id, started)) => {
								log::debug!(
									target: LOG_TARGET,
									"`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
								);

								self.event_streams.send(Event::Dht(
									DhtEvent::ClosestPeersNotFound(peer_id.into())
								));

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["node-find-failed"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							Some(KadQuery::GetValue(key, started)) => {
								log::debug!(
									target: LOG_TARGET,
									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
								);

								self.event_streams.send(Event::Dht(
									DhtEvent::ValueNotFound(key)
								));

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["value-get-failed"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							Some(KadQuery::PutValue(key, started)) => {
								log::debug!(
									target: LOG_TARGET,
									"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
								);

								self.event_streams.send(Event::Dht(
									DhtEvent::ValuePutFailed(key)
								));

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["value-put-failed"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							Some(KadQuery::GetProviders(key, started)) => {
								log::debug!(
									target: LOG_TARGET,
									"`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
								);

								self.event_streams.send(Event::Dht(
									DhtEvent::ProvidersNotFound(key)
								));

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["providers-get-failed"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							// Failure for a query we never registered: log only — no
							// subscriber is waiting on it, so nothing else to do.
							None => {
								log::warn!(
									target: LOG_TARGET,
									"non-existent query failed ({query_id:?})",
								);
							}
						}
					}
					// A peer was identified (identify protocol): feed its self-reported
					// listen addresses and supported protocols into discovery.
					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
					}
					// A peer observed us at a new external address: register it with
					// litep2p's public-address set, logging only if it was new.
					// NOTE(review): the "๐Ÿ”" prefix in these log strings looks like a
					// mis-encoded emoji — confirm the file is read/stored as UTF-8.
					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
						match self.litep2p.public_addresses().add_address(address.clone().into()) {
							Ok(inserted) => if inserted {
								log::info!(target: LOG_TARGET, "๐Ÿ” Discovered new external address for our node: {address}");
							},
							Err(err) => {
								log::warn!(
									target: LOG_TARGET,
									"๐Ÿ” Failed to add discovered external address {address:?}: {err:?}",
								);
							},
						}
					}
					// A previously discovered external address expired: remove it from
					// the public-address set.
					Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
						let local_peer_id = self.litep2p.local_peer_id();

						// Litep2p requires the peer ID to be present in the address.
						let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
							address.with(Protocol::P2p(*local_peer_id.as_ref()))
						} else {
							address
						};

						if self.litep2p.public_addresses().remove_address(&address) {
							log::info!(target: LOG_TARGET, "๐Ÿ” Expired external address for our node: {address}");
						} else {
							log::warn!(
								target: LOG_TARGET,
								"๐Ÿ” Failed to remove expired external address {address:?}"
							);
						}
					}
					// Ping round-trip time measured for a peer; trace-logged only.
					Some(DiscoveryEvent::Ping { peer, rtt }) => {
						log::trace!(
							target: LOG_TARGET,
							"ping time with {peer:?}: {rtt:?}",
						);
					}
					// An inbound DHT record (e.g. from a remote `PUT_VALUE`): forward
					// it to subscribers as a `PutRecordRequest` instead of handling it
					// locally here.
					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
						self.event_streams.send(Event::Dht(
							DhtEvent::PutRecordRequest(
								key.into(),
								value,
								publisher.map(Into::into),
								expires,
							)
						));
					},

					// A periodic random Kademlia walk started; count it for metrics.
					Some(DiscoveryEvent::RandomKademliaStarted) => {
						if let Some(metrics) = self.metrics.as_ref() {
							metrics.kademlia_random_queries_total.inc();
						}
					}
				},
				event = self.litep2p.next_event() => match event {
					// NOTE: the `self.peers` endpoint book-keeping below exists solely
					// to feed metrics — both this arm and `ConnectionClosed` bail out
					// early when metrics are disabled, so the map then stays empty.
					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
						let Some(metrics) = &self.metrics else {
							continue;
						};

						let direction = match endpoint {
							Endpoint::Dialer { .. } => "out",
							Endpoint::Listener { .. } => {
								// Increment incoming connections counter.
								//
								// Note: For litep2p these are represented by established negotiated connections,
								// while for libp2p (legacy) these represent not-yet-negotiated connections.
								metrics.incoming_connections_total.inc();

								"in"
							},
						};
						metrics.connections_opened_total.with_label_values(&[direction]).inc();

						// Track per-peer endpoints so `ConnectionClosed` can attribute
						// the close direction; count the first connection to a peer as
						// a distinct-peer connection.
						match self.peers.entry(peer) {
							Entry::Vacant(entry) => {
								entry.insert(ConnectionContext {
									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
									num_connections: 1usize,
								});
								metrics.distinct_peers_connections_opened_total.inc();
							}
							Entry::Occupied(entry) => {
								let entry = entry.into_mut();
								entry.num_connections += 1;
								entry.endpoints.insert(endpoint.connection_id(), endpoint);
							}
						}
					}
					// Mirror of `ConnectionEstablished`: update the metrics-only peer
					// book-keeping and emit close counters.
					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
						let Some(metrics) = &self.metrics else {
							continue;
						};

						let Some(context) = self.peers.get_mut(&peer) else {
							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
							continue
						};

						// Resolve the close direction from the endpoint recorded at
						// establishment time; decrement the per-peer connection count.
						let direction = match context.endpoints.remove(&connection_id) {
							None => {
								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
								continue
							}
							Some(endpoint) => {
								context.num_connections -= 1;

								match endpoint {
									Endpoint::Dialer { .. } => "out",
									Endpoint::Listener { .. } => "in",
								}
							}
						};

						// NOTE(review): every close is labelled "actively-closed" here;
						// confirm whether litep2p distinguishes remote closes at all.
						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();

						// Last connection to this peer gone: drop the entry entirely.
						if context.num_connections == 0 {
							self.peers.remove(&peer);
							metrics.distinct_peers_connections_closed_total.inc();
						}
					}
					// A dial attempt failed: map the error to a stable label for the
					// pending-connection error counter.
					Some(Litep2pEvent::DialFailure { address, error }) => {
						log::debug!(
							target: LOG_TARGET,
							"failed to dial peer at {address:?}: {error:?}",
						);

						if let Some(metrics) = &self.metrics {
							let reason = match error {
								DialError::Timeout => "timeout",
								DialError::AddressError(_) => "invalid-address",
								DialError::DnsError(_) => "cannot-resolve-dns",
								DialError::NegotiationError(error) => match error {
									NegotiationError::Timeout => "timeout",
									NegotiationError::PeerIdMissing => "missing-peer-id",
									NegotiationError::StateMismatch => "state-mismatch",
									// NOTE(review): "missmatch" is misspelled ("mismatch"),
									// but this is an exported metric label value — renaming
									// it would break existing dashboards; coordinate first.
									NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
									NegotiationError::SnowError(_) => "noise-error",
									NegotiationError::ParseError(_) => "parse-error",
									NegotiationError::IoError(_) => "io-error",
									// NOTE(review): "webscoket" is misspelled ("websocket");
									// same caveat as above — exported metric label value.
									NegotiationError::WebSocket(_) => "webscoket-error",
									NegotiationError::BadSignature => "bad-signature",
								}
							};

							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
						}
					}
					// Dialing a peer failed across several addresses at once; counted
					// under a single aggregate label.
					Some(Litep2pEvent::ListDialFailures { errors }) => {
						log::debug!(
							target: LOG_TARGET,
							"failed to dial peer on multiple addresses {errors:?}",
						);

						if let Some(metrics) = &self.metrics {
							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
						}
					}
					// `next_event()` returning `None` means the litep2p backend itself
					// has shut down; exit the event loop.
					None => {
						log::error!(
								target: LOG_TARGET,
								"Litep2p backend terminated"
						);
						return
					}
				},
			}
		}
	}
}