referrerpolicy=no-referrer-when-downgrade

sc_network/litep2p/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `NetworkBackend` implementation for `litep2p`.
20
21use crate::{
22	config::{
23		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24		SetConfig, TransportConfig,
25	},
26	error::Error,
27	event::{DhtEvent, Event},
28	litep2p::{
29		bitswap::BitswapService,
30		discovery::{Discovery, DiscoveryEvent},
31		ipfs_dht::IpfsDht,
32		peerstore::Peerstore,
33		service::{Litep2pNetworkService, NetworkServiceCommand},
34		shim::{
35			notification::{
36				config::{NotificationProtocolConfig, ProtocolControlHandle},
37				peerset::PeersetCommand,
38			},
39			request_response::{RequestResponseConfig, RequestResponseProtocol},
40		},
41	},
42	peer_store::PeerStoreProvider,
43	service::{
44		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
45		out_events,
46		traits::{BandwidthSink, NetworkBackend, NetworkService},
47	},
48	NetworkStatus, NotificationService, ProtocolName,
49};
50
51use codec::Encode;
52use futures::StreamExt;
53use litep2p::{
54	config::ConfigBuilder,
55	crypto::ed25519::Keypair,
56	error::{DialError, NegotiationError},
57	executor::Executor,
58	protocol::{
59		libp2p::kademlia::{QueryId, Record},
60		request_response::ConfigBuilder as RequestResponseConfigBuilder,
61	},
62	transport::{
63		tcp::config::Config as TcpTransportConfig,
64		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
65	},
66	types::{
67		multiaddr::{Multiaddr, Protocol},
68		ConnectionId,
69	},
70	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
71};
72use prometheus_endpoint::Registry;
73use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
74
75use sc_client_api::BlockBackend;
76use sc_network_common::{role::Roles, ExHashT};
77use sc_network_types::PeerId;
78use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
79use sp_runtime::traits::Block as BlockT;
80
81use std::{
82	cmp,
83	collections::{hash_map::Entry, HashMap, HashSet},
84	fs,
85	future::Future,
86	iter,
87	pin::Pin,
88	sync::{
89		atomic::{AtomicUsize, Ordering},
90		Arc,
91	},
92	time::{Duration, Instant},
93};
94
95mod bitswap;
96mod discovery;
97mod ipfs_dht;
98mod peerstore;
99mod service;
100mod shim;
101
102/// Litep2p bandwidth sink.
103struct Litep2pBandwidthSink {
104	sink: litep2p::BandwidthSink,
105}
106
107impl BandwidthSink for Litep2pBandwidthSink {
108	fn total_inbound(&self) -> u64 {
109		self.sink.inbound() as u64
110	}
111
112	fn total_outbound(&self) -> u64 {
113		self.sink.outbound() as u64
114	}
115}
116
117/// Litep2p task executor.
118struct Litep2pExecutor {
119	/// Executor.
120	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
121}
122
123impl Executor for Litep2pExecutor {
124	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
125		(self.executor)(future)
126	}
127
128	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
129		(self.executor)(future)
130	}
131}
132
/// Target string passed to the `log` macros used in this file.
const LOG_TARGET: &str = "sub-libp2p";
135
136/// Peer context.
137struct ConnectionContext {
138	/// Peer endpoints.
139	endpoints: HashMap<ConnectionId, Endpoint>,
140
141	/// Number of active connections.
142	num_connections: usize,
143}
144
145/// Kademlia query we are tracking.
146#[derive(Debug)]
147enum KadQuery {
148	/// `FIND_NODE` query for target and when it was initiated.
149	FindNode(PeerId, Instant),
150	/// `GET_VALUE` query for key and when it was initiated.
151	GetValue(RecordKey, Instant),
152	/// `PUT_VALUE` query for key and when it was initiated.
153	PutValue(RecordKey, Instant),
154	/// `GET_PROVIDERS` query for key and when it was initiated.
155	GetProviders(RecordKey, Instant),
156	/// `ADD_PROVIDER` query for key and when it was initiated.
157	AddProvider(RecordKey, Instant),
158}
159
160/// Networking backend for `litep2p`.
161pub struct Litep2pNetworkBackend {
162	/// Main `litep2p` object.
163	litep2p: Litep2p,
164
165	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
166	network_service: Arc<dyn NetworkService>,
167
168	/// RX channel for receiving commands from `Litep2pNetworkService`.
169	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,
170
171	/// `Peerset` handles to notification protocols.
172	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
173
174	/// Pending Kademlia queries.
175	pending_queries: HashMap<QueryId, KadQuery>,
176
177	/// Discovery.
178	discovery: Discovery,
179
180	/// Number of connected peers.
181	num_connected: Arc<AtomicUsize>,
182
183	/// Connected peers.
184	peers: HashMap<litep2p::PeerId, ConnectionContext>,
185
186	/// Peerstore.
187	peerstore_handle: Arc<dyn PeerStoreProvider>,
188
189	/// Block announce protocol name.
190	block_announce_protocol: ProtocolName,
191
192	/// Sender for DHT events.
193	event_streams: out_events::OutChannels,
194
195	/// Prometheus metrics.
196	metrics: Option<Metrics>,
197}
198
199impl Litep2pNetworkBackend {
200	/// From an iterator of multiaddress(es), parse and group all addresses of peers
201	/// so that litep2p can consume the information easily.
202	fn parse_addresses(
203		addresses: impl Iterator<Item = Multiaddr>,
204	) -> HashMap<PeerId, Vec<Multiaddr>> {
205		addresses
206			.into_iter()
207			.filter_map(|address| match address.iter().next() {
208				Some(
209					Protocol::Dns(_) |
210					Protocol::Dns4(_) |
211					Protocol::Dns6(_) |
212					Protocol::Ip6(_) |
213					Protocol::Ip4(_),
214				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
215				{
216					Some(Protocol::P2p(peer_id)) => Some((peer_id.into(), Some(address))),
217					_ => None,
218				},
219				Some(Protocol::P2p(peer_id)) => Some((peer_id.into(), None)),
220				_ => None,
221			})
222			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
223				let entry = acc.entry(peer).or_default();
224				maybe_address.map(|address| entry.push(address));
225
226				acc
227			})
228	}
229
230	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
231	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
232		Self::parse_addresses(peers.into_iter())
233			.into_iter()
234			.filter_map(|(peer, addresses)| {
235				// `peers` contained multiaddress in the form `/p2p/<peer ID>`
236				if addresses.is_empty() {
237					return Some(peer);
238				}
239
240				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
241					log::warn!(
242						target: LOG_TARGET,
243						"couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
244					);
245					return None;
246				}
247
248				self.peerstore_handle.add_known_peer(peer);
249				Some(peer)
250			})
251			.collect()
252	}
253}
254
255impl Litep2pNetworkBackend {
256	/// Get `litep2p` keypair from `NodeKeyConfig`.
257	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
258		let secret: litep2p::crypto::ed25519::SecretKey =
259			node_key.clone().into_keypair()?.secret().into();
260
261		let local_identity = Keypair::from(secret);
262		let local_public = local_identity.public();
263		let local_peer_id = local_public.to_peer_id();
264
265		Ok((local_identity, local_peer_id))
266	}
267
268	/// Configure transport protocols for `Litep2pNetworkBackend`.
269	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
270		config: &FullNetworkConfiguration<B, H, Self>,
271	) -> ConfigBuilder {
272		let _ = match config.network_config.transport {
273			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
274			TransportConfig::Normal { .. } => false,
275		};
276		let config_builder = ConfigBuilder::new();
277
278		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
279			.network_config
280			.listen_addresses
281			.iter()
282			.filter_map(|address| {
283				use sc_network_types::multiaddr::Protocol;
284
285				let mut iter = address.iter();
286
287				match iter.next() {
288					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
289					protocol => {
290						log::error!(
291							target: LOG_TARGET,
292							"unknown protocol {protocol:?}, ignoring {address:?}",
293						);
294
295						return None;
296					},
297				}
298
299				match iter.next() {
300					Some(Protocol::Tcp(_)) => match iter.next() {
301						Some(Protocol::Ws(_) | Protocol::Wss(_)) => {
302							Some((None, Some(address.clone())))
303						},
304						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
305						protocol => {
306							log::error!(
307								target: LOG_TARGET,
308								"unknown protocol {protocol:?}, ignoring {address:?}",
309							);
310							None
311						},
312					},
313					protocol => {
314						log::error!(
315							target: LOG_TARGET,
316							"unknown protocol {protocol:?}, ignoring {address:?}",
317						);
318						None
319					},
320				}
321			})
322			.unzip();
323
324		config_builder
325			.with_websocket(WebSocketTransportConfig {
326				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
327				yamux_config: litep2p::yamux::Config::default(),
328				nodelay: true,
329				..Default::default()
330			})
331			.with_tcp(TcpTransportConfig {
332				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
333				yamux_config: litep2p::yamux::Config::default(),
334				nodelay: true,
335				..Default::default()
336			})
337	}
338}
339
340#[async_trait::async_trait]
341impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
342	type NotificationProtocolConfig = NotificationProtocolConfig;
343	type RequestResponseProtocolConfig = RequestResponseConfig;
344	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
345	type PeerStore = Peerstore;
346	type BitswapConfig = bitswap::BitswapConfig;
347
348	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
349	where
350		Self: Sized,
351	{
352		let (keypair, local_peer_id) =
353			Self::get_keypair(&params.network_config.network_config.node_key)?;
354		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
355
356		params.network_config.network_config.boot_nodes = params
357			.network_config
358			.network_config
359			.boot_nodes
360			.into_iter()
361			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
362			.collect();
363		params.network_config.network_config.default_peers_set.reserved_nodes = params
364			.network_config
365			.network_config
366			.default_peers_set
367			.reserved_nodes
368			.into_iter()
369			.filter(|reserved_node| {
370				if reserved_node.peer_id == local_peer_id.into() {
371					log::warn!(
372						target: LOG_TARGET,
373						"Local peer ID used in reserved node, ignoring: {reserved_node}",
374					);
375					false
376				} else {
377					true
378				}
379			})
380			.collect();
381
382		if let Some(path) = &params.network_config.network_config.net_config_path {
383			fs::create_dir_all(path)?;
384		}
385
386		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
387		log::info!(target: LOG_TARGET, "Running litep2p network backend");
388
389		params.network_config.sanity_check_addresses()?;
390		params.network_config.sanity_check_bootnodes()?;
391
392		let mut config_builder =
393			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
394		let known_addresses = params.network_config.known_addresses();
395		let peer_store_handle = params.network_config.peer_store_handle();
396		let executor = Arc::new(Litep2pExecutor { executor: params.executor });
397
398		let FullNetworkConfiguration {
399			notification_protocols,
400			request_response_protocols,
401			network_config,
402			..
403		} = params.network_config;
404
405		// initialize notification protocols
406		//
407		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
408		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
409		// of Polkadot SDK to control connectivity of the notification protocol
410		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
411		let mut notif_protocols = HashMap::from_iter([(
412			params.block_announce_config.protocol_name().clone(),
413			params.block_announce_config.handle,
414		)]);
415
416		// handshake for all but the syncing protocol is set to node role
417		config_builder = notification_protocols
418			.into_iter()
419			.fold(config_builder, |config_builder, mut config| {
420				config.config.set_handshake(Roles::from(&params.role).encode());
421				notif_protocols.insert(config.protocol_name, config.handle);
422
423				config_builder.with_notification_protocol(config.config)
424			})
425			.with_notification_protocol(params.block_announce_config.config);
426
427		// initialize request-response protocols
428		let metrics = match &params.metrics_registry {
429			Some(registry) => Some(register_without_sources(registry)?),
430			None => None,
431		};
432
433		// create channels that are used to send request before initializing protocols so the
434		// senders can be passed onto all request-response protocols
435		//
436		// all protocols must have each others' senders so they can send the fallback request in
437		// case the main protocol is not supported by the remote peer and user specified a fallback
438		let (mut request_response_receivers, request_response_senders): (
439			HashMap<_, _>,
440			HashMap<_, _>,
441		) = request_response_protocols
442			.iter()
443			.map(|config| {
444				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
445				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
446			})
447			.unzip();
448
449		config_builder = request_response_protocols.into_iter().fold(
450			config_builder,
451			|config_builder, config| {
452				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
453					Litep2pProtocolName::from(config.protocol_name.clone()),
454				)
455				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
456				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
457				.with_timeout(config.request_timeout)
458				.build();
459
460				let protocol = RequestResponseProtocol::new(
461					config.protocol_name.clone(),
462					handle,
463					Arc::clone(&peer_store_handle),
464					config.inbound_queue,
465					request_response_receivers
466						.remove(&config.protocol_name)
467						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
468					request_response_senders.clone(),
469					metrics.clone(),
470				);
471
472				executor.run(Box::pin(async move {
473					protocol.run().await;
474				}));
475
476				config_builder.with_request_response_protocol(protocol_config)
477			},
478		);
479
480		// collect known addresses
481		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
482			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
483				use sc_network_types::multiaddr::Protocol;
484
485				let address = match address.iter().last() {
486					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) => {
487						address.with(Protocol::P2p(peer.into()))
488					},
489					Some(Protocol::P2p(_)) => address,
490					_ => return acc,
491				};
492
493				acc.entry(peer.into()).or_default().push(address.into());
494				peer_store_handle.add_known_peer(peer);
495
496				acc
497			});
498
499		// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
500		let listen_addresses = Arc::new(Default::default());
501		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
502			Discovery::new(
503				local_peer_id,
504				&network_config,
505				params.genesis_hash,
506				params.fork_id.as_deref(),
507				&params.protocol_id,
508				known_addresses.clone(),
509				Arc::clone(&listen_addresses),
510				Arc::clone(&peer_store_handle),
511			);
512
513		let bitswap_cmd_tx = params.ipfs_config.as_ref().map(|c| c.bitswap_config.cmd_tx.clone());
514
515		// enable Bitswap & IPFS DHT
516		if let Some(config) = params.ipfs_config {
517			config_builder =
518				config_builder.with_libp2p_bitswap(config.bitswap_config.litep2p_config);
519
520			if !config.bootnodes.is_empty() {
521				let (ipfs_dht, kad_config) = IpfsDht::new(config.bootnodes, config.block_provider);
522				config_builder = config_builder.with_libp2p_kademlia(kad_config);
523				executor.run(Box::pin(ipfs_dht.run()));
524			} else {
525				log::warn!(
526					target: LOG_TARGET,
527					"Not starting IPFS DHT publisher because no IPFS bootnodes are configured. \
528					 Only direct Bitswap requests will be handled.",
529				);
530			}
531		}
532
533		config_builder = config_builder
534			.with_known_addresses(known_addresses.clone().into_iter())
535			.with_libp2p_ping(ping_config)
536			.with_libp2p_identify(identify_config)
537			.with_libp2p_kademlia(kademlia_config)
538			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
539				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
540			))
541			.with_keep_alive_timeout(network_config.idle_connection_timeout)
542			// Use system DNS resolver to enable intranet domain resolution and administrator
543			// control over DNS lookup.
544			.with_system_resolver()
545			.with_executor(executor);
546
547		if let Some(config) = maybe_mdns_config {
548			config_builder = config_builder.with_mdns(config);
549		}
550
551		let litep2p =
552			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
553
554		litep2p.listen_addresses().for_each(|address| {
555			log::debug!(target: LOG_TARGET, "listening on: {address}");
556
557			listen_addresses.write().insert(address.clone());
558		});
559
560		let public_addresses = litep2p.public_addresses();
561		for address in network_config.public_addresses.iter() {
562			if let Err(err) = public_addresses.add_address(address.clone().into()) {
563				log::warn!(
564					target: LOG_TARGET,
565					"failed to add public address {address:?}: {err:?}",
566				);
567			}
568		}
569
570		let network_service = Arc::new(Litep2pNetworkService::new(
571			local_peer_id,
572			keypair.clone(),
573			cmd_tx,
574			Arc::clone(&peer_store_handle),
575			notif_protocols.clone(),
576			block_announce_protocol.clone(),
577			request_response_senders,
578			Arc::clone(&listen_addresses),
579			public_addresses,
580			bitswap_cmd_tx,
581		));
582
583		// register rest of the metrics now that `Litep2p` has been created
584		let num_connected = Arc::new(Default::default());
585		let bandwidth: Arc<dyn BandwidthSink> =
586			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
587
588		if let Some(registry) = &params.metrics_registry {
589			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
590		}
591
592		Ok(Self {
593			network_service,
594			cmd_rx,
595			metrics,
596			peerset_handles: notif_protocols,
597			num_connected,
598			discovery,
599			pending_queries: HashMap::new(),
600			peerstore_handle: peer_store_handle,
601			block_announce_protocol,
602			event_streams: out_events::OutChannels::new(None)?,
603			peers: HashMap::new(),
604			litep2p,
605		})
606	}
607
608	fn network_service(&self) -> Arc<dyn NetworkService> {
609		Arc::clone(&self.network_service)
610	}
611
612	fn peer_store(
613		bootnodes: Vec<sc_network_types::PeerId>,
614		metrics_registry: Option<Registry>,
615	) -> Self::PeerStore {
616		Peerstore::new(bootnodes, metrics_registry)
617	}
618
619	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
620		NotificationMetrics::new(registry)
621	}
622
623	/// Create Bitswap server.
624	fn bitswap_server(
625		client: Arc<dyn BlockBackend<B> + Send + Sync>,
626	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
627		BitswapService::new(client)
628	}
629
630	/// Create notification protocol configuration for `protocol`.
631	fn notification_config(
632		protocol_name: ProtocolName,
633		fallback_names: Vec<ProtocolName>,
634		max_notification_size: u64,
635		handshake: Option<NotificationHandshake>,
636		set_config: SetConfig,
637		metrics: NotificationMetrics,
638		peerstore_handle: Arc<dyn PeerStoreProvider>,
639	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
640		Self::NotificationProtocolConfig::new(
641			protocol_name,
642			fallback_names,
643			max_notification_size as usize,
644			handshake,
645			set_config,
646			metrics,
647			peerstore_handle,
648		)
649	}
650
651	/// Create request-response protocol configuration.
652	fn request_response_config(
653		protocol_name: ProtocolName,
654		fallback_names: Vec<ProtocolName>,
655		max_request_size: u64,
656		max_response_size: u64,
657		request_timeout: Duration,
658		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
659	) -> Self::RequestResponseProtocolConfig {
660		Self::RequestResponseProtocolConfig::new(
661			protocol_name,
662			fallback_names,
663			max_request_size,
664			max_response_size,
665			request_timeout,
666			inbound_queue,
667		)
668	}
669
670	/// Start [`Litep2pNetworkBackend`] event loop.
671	async fn run(mut self) {
672		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
673
674		loop {
675			let num_connected_peers = self
676				.peerset_handles
677				.get(&self.block_announce_protocol)
678				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
679			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
680
681			tokio::select! {
682				command = self.cmd_rx.next() => match command {
683					None => return,
684					Some(command) => match command {
685						NetworkServiceCommand::FindClosestPeers { target } => {
686							let query_id = self.discovery.find_node(target.into()).await;
687							self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
688						}
689						NetworkServiceCommand::GetValue{ key } => {
690							let query_id = self.discovery.get_value(key.clone()).await;
691							self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
692						}
693						NetworkServiceCommand::PutValue { key, value } => {
694							let query_id = self.discovery.put_value(key.clone(), value).await;
695							self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
696						}
697						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
698							let kademlia_key = record.key.clone();
699							let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
700							self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
701						}
702						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
703							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
704						}
705						NetworkServiceCommand::StartProviding { key } => {
706							let query_id = self.discovery.start_providing(key.clone()).await;
707							self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
708						}
709						NetworkServiceCommand::StopProviding { key } => {
710							self.discovery.stop_providing(key).await;
711						}
712						NetworkServiceCommand::GetProviders { key } => {
713							let query_id = self.discovery.get_providers(key.clone()).await;
714							self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
715						}
716						NetworkServiceCommand::EventStream { tx } => {
717							self.event_streams.push(tx);
718						}
719						NetworkServiceCommand::Status { tx } => {
720							let _ = tx.send(NetworkStatus {
721								num_connected_peers: self
722									.peerset_handles
723									.get(&self.block_announce_protocol)
724									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
725								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
726								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
727							});
728						}
729						NetworkServiceCommand::AddPeersToReservedSet {
730							protocol,
731							peers,
732						} => {
733							let peers = self.add_addresses(peers.into_iter().map(Into::into));
734
735							match self.peerset_handles.get(&protocol) {
736								Some(handle) => {
737									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
738								}
739								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
740							};
741						}
742						NetworkServiceCommand::AddKnownAddress { peer, address } => {
743							let mut address: Multiaddr = address.into();
744
745							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
746								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
747							}
748
749							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
750								// libp2p backend generates `DiscoveryOut::Discovered(peer_id)`
751								// event when a new address is added for a peer, which leads to the
752								// peer being added to peerstore. Do the same directly here.
753								self.peerstore_handle.add_known_peer(peer);
754							} else {
755								log::debug!(
756									target: LOG_TARGET,
757									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
758								);
759							}
760						},
761						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
762							let peers = self.add_addresses(peers.into_iter().map(Into::into));
763
764							match self.peerset_handles.get(&protocol) {
765								Some(handle) => {
766									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
767								}
768								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
769							}
770
771						},
772						NetworkServiceCommand::DisconnectPeer {
773							protocol,
774							peer,
775						} => {
776							let Some(handle) = self.peerset_handles.get(&protocol) else {
777								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
778								continue
779							};
780
781							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
782						}
783						NetworkServiceCommand::SetReservedOnly {
784							protocol,
785							reserved_only,
786						} => {
787							let Some(handle) = self.peerset_handles.get(&protocol) else {
788								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
789								continue
790							};
791
792							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
793						}
794						NetworkServiceCommand::RemoveReservedPeers {
795							protocol,
796							peers,
797						} => {
798							let Some(handle) = self.peerset_handles.get(&protocol) else {
799								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
800								continue
801							};
802
803							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
804						}
805					}
806				},
807				event = self.discovery.next() => match event {
808					None => return,
809					Some(DiscoveryEvent::Discovered { addresses }) => {
810						// if at least one address was added for the peer, report the peer to `Peerstore`
811						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
812							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
813								self.peerstore_handle.add_known_peer(peer);
814							}
815						}
816					}
817					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
818						for peer in peers {
819							self.peerstore_handle.add_known_peer(peer.into());
820						}
821					}
822					Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
823						match self.pending_queries.remove(&query_id) {
824							Some(KadQuery::FindNode(_, started)) => {
825								log::trace!(
826									target: LOG_TARGET,
827									"`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
828								);
829
830								self.event_streams.send(
831									Event::Dht(
832										DhtEvent::ClosestPeersFound(
833											target.into(),
834											peers
835												.into_iter()
836												.map(|(peer, addrs)| (
837													peer.into(),
838													addrs.into_iter().map(Into::into).collect(),
839												))
840												.collect(),
841										)
842									)
843								);
844
845								if let Some(ref metrics) = self.metrics {
846									metrics
847										.kademlia_query_duration
848										.with_label_values(&["node-find"])
849										.observe(started.elapsed().as_secs_f64());
850								}
851							},
852							query => {
853								log::error!(
854									target: LOG_TARGET,
855									"Missing/invalid pending query for `FIND_NODE`: {query:?}"
856								);
857								debug_assert!(false);
858							}
859						}
860					},
861					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
862						if !self.pending_queries.contains_key(&query_id) {
863							log::error!(
864								target: LOG_TARGET,
865								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
866							);
867
868							continue
869						}
870
871						let peer_id: sc_network_types::PeerId = record.peer.into();
872						let record = PeerRecord {
873							record: P2PRecord {
874								key: record.record.key.to_vec().into(),
875								value: record.record.value,
876								publisher: record.record.publisher.map(|peer_id| {
877									let peer_id: sc_network_types::PeerId = peer_id.into();
878									peer_id.into()
879								}),
880								expires: record.record.expires,
881							},
882							peer: Some(peer_id.into()),
883						};
884
885						self.event_streams.send(
886							Event::Dht(
887								DhtEvent::ValueFound(
888									record.into()
889								)
890							)
891						);
892					}
893					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
894						match self.pending_queries.remove(&query_id) {
895							Some(KadQuery::GetValue(key, started)) => {
896								log::trace!(
897									target: LOG_TARGET,
898									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
899								);
900
901								if let Some(ref metrics) = self.metrics {
902									metrics
903										.kademlia_query_duration
904										.with_label_values(&["value-get"])
905										.observe(started.elapsed().as_secs_f64());
906								}
907							},
908							query => {
909								log::error!(
910									target: LOG_TARGET,
911									"Missing/invalid pending query for `GET_VALUE`: {query:?}"
912								);
913								debug_assert!(false);
914							},
915						}
916					}
917					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
918						match self.pending_queries.remove(&query_id) {
919							Some(KadQuery::PutValue(key, started)) => {
920								log::trace!(
921									target: LOG_TARGET,
922									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
923								);
924
925								self.event_streams.send(Event::Dht(
926									DhtEvent::ValuePut(key)
927								));
928
929								if let Some(ref metrics) = self.metrics {
930									metrics
931										.kademlia_query_duration
932										.with_label_values(&["value-put"])
933										.observe(started.elapsed().as_secs_f64());
934								}
935							},
936							query => {
937								log::error!(
938									target: LOG_TARGET,
939									"Missing/invalid pending query for `PUT_VALUE`: {query:?}"
940								);
941								debug_assert!(false);
942							}
943						}
944					}
945					Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
946						match self.pending_queries.remove(&query_id) {
947							Some(KadQuery::GetProviders(key, started)) => {
948								log::trace!(
949									target: LOG_TARGET,
950									"`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
951								);
952
953								// We likely requested providers to connect to them,
954								// so let's add their addresses to litep2p's transport manager.
955								// Consider also looking the addresses of providers up with `FIND_NODE`
956								// query, as it can yield more up to date addresses.
957								providers.iter().for_each(|p| {
958									self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
959								});
960
961								self.event_streams.send(Event::Dht(
962									DhtEvent::ProvidersFound(
963										key.clone().into(),
964										providers.into_iter().map(|p| p.peer.into()).collect()
965									)
966								));
967
968								// litep2p returns all providers in a single event, so we let
969								// subscribers know no more providers will be yielded.
970								self.event_streams.send(Event::Dht(
971									DhtEvent::NoMoreProviders(key.into())
972								));
973
974								if let Some(ref metrics) = self.metrics {
975									metrics
976										.kademlia_query_duration
977										.with_label_values(&["providers-get"])
978										.observe(started.elapsed().as_secs_f64());
979								}
980							},
981							query => {
982								log::error!(
983									target: LOG_TARGET,
984									"Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
985								);
986								debug_assert!(false);
987							}
988						}
989					}
990					Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
991						match self.pending_queries.remove(&query_id) {
992							Some(KadQuery::AddProvider(key, started)) => {
993								debug_assert_eq!(key, provided_key.into());
994
995								log::trace!(
996									target: LOG_TARGET,
997									"`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
998								);
999
1000								self.event_streams.send(Event::Dht(
1001									DhtEvent::StartedProviding(key.into())
1002								));
1003
1004								if let Some(ref metrics) = self.metrics {
1005									metrics
1006										.kademlia_query_duration
1007										.with_label_values(&["provider-add"])
1008										.observe(started.elapsed().as_secs_f64());
1009								}
1010							}
1011							Some(_) => {
1012								log::error!(
1013									target: LOG_TARGET,
1014									"Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
1015								);
1016								debug_assert!(false);
1017							}
1018							None => {
1019								log::trace!(
1020									target: LOG_TARGET,
1021									"`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
1022								);
1023							}
1024						}
1025					}
1026					Some(DiscoveryEvent::QueryFailed { query_id }) => {
1027						match self.pending_queries.remove(&query_id) {
1028							Some(KadQuery::FindNode(peer_id, started)) => {
1029								log::debug!(
1030									target: LOG_TARGET,
1031									"`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1032								);
1033
1034								self.event_streams.send(Event::Dht(
1035									DhtEvent::ClosestPeersNotFound(peer_id.into())
1036								));
1037
1038								if let Some(ref metrics) = self.metrics {
1039									metrics
1040										.kademlia_query_duration
1041										.with_label_values(&["node-find-failed"])
1042										.observe(started.elapsed().as_secs_f64());
1043								}
1044							},
1045							Some(KadQuery::GetValue(key, started)) => {
1046								log::debug!(
1047									target: LOG_TARGET,
1048									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1049								);
1050
1051								self.event_streams.send(Event::Dht(
1052									DhtEvent::ValueNotFound(key)
1053								));
1054
1055								if let Some(ref metrics) = self.metrics {
1056									metrics
1057										.kademlia_query_duration
1058										.with_label_values(&["value-get-failed"])
1059										.observe(started.elapsed().as_secs_f64());
1060								}
1061							},
1062							Some(KadQuery::PutValue(key, started)) => {
1063								log::debug!(
1064									target: LOG_TARGET,
1065									"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1066								);
1067
1068								self.event_streams.send(Event::Dht(
1069									DhtEvent::ValuePutFailed(key)
1070								));
1071
1072								if let Some(ref metrics) = self.metrics {
1073									metrics
1074										.kademlia_query_duration
1075										.with_label_values(&["value-put-failed"])
1076										.observe(started.elapsed().as_secs_f64());
1077								}
1078							},
1079							Some(KadQuery::GetProviders(key, started)) => {
1080								log::debug!(
1081									target: LOG_TARGET,
1082									"`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1083								);
1084
1085								self.event_streams.send(Event::Dht(
1086									DhtEvent::ProvidersNotFound(key)
1087								));
1088
1089								if let Some(ref metrics) = self.metrics {
1090									metrics
1091										.kademlia_query_duration
1092										.with_label_values(&["providers-get-failed"])
1093										.observe(started.elapsed().as_secs_f64());
1094								}
1095							},
1096							Some(KadQuery::AddProvider(key, started)) => {
1097								log::debug!(
1098									target: LOG_TARGET,
1099									"`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1100								);
1101
1102								self.event_streams.send(Event::Dht(
1103									DhtEvent::StartProvidingFailed(key)
1104								));
1105
1106								if let Some(ref metrics) = self.metrics {
1107									metrics
1108										.kademlia_query_duration
1109										.with_label_values(&["provider-add-failed"])
1110										.observe(started.elapsed().as_secs_f64());
1111								}
1112							},
1113							None => {
1114								log::debug!(
1115									target: LOG_TARGET,
1116									"non-existent query (likely republishing a provider) failed ({query_id:?})",
1117								);
1118							}
1119						}
1120					}
1121					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1122						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1123					}
1124					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1125						match self.litep2p.public_addresses().add_address(address.clone().into()) {
1126							Ok(inserted) => if inserted {
1127								log::info!(target: LOG_TARGET, "๐Ÿ” Discovered new external address for our node: {address}");
1128							},
1129							Err(err) => {
1130								log::warn!(
1131									target: LOG_TARGET,
1132									"๐Ÿ” Failed to add discovered external address {address:?}: {err:?}",
1133								);
1134							},
1135						}
1136					}
1137					Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1138						let local_peer_id = self.litep2p.local_peer_id();
1139
1140						// Litep2p requires the peer ID to be present in the address.
1141						let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1142							address.with(Protocol::P2p((*local_peer_id).into()))
1143						} else {
1144							address
1145						};
1146
1147						if self.litep2p.public_addresses().remove_address(&address) {
1148							log::info!(target: LOG_TARGET, "๐Ÿ” Expired external address for our node: {address}");
1149						} else {
1150							log::warn!(
1151								target: LOG_TARGET,
1152								"๐Ÿ” Failed to remove expired external address {address:?}"
1153							);
1154						}
1155					}
1156					Some(DiscoveryEvent::Ping { peer, rtt }) => {
1157						log::trace!(
1158							target: LOG_TARGET,
1159							"ping time with {peer:?}: {rtt:?}",
1160						);
1161					}
1162					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1163						self.event_streams.send(Event::Dht(
1164							DhtEvent::PutRecordRequest(
1165								key.into(),
1166								value,
1167								publisher.map(Into::into),
1168								expires,
1169							)
1170						));
1171					},
1172
1173					Some(DiscoveryEvent::RandomKademliaStarted) => {
1174						if let Some(metrics) = self.metrics.as_ref() {
1175							metrics.kademlia_random_queries_total.inc();
1176						}
1177					}
1178				},
1179				event = self.litep2p.next_event() => match event {
1180					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1181						let Some(metrics) = &self.metrics else {
1182							continue;
1183						};
1184
1185						let direction = match endpoint {
1186							Endpoint::Dialer { .. } => "out",
1187							Endpoint::Listener { .. } => {
1188								// Increment incoming connections counter.
1189								//
1190								// Note: For litep2p these are represented by established negotiated connections,
1191								// while for libp2p (legacy) these represent not-yet-negotiated connections.
1192								metrics.incoming_connections_total.inc();
1193
1194								"in"
1195							},
1196						};
1197						metrics.connections_opened_total.with_label_values(&[direction]).inc();
1198
1199						match self.peers.entry(peer) {
1200							Entry::Vacant(entry) => {
1201								entry.insert(ConnectionContext {
1202									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1203									num_connections: 1usize,
1204								});
1205								metrics.distinct_peers_connections_opened_total.inc();
1206							}
1207							Entry::Occupied(entry) => {
1208								let entry = entry.into_mut();
1209								entry.num_connections += 1;
1210								entry.endpoints.insert(endpoint.connection_id(), endpoint);
1211							}
1212						}
1213					}
1214					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1215						let Some(metrics) = &self.metrics else {
1216							continue;
1217						};
1218
1219						let Some(context) = self.peers.get_mut(&peer) else {
1220							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1221							continue
1222						};
1223
1224						let direction = match context.endpoints.remove(&connection_id) {
1225							None => {
1226								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1227								continue
1228							}
1229							Some(endpoint) => {
1230								context.num_connections -= 1;
1231
1232								match endpoint {
1233									Endpoint::Dialer { .. } => "out",
1234									Endpoint::Listener { .. } => "in",
1235								}
1236							}
1237						};
1238
1239						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1240
1241						if context.num_connections == 0 {
1242							self.peers.remove(&peer);
1243							metrics.distinct_peers_connections_closed_total.inc();
1244						}
1245					}
1246					Some(Litep2pEvent::DialFailure { address, error }) => {
1247						log::debug!(
1248							target: LOG_TARGET,
1249							"failed to dial peer at {address:?}: {error:?}",
1250						);
1251
1252						if let Some(metrics) = &self.metrics {
1253							let reason = match error {
1254								DialError::Timeout => "timeout",
1255								DialError::AddressError(_) => "invalid-address",
1256								DialError::DnsError(_) => "cannot-resolve-dns",
1257								DialError::NegotiationError(error) => match error {
1258									NegotiationError::Timeout => "timeout",
1259									NegotiationError::PeerIdMissing => "missing-peer-id",
1260									NegotiationError::StateMismatch => "state-mismatch",
1261									NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1262									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1263									NegotiationError::SnowError(_) => "noise-error",
1264									NegotiationError::ParseError(_) => "parse-error",
1265									NegotiationError::IoError(_) => "io-error",
1266									NegotiationError::WebSocket(_) => "webscoket-error",
1267									NegotiationError::BadSignature => "bad-signature",
1268								}
1269							};
1270
1271							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1272						}
1273					}
1274					Some(Litep2pEvent::ListDialFailures { errors }) => {
1275						log::debug!(
1276							target: LOG_TARGET,
1277							"failed to dial peer on multiple addresses {errors:?}",
1278						);
1279
1280						if let Some(metrics) = &self.metrics {
1281							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1282						}
1283					}
1284					None => {
1285						log::error!(
1286								target: LOG_TARGET,
1287								"Litep2p backend terminated"
1288						);
1289						return
1290					}
1291				},
1292			}
1293		}
1294	}
1295}