sc_network/litep2p/mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `NetworkBackend` implementation for `litep2p`.
20
21use crate::{
22	config::{
23		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24		SetConfig, TransportConfig,
25	},
26	error::Error,
27	event::{DhtEvent, Event},
28	litep2p::{
29		discovery::{Discovery, DiscoveryEvent},
30		peerstore::Peerstore,
31		service::{Litep2pNetworkService, NetworkServiceCommand},
32		shim::{
33			bitswap::BitswapServer,
34			notification::{
35				config::{NotificationProtocolConfig, ProtocolControlHandle},
36				peerset::PeersetCommand,
37			},
38			request_response::{RequestResponseConfig, RequestResponseProtocol},
39		},
40	},
41	peer_store::PeerStoreProvider,
42	protocol,
43	service::{
44		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
45		out_events,
46		traits::{BandwidthSink, NetworkBackend, NetworkService},
47	},
48	NetworkStatus, NotificationService, ProtocolName,
49};
50
51use codec::Encode;
52use futures::StreamExt;
53use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
54use litep2p::{
55	config::ConfigBuilder,
56	crypto::ed25519::Keypair,
57	error::{DialError, NegotiationError},
58	executor::Executor,
59	protocol::{
60		libp2p::{
61			bitswap::Config as BitswapConfig,
62			kademlia::{QueryId, Record, RecordsType},
63		},
64		request_response::ConfigBuilder as RequestResponseConfigBuilder,
65	},
66	transport::{
67		tcp::config::Config as TcpTransportConfig,
68		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
69	},
70	types::{
71		multiaddr::{Multiaddr, Protocol},
72		ConnectionId,
73	},
74	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
75};
76use prometheus_endpoint::Registry;
77
78use sc_client_api::BlockBackend;
79use sc_network_common::{role::Roles, ExHashT};
80use sc_network_types::PeerId;
81use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
82use sp_runtime::traits::Block as BlockT;
83
84use std::{
85	cmp,
86	collections::{hash_map::Entry, HashMap, HashSet},
87	fs,
88	future::Future,
89	iter,
90	pin::Pin,
91	sync::{
92		atomic::{AtomicUsize, Ordering},
93		Arc,
94	},
95	time::{Duration, Instant},
96};
97
98mod discovery;
99mod peerstore;
100mod service;
101mod shim;
102
103/// Litep2p bandwidth sink.
104struct Litep2pBandwidthSink {
105	sink: litep2p::BandwidthSink,
106}
107
108impl BandwidthSink for Litep2pBandwidthSink {
109	fn total_inbound(&self) -> u64 {
110		self.sink.inbound() as u64
111	}
112
113	fn total_outbound(&self) -> u64 {
114		self.sink.outbound() as u64
115	}
116}
117
118/// Litep2p task executor.
119struct Litep2pExecutor {
120	/// Executor.
121	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
122}
123
124impl Executor for Litep2pExecutor {
125	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
126		(self.executor)(future)
127	}
128
129	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
130		(self.executor)(future)
131	}
132}
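// Illustration only (not part of the original file): `Params::executor` supplies the
// boxed closure above, and in a typical node it simply hands the future to the host
// runtime, along the lines of:
//
//     let executor = Litep2pExecutor {
//         executor: Box::new(|future| { tokio::spawn(future); }),
//     };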
133
134/// Logging target for the file.
135const LOG_TARGET: &str = "sub-libp2p";
136
137/// Peer context.
138struct ConnectionContext {
139	/// Peer endpoints.
140	endpoints: HashMap<ConnectionId, Endpoint>,
141
142	/// Number of active connections.
143	num_connections: usize,
144}
145
146/// Networking backend for `litep2p`.
147pub struct Litep2pNetworkBackend {
148	/// Main `litep2p` object.
149	litep2p: Litep2p,
150
151	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
152	network_service: Arc<dyn NetworkService>,
153
154	/// RX channel for receiving commands from `Litep2pNetworkService`.
155	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,
156
157	/// `Peerset` handles to notification protocols.
158	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
159
160	/// Pending `GET_VALUE` queries.
161	pending_get_values: HashMap<QueryId, (RecordKey, Instant)>,
162
163	/// Pending `PUT_VALUE` queries.
164	pending_put_values: HashMap<QueryId, (RecordKey, Instant)>,
165
166	/// Discovery.
167	discovery: Discovery,
168
169	/// Number of connected peers.
170	num_connected: Arc<AtomicUsize>,
171
172	/// Connected peers.
173	peers: HashMap<litep2p::PeerId, ConnectionContext>,
174
175	/// Peerstore.
176	peerstore_handle: Arc<dyn PeerStoreProvider>,
177
178	/// Block announce protocol name.
179	block_announce_protocol: ProtocolName,
180
181	/// Sender for DHT events.
182	event_streams: out_events::OutChannels,
183
184	/// Prometheus metrics.
185	metrics: Option<Metrics>,
186}
187
188impl Litep2pNetworkBackend {
189	/// Parse an iterator of multiaddresses and group the addresses by peer ID so that
190	/// litep2p can consume the information easily.
191	fn parse_addresses(
192		addresses: impl Iterator<Item = Multiaddr>,
193	) -> HashMap<PeerId, Vec<Multiaddr>> {
194		addresses
195			.into_iter()
196			.filter_map(|address| match address.iter().next() {
197				Some(
198					Protocol::Dns(_) |
199					Protocol::Dns4(_) |
200					Protocol::Dns6(_) |
201					Protocol::Ip6(_) |
202					Protocol::Ip4(_),
203				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
204				{
205					Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
206						.map_or(None, |peer| Some((peer, Some(address)))),
207					_ => None,
208				},
209				Some(Protocol::P2p(multihash)) =>
210					PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
211				_ => None,
212			})
213			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
214				let entry = acc.entry(peer).or_default();
215				maybe_address.map(|address| entry.push(address));
216
217				acc
218			})
219	}
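	// Example (illustrative, peer ID elided): given the addresses
	//
	//     /ip4/192.0.2.1/tcp/30333/p2p/<peer id>
	//     /dns/example.com/tcp/30333/ws/p2p/<peer id>
	//     /p2p/<peer id>
	//
	// all three entries resolve to the same peer. The first two are kept as dialable
	// addresses, the bare `/p2p/...` form only contributes the peer ID, and the result
	// is `{ <peer id> => [first address, second address] }`.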
220
221	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
222	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
223		Self::parse_addresses(peers.into_iter())
224			.into_iter()
225			.filter_map(|(peer, addresses)| {
226				// `peers` contained a multiaddress of the form `/p2p/<peer ID>` (peer ID only)
227				if addresses.is_empty() {
228					return Some(peer)
229				}
230
231				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
232					log::warn!(
233						target: LOG_TARGET,
234						"couldn't add any addresses for {peer:?}, so it won't be added as a reserved peer",
235					);
236					return None
237				}
238
239				self.peerstore_handle.add_known_peer(peer);
240				Some(peer)
241			})
242			.collect()
243	}
244}
245
246impl Litep2pNetworkBackend {
247	/// Get `litep2p` keypair from `NodeKeyConfig`.
248	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
249		let secret: litep2p::crypto::ed25519::SecretKey =
250			node_key.clone().into_keypair()?.secret().into();
251
252		let local_identity = Keypair::from(secret);
253		let local_public = local_identity.public();
254		let local_peer_id = local_public.to_peer_id();
255
256		Ok((local_identity, local_peer_id))
257	}
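	// For example (hedged, standard Substrate configuration): with
	// `NodeKeyConfig::Ed25519(Secret::File(path))` the ed25519 secret is read from disk,
	// and the returned `litep2p::PeerId` is derived from the same public key as the
	// libp2p peer ID, so the node identity stays stable across both backends.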
258
259	/// Configure transport protocols for `Litep2pNetworkBackend`.
260	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
261		config: &FullNetworkConfiguration<B, H, Self>,
262	) -> ConfigBuilder {
263		let _ = match config.network_config.transport {
264			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
265			TransportConfig::Normal { .. } => false,
266		};
267		let config_builder = ConfigBuilder::new();
268
269		// The yamux buffer size limit is configured to be equal to the maximum frame size
270		// of all protocols. 10 bytes are added to each limit for the length prefix that
271		// is not included in the upper-layer protocol limits but is still present in the
272		// yamux buffer. These 10 bytes correspond to the maximum size required to encode
273		// a variable-length-encoded 64-bit number. In other words, we make the
274		// assumption that no notification larger than 2^64 will ever be sent.
275		let yamux_maximum_buffer_size = {
276			let requests_max = config
277				.request_response_protocols
278				.iter()
279				.map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::MAX));
280			let responses_max = config
281				.request_response_protocols
282				.iter()
283				.map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::MAX));
284			let notifs_max = config
285				.notification_protocols
286				.iter()
287				.map(|cfg| usize::try_from(cfg.max_notification_size()).unwrap_or(usize::MAX));
288
289			// A "default" max is added to cover all the other protocols: ping, identify,
290			// kademlia, block announces, and transactions.
291			let default_max = cmp::max(
292				1024 * 1024,
293				usize::try_from(protocol::BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE)
294					.unwrap_or(usize::MAX),
295			);
296
297			iter::once(default_max)
298				.chain(requests_max)
299				.chain(responses_max)
300				.chain(notifs_max)
301				.max()
302				.expect("iterator known to always yield at least one element; qed")
303				.saturating_add(10)
304		};
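		// Worked example (illustrative numbers): if the largest limit across all
		// request-response and notification protocols is 16 MiB, the buffer limit becomes
		// 16 MiB + 10 bytes; `default_max` only wins when every configured protocol
		// allows less than `max(1 MiB, BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE)`.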
305
306		let yamux_config = {
307			let mut yamux_config = litep2p::yamux::Config::default();
308			// Enable proper flow-control: window updates are only sent when
309			// buffered data has been consumed.
310			yamux_config.set_window_update_mode(litep2p::yamux::WindowUpdateMode::OnRead);
311			yamux_config.set_max_buffer_size(yamux_maximum_buffer_size);
312
313			if let Some(yamux_window_size) = config.network_config.yamux_window_size {
314				yamux_config.set_receive_window(yamux_window_size);
315			}
316
317			yamux_config
318		};
319
320		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
321			.network_config
322			.listen_addresses
323			.iter()
324			.filter_map(|address| {
325				use sc_network_types::multiaddr::Protocol;
326
327				let mut iter = address.iter();
328
329				match iter.next() {
330					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
331					protocol => {
332						log::error!(
333							target: LOG_TARGET,
334							"unknown protocol {protocol:?}, ignoring {address:?}",
335						);
336
337						return None
338					},
339				}
340
341				match iter.next() {
342					Some(Protocol::Tcp(_)) => match iter.next() {
343						Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
344							Some((None, Some(address.clone()))),
345						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
346						protocol => {
347							log::error!(
348								target: LOG_TARGET,
349								"unknown protocol {protocol:?}, ignoring {address:?}",
350							);
351							None
352						},
353					},
354					protocol => {
355						log::error!(
356							target: LOG_TARGET,
357							"unknown protocol {protocol:?}, ignoring {address:?}",
358						);
359						None
360					},
361				}
362			})
363			.unzip();
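		// Illustration (made-up listen addresses): `/ip4/0.0.0.0/tcp/30333` ends up in the
		// TCP bucket, `/ip4/0.0.0.0/tcp/30334/ws` in the WebSocket bucket, and anything
		// that does not start with `/ip4` or `/ip6` followed by `/tcp` is logged and
		// dropped.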
364
365		config_builder
366			.with_websocket(WebSocketTransportConfig {
367				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
368				yamux_config: yamux_config.clone(),
369				nodelay: true,
370				..Default::default()
371			})
372			.with_tcp(TcpTransportConfig {
373				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
374				yamux_config,
375				nodelay: true,
376				..Default::default()
377			})
378	}
379}
380
381#[async_trait::async_trait]
382impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
383	type NotificationProtocolConfig = NotificationProtocolConfig;
384	type RequestResponseProtocolConfig = RequestResponseConfig;
385	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
386	type PeerStore = Peerstore;
387	type BitswapConfig = BitswapConfig;
388
389	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
390	where
391		Self: Sized,
392	{
393		let (keypair, local_peer_id) =
394			Self::get_keypair(&params.network_config.network_config.node_key)?;
395		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
396
397		params.network_config.network_config.boot_nodes = params
398			.network_config
399			.network_config
400			.boot_nodes
401			.into_iter()
402			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
403			.collect();
404		params.network_config.network_config.default_peers_set.reserved_nodes = params
405			.network_config
406			.network_config
407			.default_peers_set
408			.reserved_nodes
409			.into_iter()
410			.filter(|reserved_node| {
411				if reserved_node.peer_id == local_peer_id.into() {
412					log::warn!(
413						target: LOG_TARGET,
414						"Local peer ID used in reserved node, ignoring: {reserved_node}",
415					);
416					false
417				} else {
418					true
419				}
420			})
421			.collect();
422
423		if let Some(path) = &params.network_config.network_config.net_config_path {
424			fs::create_dir_all(path)?;
425		}
426
427		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
428		log::info!(target: LOG_TARGET, "Running litep2p network backend");
429
430		params.network_config.sanity_check_addresses()?;
431		params.network_config.sanity_check_bootnodes()?;
432
433		let mut config_builder =
434			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
435		let known_addresses = params.network_config.known_addresses();
436		let peer_store_handle = params.network_config.peer_store_handle();
437		let executor = Arc::new(Litep2pExecutor { executor: params.executor });
438
439		let FullNetworkConfiguration {
440			notification_protocols,
441			request_response_protocols,
442			network_config,
443			..
444		} = params.network_config;
445
446		// initialize notification protocols
447		//
448		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
449		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
450		// of Polkadot SDK to control connectivity of the notification protocol
451		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
452		let mut notif_protocols = HashMap::from_iter([(
453			params.block_announce_config.protocol_name().clone(),
454			params.block_announce_config.handle,
455		)]);
456
457		// handshake for all but the syncing protocol is set to node role
458		config_builder = notification_protocols
459			.into_iter()
460			.fold(config_builder, |config_builder, mut config| {
461				config.config.set_handshake(Roles::from(&params.role).encode());
462				notif_protocols.insert(config.protocol_name, config.handle);
463
464				config_builder.with_notification_protocol(config.config)
465			})
466			.with_notification_protocol(params.block_announce_config.config);
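		// For instance (assuming a plain full node), `Roles::from(&params.role)` is
		// `Roles::FULL` and encodes to the single byte `0b0000_0001`, which every
		// non-syncing notification protocol then advertises as its handshake.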
467
468		// initialize request-response protocols
469		let metrics = match &params.metrics_registry {
470			Some(registry) => Some(register_without_sources(registry)?),
471			None => None,
472		};
473
474		// create channels that are used to send requests before initializing the protocols so
475		// the senders can be passed onto all request-response protocols
476		//
477		// all protocols must have each other's senders so they can send the fallback request in
478		// case the main protocol is not supported by the remote peer and the user specified a fallback
479		let (mut request_response_receivers, request_response_senders): (
480			HashMap<_, _>,
481			HashMap<_, _>,
482		) = request_response_protocols
483			.iter()
484			.map(|config| {
485				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
486				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
487			})
488			.unzip();
489
490		config_builder = request_response_protocols.into_iter().fold(
491			config_builder,
492			|config_builder, config| {
493				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
494					Litep2pProtocolName::from(config.protocol_name.clone()),
495				)
496				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
497				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
498				.with_timeout(config.request_timeout)
499				.build();
500
501				let protocol = RequestResponseProtocol::new(
502					config.protocol_name.clone(),
503					handle,
504					Arc::clone(&peer_store_handle),
505					config.inbound_queue,
506					request_response_receivers
507						.remove(&config.protocol_name)
508						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
509					request_response_senders.clone(),
510					metrics.clone(),
511				);
512
513				executor.run(Box::pin(async move {
514					protocol.run().await;
515				}));
516
517				config_builder.with_request_response_protocol(protocol_config)
518			},
519		);
520
521		// collect known addresses
522		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
523			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
524				use sc_network_types::multiaddr::Protocol;
525
526				let address = match address.iter().last() {
527					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
528						address.with(Protocol::P2p(peer.into())),
529					Some(Protocol::P2p(_)) => address,
530					_ => return acc,
531				};
532
533				acc.entry(peer.into()).or_default().push(address.into());
534				peer_store_handle.add_known_peer(peer);
535
536				acc
537			});
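		// For example (hypothetical bootnode): `/dns/boot.example.org/tcp/30333/ws` is
		// stored as `/dns/boot.example.org/tcp/30333/ws/p2p/<peer id>`, an address that
		// already ends in `/p2p/...` is kept as-is, and anything else is skipped.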
538
539		// enable IPFS ping, identify and Kademlia, and optionally mDNS if the user enabled it
540		let listen_addresses = Arc::new(Default::default());
541		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
542			Discovery::new(
543				&network_config,
544				params.genesis_hash,
545				params.fork_id.as_deref(),
546				&params.protocol_id,
547				known_addresses.clone(),
548				Arc::clone(&listen_addresses),
549				Arc::clone(&peer_store_handle),
550			);
551
552		config_builder = config_builder
553			.with_known_addresses(known_addresses.clone().into_iter())
554			.with_libp2p_ping(ping_config)
555			.with_libp2p_identify(identify_config)
556			.with_libp2p_kademlia(kademlia_config)
557			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
558				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
559			))
560			.with_executor(executor);
561
562		if let Some(config) = maybe_mdns_config {
563			config_builder = config_builder.with_mdns(config);
564		}
565
566		if let Some(config) = params.bitswap_config {
567			config_builder = config_builder.with_libp2p_bitswap(config);
568		}
569
570		let litep2p =
571			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
572
573		litep2p.listen_addresses().for_each(|address| {
574			log::debug!(target: LOG_TARGET, "listening on: {address}");
575
576			listen_addresses.write().insert(address.clone());
577		});
578
579		let public_addresses = litep2p.public_addresses();
580		for address in network_config.public_addresses.iter() {
581			if let Err(err) = public_addresses.add_address(address.clone().into()) {
582				log::warn!(
583					target: LOG_TARGET,
584					"failed to add public address {address:?}: {err:?}",
585				);
586			}
587		}
588
589		let network_service = Arc::new(Litep2pNetworkService::new(
590			local_peer_id,
591			keypair.clone(),
592			cmd_tx,
593			Arc::clone(&peer_store_handle),
594			notif_protocols.clone(),
595			block_announce_protocol.clone(),
596			request_response_senders,
597			Arc::clone(&listen_addresses),
598			public_addresses,
599		));
600
601		// register the rest of the metrics now that `Litep2p` has been created
602		let num_connected = Arc::new(Default::default());
603		let bandwidth: Arc<dyn BandwidthSink> =
604			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
605
606		if let Some(registry) = &params.metrics_registry {
607			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
608		}
609
610		Ok(Self {
611			network_service,
612			cmd_rx,
613			metrics,
614			peerset_handles: notif_protocols,
615			num_connected,
616			discovery,
617			pending_put_values: HashMap::new(),
618			pending_get_values: HashMap::new(),
619			peerstore_handle: peer_store_handle,
620			block_announce_protocol,
621			event_streams: out_events::OutChannels::new(None)?,
622			peers: HashMap::new(),
623			litep2p,
624		})
625	}
626
627	fn network_service(&self) -> Arc<dyn NetworkService> {
628		Arc::clone(&self.network_service)
629	}
630
631	fn peer_store(
632		bootnodes: Vec<sc_network_types::PeerId>,
633		metrics_registry: Option<Registry>,
634	) -> Self::PeerStore {
635		Peerstore::new(bootnodes, metrics_registry)
636	}
637
638	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
639		NotificationMetrics::new(registry)
640	}
641
642	/// Create Bitswap server.
643	fn bitswap_server(
644		client: Arc<dyn BlockBackend<B> + Send + Sync>,
645	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
646		BitswapServer::new(client)
647	}
648
649	/// Create notification protocol configuration for `protocol`.
650	fn notification_config(
651		protocol_name: ProtocolName,
652		fallback_names: Vec<ProtocolName>,
653		max_notification_size: u64,
654		handshake: Option<NotificationHandshake>,
655		set_config: SetConfig,
656		metrics: NotificationMetrics,
657		peerstore_handle: Arc<dyn PeerStoreProvider>,
658	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
659		Self::NotificationProtocolConfig::new(
660			protocol_name,
661			fallback_names,
662			max_notification_size as usize,
663			handshake,
664			set_config,
665			metrics,
666			peerstore_handle,
667		)
668	}
669
670	/// Create request-response protocol configuration.
671	fn request_response_config(
672		protocol_name: ProtocolName,
673		fallback_names: Vec<ProtocolName>,
674		max_request_size: u64,
675		max_response_size: u64,
676		request_timeout: Duration,
677		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
678	) -> Self::RequestResponseProtocolConfig {
679		Self::RequestResponseProtocolConfig::new(
680			protocol_name,
681			fallback_names,
682			max_request_size,
683			max_response_size,
684			request_timeout,
685			inbound_queue,
686		)
687	}
688
689	/// Start [`Litep2pNetworkBackend`] event loop.
690	async fn run(mut self) {
691		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
692
693		loop {
694			let num_connected_peers = self
695				.peerset_handles
696				.get(&self.block_announce_protocol)
697				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
698			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
699
700			tokio::select! {
701				command = self.cmd_rx.next() => match command {
702					None => return,
703					Some(command) => match command {
704						NetworkServiceCommand::GetValue{ key } => {
705							let query_id = self.discovery.get_value(key.clone()).await;
706							self.pending_get_values.insert(query_id, (key, Instant::now()));
707						}
708						NetworkServiceCommand::PutValue { key, value } => {
709							let query_id = self.discovery.put_value(key.clone(), value).await;
710							self.pending_put_values.insert(query_id, (key, Instant::now()));
711						}
712						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
713							let kademlia_key = record.key.to_vec().into();
714							let query_id = self.discovery.put_value_to_peers(record, peers, update_local_storage).await;
715							self.pending_put_values.insert(query_id, (kademlia_key, Instant::now()));
716						}
717
718						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
719							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
720						}
721						NetworkServiceCommand::EventStream { tx } => {
722							self.event_streams.push(tx);
723						}
724						NetworkServiceCommand::Status { tx } => {
725							let _ = tx.send(NetworkStatus {
726								num_connected_peers: self
727									.peerset_handles
728									.get(&self.block_announce_protocol)
729									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
730								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
731								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
732							});
733						}
734						NetworkServiceCommand::AddPeersToReservedSet {
735							protocol,
736							peers,
737						} => {
738							let peers = self.add_addresses(peers.into_iter().map(Into::into));
739
740							match self.peerset_handles.get(&protocol) {
741								Some(handle) => {
742									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
743								}
744								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doesn't exist"),
745							};
746						}
747						NetworkServiceCommand::AddKnownAddress { peer, address } => {
748							let mut address: Multiaddr = address.into();
749
750							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
751								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
752							}
753
754							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) == 0usize {
755								log::warn!(
756									target: LOG_TARGET,
757									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
758								);
759							}
760						},
761						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
762							let peers = self.add_addresses(peers.into_iter().map(Into::into));
763
764							match self.peerset_handles.get(&protocol) {
765								Some(handle) => {
766									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
767								}
768								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doesn't exist"),
769							}
770
771						},
772						NetworkServiceCommand::DisconnectPeer {
773							protocol,
774							peer,
775						} => {
776							let Some(handle) = self.peerset_handles.get(&protocol) else {
777								log::warn!(target: LOG_TARGET, "protocol {protocol} doesn't exist");
778								continue
779							};
780
781							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
782						}
783						NetworkServiceCommand::SetReservedOnly {
784							protocol,
785							reserved_only,
786						} => {
787							let Some(handle) = self.peerset_handles.get(&protocol) else {
788								log::warn!(target: LOG_TARGET, "protocol {protocol} doesn't exist");
789								continue
790							};
791
792							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
793						}
794						NetworkServiceCommand::RemoveReservedPeers {
795							protocol,
796							peers,
797						} => {
798							let Some(handle) = self.peerset_handles.get(&protocol) else {
799								log::warn!(target: LOG_TARGET, "protocol {protocol} doesn't exist");
800								continue
801							};
802
803							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
804						}
805					}
806				},
807				event = self.discovery.next() => match event {
808					None => return,
809					Some(DiscoveryEvent::Discovered { addresses }) => {
810						// if at least one address was added for the peer, report the peer to `Peerstore`
811						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
812							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
813								self.peerstore_handle.add_known_peer(peer);
814							}
815						}
816					}
817					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
818						for peer in peers {
819							self.peerstore_handle.add_known_peer(peer.into());
820						}
821					}
822					Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => {
823						match self.pending_get_values.remove(&query_id) {
824							None => log::warn!(
825								target: LOG_TARGET,
826								"`GET_VALUE` succeeded for a non-existent query",
827							),
828							Some((key, started)) => {
829								log::trace!(
830									target: LOG_TARGET,
831									"`GET_VALUE` for {:?} ({query_id:?}) succeeded",
832									key,
833								);
834								for record in litep2p_to_libp2p_peer_record(records) {
835									self.event_streams.send(
836										Event::Dht(
837											DhtEvent::ValueFound(
838												record
839											)
840										)
841									);
842								}
843
844								if let Some(ref metrics) = self.metrics {
845									metrics
846										.kademlia_query_duration
847										.with_label_values(&["value-get"])
848										.observe(started.elapsed().as_secs_f64());
849								}
850							}
851						}
852					}
853					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
854						match self.pending_put_values.remove(&query_id) {
855							None => log::warn!(
856								target: LOG_TARGET,
857								"`PUT_VALUE` succeeded for a non-existent query",
858							),
859							Some((key, started)) => {
860								log::trace!(
861									target: LOG_TARGET,
862									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
863								);
864
865								self.event_streams.send(Event::Dht(
866									DhtEvent::ValuePut(libp2p::kad::RecordKey::new(&key))
867								));
868
869								if let Some(ref metrics) = self.metrics {
870									metrics
871										.kademlia_query_duration
872										.with_label_values(&["value-put"])
873										.observe(started.elapsed().as_secs_f64());
874								}
875							}
876						}
877					}
878					Some(DiscoveryEvent::QueryFailed { query_id }) => {
879						match self.pending_get_values.remove(&query_id) {
880							None => match self.pending_put_values.remove(&query_id) {
881								None => log::warn!(
882									target: LOG_TARGET,
883									"non-existent query failed ({query_id:?})",
884								),
885								Some((key, started)) => {
886									log::debug!(
887										target: LOG_TARGET,
888										"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
889									);
890
891									self.event_streams.send(Event::Dht(
892										DhtEvent::ValuePutFailed(libp2p::kad::RecordKey::new(&key))
893									));
894
895									if let Some(ref metrics) = self.metrics {
896										metrics
897											.kademlia_query_duration
898											.with_label_values(&["value-put-failed"])
899											.observe(started.elapsed().as_secs_f64());
900									}
901								}
902							}
903							Some((key, started)) => {
904								log::debug!(
905									target: LOG_TARGET,
906									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
907								);
908
909								self.event_streams.send(Event::Dht(
910									DhtEvent::ValueNotFound(libp2p::kad::RecordKey::new(&key))
911								));
912
913								if let Some(ref metrics) = self.metrics {
914									metrics
915										.kademlia_query_duration
916										.with_label_values(&["value-get-failed"])
917										.observe(started.elapsed().as_secs_f64());
918								}
919							}
920						}
921					}
922					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
923						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
924					}
925					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
926						match self.litep2p.public_addresses().add_address(address.clone().into()) {
927							Ok(inserted) => if inserted {
928								log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
929							},
930							Err(err) => {
931								log::warn!(
932									target: LOG_TARGET,
933									"🔍 Failed to add discovered external address {address:?}: {err:?}",
934								);
935							},
936						}
937					}
938					Some(DiscoveryEvent::Ping { peer, rtt }) => {
939						log::trace!(
940							target: LOG_TARGET,
941							"ping time with {peer:?}: {rtt:?}",
942						);
943					}
944					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
945						self.event_streams.send(Event::Dht(
946							DhtEvent::PutRecordRequest(
947								libp2p::kad::RecordKey::new(&key),
948								value,
949								publisher.map(Into::into),
950								expires,
951							)
952						));
953					},
954
955					Some(DiscoveryEvent::RandomKademliaStarted) => {
956						if let Some(metrics) = self.metrics.as_ref() {
957							metrics.kademlia_random_queries_total.inc();
958						}
959					}
960				},
961				event = self.litep2p.next_event() => match event {
962					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
963						let Some(metrics) = &self.metrics else {
964							continue;
965						};
966
967						let direction = match endpoint {
968							Endpoint::Dialer { .. } => "out",
969							Endpoint::Listener { .. } => "in",
970						};
971						metrics.connections_opened_total.with_label_values(&[direction]).inc();
972
973						match self.peers.entry(peer) {
974							Entry::Vacant(entry) => {
975								entry.insert(ConnectionContext {
976									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
977									num_connections: 1usize,
978								});
979								metrics.distinct_peers_connections_opened_total.inc();
980							}
981							Entry::Occupied(entry) => {
982								let entry = entry.into_mut();
983								entry.num_connections += 1;
984								entry.endpoints.insert(endpoint.connection_id(), endpoint);
985							}
986						}
987					}
988					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
989						let Some(metrics) = &self.metrics else {
990							continue;
991						};
992
993						let Some(context) = self.peers.get_mut(&peer) else {
994							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
995							continue
996						};
997
998						let direction = match context.endpoints.remove(&connection_id) {
999							None => {
1000								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1001								continue
1002							}
1003							Some(endpoint) => {
1004								context.num_connections -= 1;
1005
1006								match endpoint {
1007									Endpoint::Dialer { .. } => "out",
1008									Endpoint::Listener { .. } => "in",
1009								}
1010							}
1011						};
1012
1013						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1014
1015						if context.num_connections == 0 {
1016							self.peers.remove(&peer);
1017							metrics.distinct_peers_connections_closed_total.inc();
1018						}
1019					}
1020					Some(Litep2pEvent::DialFailure { address, error }) => {
1021						log::debug!(
1022							target: LOG_TARGET,
1023							"failed to dial peer at {address:?}: {error:?}",
1024						);
1025
1026						if let Some(metrics) = &self.metrics {
1027							let reason = match error {
1028								DialError::Timeout => "timeout",
1029								DialError::AddressError(_) => "invalid-address",
1030								DialError::DnsError(_) => "cannot-resolve-dns",
1031								DialError::NegotiationError(error) => match error {
1032									NegotiationError::Timeout => "timeout",
1033									NegotiationError::PeerIdMissing => "missing-peer-id",
1034									NegotiationError::StateMismatch => "state-mismatch",
1035									NegotiationError::PeerIdMismatch(_, _) => "peer-id-mismatch",
1036									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1037									NegotiationError::SnowError(_) => "noise-error",
1038									NegotiationError::ParseError(_) => "parse-error",
1039									NegotiationError::IoError(_) => "io-error",
1040									NegotiationError::WebSocket(_) => "websocket-error",
1041								}
1042							};
1043
1044							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1045						}
1046					}
1047					Some(Litep2pEvent::ListDialFailures { errors }) => {
1048						log::debug!(
1049							target: LOG_TARGET,
1050							"failed to dial peer on multiple addresses {errors:?}",
1051						);
1052
1053						if let Some(metrics) = &self.metrics {
1054							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1055						}
1056					}
1057					_ => {}
1058				},
1059			}
1060		}
1061	}
1062}
1063
1064// Glue code to convert from a litep2p `RecordsType` to libp2p `PeerRecord`s.
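// Shape of the conversion (for orientation): a `LocalStore` result becomes a single
// `PeerRecord { record, peer: None }`, while a `Network` result yields one
// `PeerRecord { record, peer: Some(<responding peer>) }` per record returned by the DHT.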
1065fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec<PeerRecord> {
1066	match records {
1067		litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => {
1068			vec![PeerRecord {
1069				record: P2PRecord {
1070					key: record.key.to_vec().into(),
1071					value: record.value,
1072					publisher: record.publisher.map(|peer_id| {
1073						let peer_id: sc_network_types::PeerId = peer_id.into();
1074						peer_id.into()
1075					}),
1076					expires: record.expires,
1077				},
1078				peer: None,
1079			}]
1080		},
1081		litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records
1082			.into_iter()
1083			.map(|record| {
1084				let peer_id: sc_network_types::PeerId = record.peer.into();
1085
1086				PeerRecord {
1087					record: P2PRecord {
1088						key: record.record.key.to_vec().into(),
1089						value: record.record.value,
1090						publisher: record.record.publisher.map(|peer_id| {
1091							let peer_id: sc_network_types::PeerId = peer_id.into();
1092							peer_id.into()
1093						}),
1094						expires: record.record.expires,
1095					},
1096					peer: Some(peer_id.into()),
1097				}
1098			})
1099			.collect::<Vec<_>>(),
1100	}
1101}