referrerpolicy=no-referrer-when-downgrade

sc_network/
service.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Main entry point of the sc-network crate.
20//!
21//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
22//! The [`NetworkWorker`] *is* the network. Network is driven by [`NetworkWorker::run`] future that
23//! terminates only when all instances of the control handles [`NetworkService`] were dropped.
24//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
25//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
26//!
27//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
28//! which is then processed by [`NetworkWorker::next_action`].
29
30use crate::{
31	behaviour::{self, Behaviour, BehaviourOut},
32	bitswap::BitswapRequestHandler,
33	config::{
34		parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
35		NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
36	},
37	discovery::DiscoveryConfig,
38	error::Error,
39	event::{DhtEvent, Event},
40	network_state::{
41		NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
42	},
43	peer_store::{PeerStore, PeerStoreProvider},
44	protocol::{self, Protocol, Ready},
45	protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
46	request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
47	service::{
48		signature::{Signature, SigningError},
49		traits::{
50			BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
51			NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
52			NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
53			NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
54		},
55	},
56	transport,
57	types::ProtocolName,
58	NotificationService, ReputationChange,
59};
60
61use codec::DecodeAll;
62use futures::{channel::oneshot, prelude::*};
63use libp2p::{
64	connection_limits::{ConnectionLimits, Exceeded},
65	core::{upgrade, ConnectedPoint, Endpoint},
66	identify::Info as IdentifyInfo,
67	identity::ed25519,
68	multiaddr::{self, Multiaddr},
69	swarm::{
70		Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
71		NetworkBehaviour, Swarm, SwarmEvent,
72	},
73	PeerId,
74};
75use log::{debug, error, info, trace, warn};
76use metrics::{Histogram, MetricSources, Metrics};
77use parking_lot::Mutex;
78use prometheus_endpoint::Registry;
79use sc_network_types::kad::{Key as KademliaKey, Record};
80
81use sc_client_api::BlockBackend;
82use sc_network_common::{
83	role::{ObservedRole, Roles},
84	ExHashT,
85};
86use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
87use sp_runtime::traits::Block as BlockT;
88
89pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
90pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
91pub use metrics::NotificationMetrics;
92pub use protocol::NotificationsSink;
93use std::{
94	collections::{HashMap, HashSet},
95	fs, iter,
96	marker::PhantomData,
97	num::NonZeroUsize,
98	pin::Pin,
99	str,
100	sync::{
101		atomic::{AtomicUsize, Ordering},
102		Arc,
103	},
104	time::{Duration, Instant},
105};
106
107pub(crate) mod metrics;
108pub(crate) mod out_events;
109
110pub mod signature;
111pub mod traits;
112
113/// Logging target for the file.
114const LOG_TARGET: &str = "sub-libp2p";
115
116struct Libp2pBandwidthSink {
117	#[allow(deprecated)]
118	sink: Arc<transport::BandwidthSinks>,
119}
120
121impl BandwidthSink for Libp2pBandwidthSink {
122	fn total_inbound(&self) -> u64 {
123		self.sink.total_inbound()
124	}
125
126	fn total_outbound(&self) -> u64 {
127		self.sink.total_outbound()
128	}
129}
130
131/// Substrate network service. Handles network IO and manages connectivity.
132pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
133	/// Number of peers we're connected to.
134	num_connected: Arc<AtomicUsize>,
135	/// The local external addresses.
136	external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
137	/// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`.
138	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
139	/// Local copy of the `PeerId` of the local node.
140	local_peer_id: PeerId,
141	/// The `KeyPair` that defines the `PeerId` of the local node.
142	local_identity: Keypair,
143	/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
144	bandwidth: Arc<dyn BandwidthSink>,
145	/// Channel that sends messages to the actual worker.
146	to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
147	/// Protocol name -> `SetId` mapping for notification protocols. The map never changes after
148	/// initialization.
149	notification_protocol_ids: HashMap<ProtocolName, SetId>,
150	/// Handles to manage peer connections on notification protocols. The vector never changes
151	/// after initialization.
152	protocol_handles: Vec<protocol_controller::ProtocolHandle>,
153	/// Shortcut to sync protocol handle (`protocol_handles[0]`).
154	sync_protocol_handle: protocol_controller::ProtocolHandle,
155	/// Handle to `PeerStore`.
156	peer_store_handle: Arc<dyn PeerStoreProvider>,
157	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
158	/// compatibility.
159	_marker: PhantomData<H>,
160	/// Marker for block type
161	_block: PhantomData<B>,
162}
163
164#[async_trait::async_trait]
165impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
166where
167	B: BlockT + 'static,
168	H: ExHashT,
169{
170	type NotificationProtocolConfig = NonDefaultSetConfig;
171	type RequestResponseProtocolConfig = RequestResponseConfig;
172	type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
173	type PeerStore = PeerStore;
174	type BitswapConfig = RequestResponseConfig;
175
176	fn new(params: Params<B, H, Self>) -> Result<Self, Error>
177	where
178		Self: Sized,
179	{
180		NetworkWorker::new(params)
181	}
182
183	/// Get handle to `NetworkService` of the `NetworkBackend`.
184	fn network_service(&self) -> Arc<dyn NetworkServiceT> {
185		self.service.clone()
186	}
187
188	/// Create `PeerStore`.
189	fn peer_store(
190		bootnodes: Vec<sc_network_types::PeerId>,
191		metrics_registry: Option<Registry>,
192	) -> Self::PeerStore {
193		PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
194	}
195
196	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
197		NotificationMetrics::new(registry)
198	}
199
200	fn bitswap_server(
201		client: Arc<dyn BlockBackend<B> + Send + Sync>,
202	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
203		let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
204
205		(Box::pin(async move { handler.run().await }), protocol_config)
206	}
207
208	/// Create notification protocol configuration.
209	fn notification_config(
210		protocol_name: ProtocolName,
211		fallback_names: Vec<ProtocolName>,
212		max_notification_size: u64,
213		handshake: Option<NotificationHandshake>,
214		set_config: SetConfig,
215		_metrics: NotificationMetrics,
216		_peerstore_handle: Arc<dyn PeerStoreProvider>,
217	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
218		NonDefaultSetConfig::new(
219			protocol_name,
220			fallback_names,
221			max_notification_size,
222			handshake,
223			set_config,
224		)
225	}
226
227	/// Create request-response protocol configuration.
228	fn request_response_config(
229		protocol_name: ProtocolName,
230		fallback_names: Vec<ProtocolName>,
231		max_request_size: u64,
232		max_response_size: u64,
233		request_timeout: Duration,
234		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
235	) -> Self::RequestResponseProtocolConfig {
236		Self::RequestResponseProtocolConfig {
237			name: protocol_name,
238			fallback_names,
239			max_request_size,
240			max_response_size,
241			request_timeout,
242			inbound_queue,
243		}
244	}
245
246	/// Start [`NetworkBackend`] event loop.
247	async fn run(mut self) {
248		self.run().await
249	}
250}
251
252impl<B, H> NetworkWorker<B, H>
253where
254	B: BlockT + 'static,
255	H: ExHashT,
256{
257	/// Creates the network service.
258	///
259	/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
260	/// for the network processing to advance. From it, you can extract a `NetworkService` using
261	/// `worker.service()`. The `NetworkService` can be shared through the codebase.
262	pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
263		let peer_store_handle = params.network_config.peer_store_handle();
264		let FullNetworkConfiguration {
265			notification_protocols,
266			request_response_protocols,
267			mut network_config,
268			..
269		} = params.network_config;
270
271		// Private and public keys configuration.
272		let local_identity = network_config.node_key.clone().into_keypair()?;
273		let local_public = local_identity.public();
274		let local_peer_id = local_public.to_peer_id();
275
276		// Convert to libp2p types.
277		let local_identity: ed25519::Keypair = local_identity.into();
278		let local_public: ed25519::PublicKey = local_public.into();
279		let local_peer_id: PeerId = local_peer_id.into();
280
281		network_config.boot_nodes = network_config
282			.boot_nodes
283			.into_iter()
284			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
285			.collect();
286		network_config.default_peers_set.reserved_nodes = network_config
287			.default_peers_set
288			.reserved_nodes
289			.into_iter()
290			.filter(|reserved_node| {
291				if reserved_node.peer_id == local_peer_id.into() {
292					warn!(
293						target: LOG_TARGET,
294						"Local peer ID used in reserved node, ignoring: {}",
295						reserved_node,
296					);
297					false
298				} else {
299					true
300				}
301			})
302			.collect();
303
304		// Ensure the listen addresses are consistent with the transport.
305		ensure_addresses_consistent_with_transport(
306			network_config.listen_addresses.iter(),
307			&network_config.transport,
308		)?;
309		ensure_addresses_consistent_with_transport(
310			network_config.boot_nodes.iter().map(|x| &x.multiaddr),
311			&network_config.transport,
312		)?;
313		ensure_addresses_consistent_with_transport(
314			network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
315			&network_config.transport,
316		)?;
317		for notification_protocol in &notification_protocols {
318			ensure_addresses_consistent_with_transport(
319				notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
320				&network_config.transport,
321			)?;
322		}
323		ensure_addresses_consistent_with_transport(
324			network_config.public_addresses.iter(),
325			&network_config.transport,
326		)?;
327
328		let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);
329
330		if let Some(path) = &network_config.net_config_path {
331			fs::create_dir_all(path)?;
332		}
333
334		info!(
335			target: LOG_TARGET,
336			"๐Ÿท  Local node identity is: {}",
337			local_peer_id.to_base58(),
338		);
339		info!(target: LOG_TARGET, "Running libp2p network backend");
340
341		let (transport, bandwidth) = {
342			let config_mem = match network_config.transport {
343				TransportConfig::MemoryOnly => true,
344				TransportConfig::Normal { .. } => false,
345			};
346
347			transport::build_transport(local_identity.clone().into(), config_mem)
348		};
349
350		let (to_notifications, from_protocol_controllers) =
351			tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);
352
353		// We must prepend a hardcoded default peer set to notification protocols.
354		let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
355			.chain(notification_protocols.iter().map(|protocol| protocol.set_config()));
356
357		let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
358			.enumerate()
359			.map(|(set_id, set_config)| {
360				let proto_set_config = ProtoSetConfig {
361					in_peers: set_config.in_peers,
362					out_peers: set_config.out_peers,
363					reserved_nodes: set_config
364						.reserved_nodes
365						.iter()
366						.map(|node| node.peer_id.into())
367						.collect(),
368					reserved_only: set_config.non_reserved_mode.is_reserved_only(),
369				};
370
371				ProtocolController::new(
372					SetId::from(set_id),
373					proto_set_config,
374					to_notifications.clone(),
375					Arc::clone(&peer_store_handle),
376				)
377			})
378			.unzip();
379
380		// Shortcut to default (sync) peer set protocol handle.
381		let sync_protocol_handle = protocol_handles[0].clone();
382
383		// Spawn `ProtocolController` runners.
384		protocol_controllers
385			.into_iter()
386			.for_each(|controller| (params.executor)(controller.run().boxed()));
387
388		// Protocol name to protocol id mapping. The first protocol is always block announce (sync)
389		// protocol, aka default (hardcoded) peer set.
390		let notification_protocol_ids: HashMap<ProtocolName, SetId> =
391			iter::once(&params.block_announce_config)
392				.chain(notification_protocols.iter())
393				.enumerate()
394				.map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
395				.collect();
396
397		let known_addresses = {
398			// Collect all reserved nodes and bootnodes addresses.
399			let mut addresses: Vec<_> = network_config
400				.default_peers_set
401				.reserved_nodes
402				.iter()
403				.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
404				.chain(notification_protocols.iter().flat_map(|protocol| {
405					protocol
406						.set_config()
407						.reserved_nodes
408						.iter()
409						.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
410				}))
411				.chain(
412					network_config
413						.boot_nodes
414						.iter()
415						.map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
416				)
417				.collect();
418
419			// Remove possible duplicates.
420			addresses.sort();
421			addresses.dedup();
422
423			addresses
424		};
425
426		// Check for duplicate bootnodes.
427		network_config.boot_nodes.iter().try_for_each(|bootnode| {
428			if let Some(other) = network_config
429				.boot_nodes
430				.iter()
431				.filter(|o| o.multiaddr == bootnode.multiaddr)
432				.find(|o| o.peer_id != bootnode.peer_id)
433			{
434				Err(Error::DuplicateBootnode {
435					address: bootnode.multiaddr.clone().into(),
436					first_id: bootnode.peer_id.into(),
437					second_id: other.peer_id.into(),
438				})
439			} else {
440				Ok(())
441			}
442		})?;
443
444		// List of bootnode multiaddresses.
445		let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();
446
447		for bootnode in network_config.boot_nodes.iter() {
448			boot_node_ids
449				.entry(bootnode.peer_id.into())
450				.or_default()
451				.push(bootnode.multiaddr.clone().into());
452		}
453
454		let boot_node_ids = Arc::new(boot_node_ids);
455
456		let num_connected = Arc::new(AtomicUsize::new(0));
457		let external_addresses = Arc::new(Mutex::new(HashSet::new()));
458
459		let (protocol, notif_protocol_handles) = Protocol::new(
460			From::from(&params.role),
461			params.notification_metrics,
462			notification_protocols,
463			params.block_announce_config,
464			Arc::clone(&peer_store_handle),
465			protocol_handles.clone(),
466			from_protocol_controllers,
467		)?;
468
469		// Build the swarm.
470		let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
471			let user_agent =
472				format!("{} ({})", network_config.client_version, network_config.node_name);
473
474			let discovery_config = {
475				let mut config = DiscoveryConfig::new(local_peer_id);
476				config.with_permanent_addresses(
477					known_addresses
478						.iter()
479						.map(|(peer, address)| (peer.into(), address.clone().into()))
480						.collect::<Vec<_>>(),
481				);
482				config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
483				config.with_kademlia(
484					params.genesis_hash,
485					params.fork_id.as_deref(),
486					&params.protocol_id,
487				);
488				config.with_dht_random_walk(network_config.enable_dht_random_walk);
489				config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
490				config.use_kademlia_disjoint_query_paths(
491					network_config.kademlia_disjoint_query_paths,
492				);
493				config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);
494
495				match network_config.transport {
496					TransportConfig::MemoryOnly => {
497						config.with_mdns(false);
498						config.allow_private_ip(false);
499					},
500					TransportConfig::Normal {
501						enable_mdns,
502						allow_private_ip: allow_private_ipv4,
503						..
504					} => {
505						config.with_mdns(enable_mdns);
506						config.allow_private_ip(allow_private_ipv4);
507					},
508				}
509
510				config
511			};
512
513			let behaviour = {
514				let result = Behaviour::new(
515					protocol,
516					user_agent,
517					local_public.into(),
518					discovery_config,
519					request_response_protocols,
520					Arc::clone(&peer_store_handle),
521					external_addresses.clone(),
522					network_config.public_addresses.iter().cloned().map(Into::into).collect(),
523					ConnectionLimits::default()
524						.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
525						.with_max_established_incoming(Some(
526							crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
527						)),
528				);
529
530				match result {
531					Ok(b) => b,
532					Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) =>
533						return Err(Error::DuplicateRequestResponseProtocol { protocol: proto }),
534				}
535			};
536
537			let swarm = {
538				struct SpawnImpl<F>(F);
539				impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
540					fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
541						(self.0)(f)
542					}
543				}
544
545				let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
546					.with_substream_upgrade_protocol_override(upgrade::Version::V1)
547					.with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
548					// NOTE: 24 is somewhat arbitrary and should be tuned in the future if
549					// necessary. See <https://github.com/paritytech/substrate/pull/6080>
550					.with_per_connection_event_buffer_size(24)
551					.with_max_negotiating_inbound_streams(2048)
552					.with_idle_connection_timeout(network_config.idle_connection_timeout);
553
554				Swarm::new(transport, behaviour, local_peer_id, config)
555			};
556
557			(swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
558		};
559
560		// Initialize the metrics.
561		let metrics = match &params.metrics_registry {
562			Some(registry) => Some(metrics::register(
563				registry,
564				MetricSources {
565					bandwidth: bandwidth.clone(),
566					connected_peers: num_connected.clone(),
567				},
568			)?),
569			None => None,
570		};
571
572		// Listen on multiaddresses.
573		for addr in &network_config.listen_addresses {
574			if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
575				warn!(target: LOG_TARGET, "Can't listen on {} because: {:?}", addr, err)
576			}
577		}
578
579		// Add external addresses.
580		for addr in &network_config.public_addresses {
581			Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
582		}
583
584		let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));
585
586		let service = Arc::new(NetworkService {
587			bandwidth,
588			external_addresses,
589			listen_addresses: listen_addresses_set.clone(),
590			num_connected: num_connected.clone(),
591			local_peer_id,
592			local_identity: local_identity.into(),
593			to_worker,
594			notification_protocol_ids,
595			protocol_handles,
596			sync_protocol_handle,
597			peer_store_handle: Arc::clone(&peer_store_handle),
598			_marker: PhantomData,
599			_block: Default::default(),
600		});
601
602		Ok(NetworkWorker {
603			listen_addresses: listen_addresses_set,
604			num_connected,
605			network_service: swarm,
606			service,
607			from_service,
608			event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
609			metrics,
610			boot_node_ids,
611			reported_invalid_boot_nodes: Default::default(),
612			peer_store_handle: Arc::clone(&peer_store_handle),
613			notif_protocol_handles,
614			_marker: Default::default(),
615			_block: Default::default(),
616		})
617	}
618
619	/// High-level network status information.
620	pub fn status(&self) -> NetworkStatus {
621		NetworkStatus {
622			num_connected_peers: self.num_connected_peers(),
623			total_bytes_inbound: self.total_bytes_inbound(),
624			total_bytes_outbound: self.total_bytes_outbound(),
625		}
626	}
627
628	/// Returns the total number of bytes received so far.
629	pub fn total_bytes_inbound(&self) -> u64 {
630		self.service.bandwidth.total_inbound()
631	}
632
633	/// Returns the total number of bytes sent so far.
634	pub fn total_bytes_outbound(&self) -> u64 {
635		self.service.bandwidth.total_outbound()
636	}
637
638	/// Returns the number of peers we're connected to.
639	pub fn num_connected_peers(&self) -> usize {
640		self.network_service.behaviour().user_protocol().num_sync_peers()
641	}
642
643	/// Adds an address for a node.
644	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
645		self.network_service.behaviour_mut().add_known_address(peer_id, addr);
646	}
647
648	/// Return a `NetworkService` that can be shared through the code base and can be used to
649	/// manipulate the worker.
650	pub fn service(&self) -> &Arc<NetworkService<B, H>> {
651		&self.service
652	}
653
654	/// Returns the local `PeerId`.
655	pub fn local_peer_id(&self) -> &PeerId {
656		Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
657	}
658
659	/// Returns the list of addresses we are listening on.
660	///
661	/// Does **NOT** include a trailing `/p2p/` with our `PeerId`.
662	pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
663		Swarm::<Behaviour<B>>::listeners(&self.network_service)
664	}
665
666	/// Get network state.
667	///
668	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
669	/// everywhere about this. Please don't use this function to retrieve actual information.
670	pub fn network_state(&mut self) -> NetworkState {
671		let swarm = &mut self.network_service;
672		let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
673		let connected_peers = {
674			let swarm = &mut *swarm;
675			open.iter()
676				.filter_map(move |peer_id| {
677					let known_addresses = if let Ok(addrs) =
678						NetworkBehaviour::handle_pending_outbound_connection(
679							swarm.behaviour_mut(),
680							ConnectionId::new_unchecked(0), // dummy value
681							Some(*peer_id),
682							&vec![],
683							Endpoint::Listener,
684						) {
685						addrs.into_iter().collect()
686					} else {
687						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
688						return None
689					};
690
691					let endpoint = if let Some(e) =
692						swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
693					{
694						e.clone().into()
695					} else {
696						error!(target: LOG_TARGET, "Found state inconsistency between custom protocol \
697						and debug information about {:?}", peer_id);
698						return None
699					};
700
701					Some((
702						peer_id.to_base58(),
703						NetworkStatePeer {
704							endpoint,
705							version_string: swarm
706								.behaviour_mut()
707								.node(peer_id)
708								.and_then(|i| i.client_version().map(|s| s.to_owned())),
709							latest_ping_time: swarm
710								.behaviour_mut()
711								.node(peer_id)
712								.and_then(|i| i.latest_ping()),
713							known_addresses,
714						},
715					))
716				})
717				.collect()
718		};
719
720		let not_connected_peers = {
721			let swarm = &mut *swarm;
722			swarm
723				.behaviour_mut()
724				.known_peers()
725				.into_iter()
726				.filter(|p| open.iter().all(|n| n != p))
727				.map(move |peer_id| {
728					let known_addresses = if let Ok(addrs) =
729						NetworkBehaviour::handle_pending_outbound_connection(
730							swarm.behaviour_mut(),
731							ConnectionId::new_unchecked(0), // dummy value
732							Some(peer_id),
733							&vec![],
734							Endpoint::Listener,
735						) {
736						addrs.into_iter().collect()
737					} else {
738						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
739						Default::default()
740					};
741
742					(
743						peer_id.to_base58(),
744						NetworkStateNotConnectedPeer {
745							version_string: swarm
746								.behaviour_mut()
747								.node(&peer_id)
748								.and_then(|i| i.client_version().map(|s| s.to_owned())),
749							latest_ping_time: swarm
750								.behaviour_mut()
751								.node(&peer_id)
752								.and_then(|i| i.latest_ping()),
753							known_addresses,
754						},
755					)
756				})
757				.collect()
758		};
759
760		let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
761		let listened_addresses = swarm.listeners().cloned().collect();
762		let external_addresses = swarm.external_addresses().cloned().collect();
763
764		NetworkState {
765			peer_id,
766			listened_addresses,
767			external_addresses,
768			connected_peers,
769			not_connected_peers,
770			// TODO: Check what info we can include here.
771			//       Issue reference: https://github.com/paritytech/substrate/issues/14160.
772			peerset: serde_json::json!(
773				"Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
774			),
775		}
776	}
777
778	/// Removes a `PeerId` from the list of reserved peers.
779	pub fn remove_reserved_peer(&self, peer: PeerId) {
780		self.service.remove_reserved_peer(peer.into());
781	}
782
783	/// Adds a `PeerId` and its `Multiaddr` as reserved.
784	pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
785		self.service.add_reserved_peer(peer)
786	}
787}
788
789impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
790	/// Get network state.
791	///
792	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
793	/// everywhere about this. Please don't use this function to retrieve actual information.
794	///
795	/// Returns an error if the `NetworkWorker` is no longer running.
796	pub async fn network_state(&self) -> Result<NetworkState, ()> {
797		let (tx, rx) = oneshot::channel();
798
799		let _ = self
800			.to_worker
801			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
802
803		match rx.await {
804			Ok(v) => v.map_err(|_| ()),
805			// The channel can only be closed if the network worker no longer exists.
806			Err(_) => Err(()),
807		}
808	}
809
810	/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
811	///
812	/// Returns an `Err` if one of the given addresses is invalid or contains an
813	/// invalid peer ID (which includes the local peer ID).
814	fn split_multiaddr_and_peer_id(
815		&self,
816		peers: HashSet<Multiaddr>,
817	) -> Result<Vec<(PeerId, Multiaddr)>, String> {
818		peers
819			.into_iter()
820			.map(|mut addr| {
821				let peer = match addr.pop() {
822					Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
823					_ => return Err("Missing PeerId from address".to_string()),
824				};
825
826				// Make sure the local peer ID is never added to the PSM
827				// or added as a "known address", even if given.
828				if peer == self.local_peer_id {
829					Err("Local peer ID in peer set.".to_string())
830				} else {
831					Ok((peer, addr))
832				}
833			})
834			.collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
835	}
836}
837
838impl<B, H> NetworkStateInfo for NetworkService<B, H>
839where
840	B: sp_runtime::traits::Block,
841	H: ExHashT,
842{
843	/// Returns the local external addresses.
844	fn external_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
845		self.external_addresses.lock().iter().cloned().map(Into::into).collect()
846	}
847
848	/// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`).
849	fn listen_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
850		self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
851	}
852
853	/// Returns the local Peer ID.
854	fn local_peer_id(&self) -> sc_network_types::PeerId {
855		self.local_peer_id.into()
856	}
857}
858
859impl<B, H> NetworkSigner for NetworkService<B, H>
860where
861	B: sp_runtime::traits::Block,
862	H: ExHashT,
863{
864	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
865		let public_key = self.local_identity.public();
866		let bytes = self.local_identity.sign(msg.as_ref())?;
867
868		Ok(Signature {
869			public_key: crate::service::signature::PublicKey::Libp2p(public_key),
870			bytes,
871		})
872	}
873
874	fn verify(
875		&self,
876		peer_id: sc_network_types::PeerId,
877		public_key: &Vec<u8>,
878		signature: &Vec<u8>,
879		message: &Vec<u8>,
880	) -> Result<bool, String> {
881		let public_key =
882			PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
883		let peer_id: PeerId = peer_id.into();
884		let remote: libp2p::PeerId = public_key.to_peer_id();
885
886		Ok(peer_id == remote && public_key.verify(message, signature))
887	}
888}
889
890impl<B, H> NetworkDHTProvider for NetworkService<B, H>
891where
892	B: BlockT + 'static,
893	H: ExHashT,
894{
895	/// Start finding closest peerst to the target peer ID in the DHT.
896	///
897	/// This will generate either a `ClosestPeersFound` or a `ClosestPeersNotFound` event and pass
898	/// it as an item on the [`NetworkWorker`] stream.
899	fn find_closest_peers(&self, target: sc_network_types::PeerId) {
900		let _ = self
901			.to_worker
902			.unbounded_send(ServiceToWorkerMsg::FindClosestPeers(target.into()));
903	}
904
905	/// Start getting a value from the DHT.
906	///
907	/// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an
908	/// item on the [`NetworkWorker`] stream.
909	fn get_value(&self, key: &KademliaKey) {
910		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
911	}
912
913	/// Start putting a value in the DHT.
914	///
915	/// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an
916	/// item on the [`NetworkWorker`] stream.
917	fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
918		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
919	}
920
921	fn put_record_to(
922		&self,
923		record: Record,
924		peers: HashSet<sc_network_types::PeerId>,
925		update_local_storage: bool,
926	) {
927		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
928			record,
929			peers,
930			update_local_storage,
931		});
932	}
933
934	fn store_record(
935		&self,
936		key: KademliaKey,
937		value: Vec<u8>,
938		publisher: Option<sc_network_types::PeerId>,
939		expires: Option<Instant>,
940	) {
941		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
942			key,
943			value,
944			publisher.map(Into::into),
945			expires,
946		));
947	}
948
949	fn start_providing(&self, key: KademliaKey) {
950		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key));
951	}
952
953	fn stop_providing(&self, key: KademliaKey) {
954		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
955	}
956
957	fn get_providers(&self, key: KademliaKey) {
958		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key));
959	}
960}
961
962#[async_trait::async_trait]
963impl<B, H> NetworkStatusProvider for NetworkService<B, H>
964where
965	B: BlockT + 'static,
966	H: ExHashT,
967{
968	async fn status(&self) -> Result<NetworkStatus, ()> {
969		let (tx, rx) = oneshot::channel();
970
971		let _ = self
972			.to_worker
973			.unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
974
975		match rx.await {
976			Ok(v) => v.map_err(|_| ()),
977			// The channel can only be closed if the network worker no longer exists.
978			Err(_) => Err(()),
979		}
980	}
981
982	async fn network_state(&self) -> Result<NetworkState, ()> {
983		let (tx, rx) = oneshot::channel();
984
985		let _ = self
986			.to_worker
987			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
988
989		match rx.await {
990			Ok(v) => v.map_err(|_| ()),
991			// The channel can only be closed if the network worker no longer exists.
992			Err(_) => Err(()),
993		}
994	}
995}
996
997#[async_trait::async_trait]
998impl<B, H> NetworkPeers for NetworkService<B, H>
999where
1000	B: BlockT + 'static,
1001	H: ExHashT,
1002{
1003	fn set_authorized_peers(&self, peers: HashSet<sc_network_types::PeerId>) {
1004		self.sync_protocol_handle
1005			.set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
1006	}
1007
1008	fn set_authorized_only(&self, reserved_only: bool) {
1009		self.sync_protocol_handle.set_reserved_only(reserved_only);
1010	}
1011
1012	fn add_known_address(
1013		&self,
1014		peer_id: sc_network_types::PeerId,
1015		addr: sc_network_types::multiaddr::Multiaddr,
1016	) {
1017		let _ = self
1018			.to_worker
1019			.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
1020	}
1021
1022	fn report_peer(&self, peer_id: sc_network_types::PeerId, cost_benefit: ReputationChange) {
1023		self.peer_store_handle.report_peer(peer_id, cost_benefit);
1024	}
1025
1026	fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
1027		self.peer_store_handle.peer_reputation(peer_id)
1028	}
1029
1030	fn disconnect_peer(&self, peer_id: sc_network_types::PeerId, protocol: ProtocolName) {
1031		let _ = self
1032			.to_worker
1033			.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
1034	}
1035
1036	fn accept_unreserved_peers(&self) {
1037		self.sync_protocol_handle.set_reserved_only(false);
1038	}
1039
1040	fn deny_unreserved_peers(&self) {
1041		self.sync_protocol_handle.set_reserved_only(true);
1042	}
1043
1044	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
1045		// Make sure the local peer ID is never added as a reserved peer.
1046		if peer.peer_id == self.local_peer_id.into() {
1047			return Err("Local peer ID cannot be added as a reserved peer.".to_string())
1048		}
1049
1050		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
1051			peer.peer_id.into(),
1052			peer.multiaddr.into(),
1053		));
1054		self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());
1055
1056		Ok(())
1057	}
1058
1059	fn remove_reserved_peer(&self, peer_id: sc_network_types::PeerId) {
1060		self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
1061	}
1062
1063	fn set_reserved_peers(
1064		&self,
1065		protocol: ProtocolName,
1066		peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1067	) -> Result<(), String> {
1068		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1069			return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol))
1070		};
1071
1072		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1073		let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;
1074
1075		let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());
1076
1077		for (peer_id, addr) in peers_addrs.into_iter() {
1078			// Make sure the local peer ID is never added to the PSM.
1079			if peer_id == self.local_peer_id {
1080				return Err("Local peer ID cannot be added as a reserved peer.".to_string())
1081			}
1082
1083			peers.insert(peer_id.into());
1084
1085			if !addr.is_empty() {
1086				let _ = self
1087					.to_worker
1088					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1089			}
1090		}
1091
1092		self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);
1093
1094		Ok(())
1095	}
1096
1097	fn add_peers_to_reserved_set(
1098		&self,
1099		protocol: ProtocolName,
1100		peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1101	) -> Result<(), String> {
1102		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1103			return Err(format!(
1104				"Cannot add peers to reserved set of unknown protocol: {}",
1105				protocol
1106			))
1107		};
1108
1109		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1110		let peers = self.split_multiaddr_and_peer_id(peers)?;
1111
1112		for (peer_id, addr) in peers.into_iter() {
1113			// Make sure the local peer ID is never added to the PSM.
1114			if peer_id == self.local_peer_id {
1115				return Err("Local peer ID cannot be added as a reserved peer.".to_string())
1116			}
1117
1118			if !addr.is_empty() {
1119				let _ = self
1120					.to_worker
1121					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1122			}
1123
1124			self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
1125		}
1126
1127		Ok(())
1128	}
1129
1130	fn remove_peers_from_reserved_set(
1131		&self,
1132		protocol: ProtocolName,
1133		peers: Vec<sc_network_types::PeerId>,
1134	) -> Result<(), String> {
1135		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1136			return Err(format!(
1137				"Cannot remove peers from reserved set of unknown protocol: {}",
1138				protocol
1139			))
1140		};
1141
1142		for peer_id in peers.into_iter() {
1143			self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
1144		}
1145
1146		Ok(())
1147	}
1148
1149	fn sync_num_connected(&self) -> usize {
1150		self.num_connected.load(Ordering::Relaxed)
1151	}
1152
1153	fn peer_role(
1154		&self,
1155		peer_id: sc_network_types::PeerId,
1156		handshake: Vec<u8>,
1157	) -> Option<ObservedRole> {
1158		match Roles::decode_all(&mut &handshake[..]) {
1159			Ok(role) => Some(role.into()),
1160			Err(_) => {
1161				log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
1162				self.peer_store_handle.peer_role(&(peer_id.into()))
1163			},
1164		}
1165	}
1166
1167	/// Get the list of reserved peers.
1168	///
1169	/// Returns an error if the `NetworkWorker` is no longer running.
1170	async fn reserved_peers(&self) -> Result<Vec<sc_network_types::PeerId>, ()> {
1171		let (tx, rx) = oneshot::channel();
1172
1173		self.sync_protocol_handle.reserved_peers(tx);
1174
1175		// The channel can only be closed if `ProtocolController` no longer exists.
1176		rx.await
1177			.map(|peers| peers.into_iter().map(From::from).collect())
1178			.map_err(|_| ())
1179	}
1180}
1181
1182impl<B, H> NetworkEventStream for NetworkService<B, H>
1183where
1184	B: BlockT + 'static,
1185	H: ExHashT,
1186{
1187	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1188		let (tx, rx) = out_events::channel(name, 100_000);
1189		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1190		Box::pin(rx)
1191	}
1192}
1193
1194#[async_trait::async_trait]
1195impl<B, H> NetworkRequest for NetworkService<B, H>
1196where
1197	B: BlockT + 'static,
1198	H: ExHashT,
1199{
1200	async fn request(
1201		&self,
1202		target: sc_network_types::PeerId,
1203		protocol: ProtocolName,
1204		request: Vec<u8>,
1205		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1206		connect: IfDisconnected,
1207	) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1208		let (tx, rx) = oneshot::channel();
1209
1210		self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1211
1212		match rx.await {
1213			Ok(v) => v,
1214			// The channel can only be closed if the network worker no longer exists. If the
1215			// network worker no longer exists, then all connections to `target` are necessarily
1216			// closed, and we legitimately report this situation as a "ConnectionClosed".
1217			Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1218		}
1219	}
1220
1221	fn start_request(
1222		&self,
1223		target: sc_network_types::PeerId,
1224		protocol: ProtocolName,
1225		request: Vec<u8>,
1226		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1227		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1228		connect: IfDisconnected,
1229	) {
1230		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1231			target: target.into(),
1232			protocol: protocol.into(),
1233			request,
1234			fallback_request,
1235			pending_response: tx,
1236			connect,
1237		});
1238	}
1239}
1240
1241/// A `NotificationSender` allows for sending notifications to a peer with a chosen protocol.
1242#[must_use]
1243pub struct NotificationSender {
1244	sink: NotificationsSink,
1245
1246	/// Name of the protocol on the wire.
1247	protocol_name: ProtocolName,
1248
1249	/// Field extracted from the [`Metrics`] struct and necessary to report the
1250	/// notifications-related metrics.
1251	notification_size_metric: Option<Histogram>,
1252}
1253
1254#[async_trait::async_trait]
1255impl NotificationSenderT for NotificationSender {
1256	async fn ready(
1257		&self,
1258	) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1259		Ok(Box::new(NotificationSenderReady {
1260			ready: match self.sink.reserve_notification().await {
1261				Ok(r) => Some(r),
1262				Err(()) => return Err(NotificationSenderError::Closed),
1263			},
1264			peer_id: self.sink.peer_id(),
1265			protocol_name: &self.protocol_name,
1266			notification_size_metric: self.notification_size_metric.clone(),
1267		}))
1268	}
1269}
1270
1271/// Reserved slot in the notifications buffer, ready to accept data.
1272#[must_use]
1273pub struct NotificationSenderReady<'a> {
1274	ready: Option<Ready<'a>>,
1275
1276	/// Target of the notification.
1277	peer_id: &'a PeerId,
1278
1279	/// Name of the protocol on the wire.
1280	protocol_name: &'a ProtocolName,
1281
1282	/// Field extracted from the [`Metrics`] struct and necessary to report the
1283	/// notifications-related metrics.
1284	notification_size_metric: Option<Histogram>,
1285}
1286
1287impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1288	fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1289		if let Some(notification_size_metric) = &self.notification_size_metric {
1290			notification_size_metric.observe(notification.len() as f64);
1291		}
1292
1293		trace!(
1294			target: LOG_TARGET,
1295			"External API => Notification({:?}, {}, {} bytes)",
1296			self.peer_id, self.protocol_name, notification.len(),
1297		);
1298		trace!(target: LOG_TARGET, "Handler({:?}) <= Async notification", self.peer_id);
1299
1300		self.ready
1301			.take()
1302			.ok_or(NotificationSenderError::Closed)?
1303			.send(notification)
1304			.map_err(|()| NotificationSenderError::Closed)
1305	}
1306}
1307
1308/// Messages sent from the `NetworkService` to the `NetworkWorker`.
1309///
1310/// Each entry corresponds to a method of `NetworkService`.
1311enum ServiceToWorkerMsg {
1312	FindClosestPeers(PeerId),
1313	GetValue(KademliaKey),
1314	PutValue(KademliaKey, Vec<u8>),
1315	PutRecordTo {
1316		record: Record,
1317		peers: HashSet<sc_network_types::PeerId>,
1318		update_local_storage: bool,
1319	},
1320	StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
1321	StartProviding(KademliaKey),
1322	StopProviding(KademliaKey),
1323	GetProviders(KademliaKey),
1324	AddKnownAddress(PeerId, Multiaddr),
1325	EventStream(out_events::Sender),
1326	Request {
1327		target: PeerId,
1328		protocol: ProtocolName,
1329		request: Vec<u8>,
1330		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1331		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1332		connect: IfDisconnected,
1333	},
1334	NetworkStatus {
1335		pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
1336	},
1337	NetworkState {
1338		pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
1339	},
1340	DisconnectPeer(PeerId, ProtocolName),
1341}
1342
1343/// Main network worker. Must be polled in order for the network to advance.
1344///
1345/// You are encouraged to poll this in a separate background thread or task.
1346#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
1347pub struct NetworkWorker<B, H>
1348where
1349	B: BlockT + 'static,
1350	H: ExHashT,
1351{
1352	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
1353	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
1354	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
1355	num_connected: Arc<AtomicUsize>,
1356	/// The network service that can be extracted and shared through the codebase.
1357	service: Arc<NetworkService<B, H>>,
1358	/// The *actual* network.
1359	network_service: Swarm<Behaviour<B>>,
1360	/// Messages from the [`NetworkService`] that must be processed.
1361	from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
1362	/// Senders for events that happen on the network.
1363	event_streams: out_events::OutChannels,
1364	/// Prometheus network metrics.
1365	metrics: Option<Metrics>,
1366	/// The `PeerId`'s of all boot nodes mapped to the registered addresses.
1367	boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
1368	/// Boot nodes that we already have reported as invalid.
1369	reported_invalid_boot_nodes: HashSet<PeerId>,
1370	/// Peer reputation store handle.
1371	peer_store_handle: Arc<dyn PeerStoreProvider>,
1372	/// Notification protocol handles.
1373	notif_protocol_handles: Vec<protocol::ProtocolHandle>,
1374	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
1375	/// compatibility.
1376	_marker: PhantomData<H>,
1377	/// Marker for block type
1378	_block: PhantomData<B>,
1379}
1380
1381impl<B, H> NetworkWorker<B, H>
1382where
1383	B: BlockT + 'static,
1384	H: ExHashT,
1385{
1386	/// Run the network.
1387	pub async fn run(mut self) {
1388		while self.next_action().await {}
1389	}
1390
1391	/// Perform one action on the network.
1392	///
1393	/// Returns `false` when the worker should be shutdown.
1394	/// Use in tests only.
1395	pub async fn next_action(&mut self) -> bool {
1396		futures::select! {
1397			// Next message from the service.
1398			msg = self.from_service.next() => {
1399				if let Some(msg) = msg {
1400					self.handle_worker_message(msg);
1401				} else {
1402					return false
1403				}
1404			},
1405			// Next event from `Swarm` (the stream guaranteed to never terminate).
1406			event = self.network_service.select_next_some() => {
1407				self.handle_swarm_event(event);
1408			},
1409		};
1410
1411		// Update the `num_connected` count shared with the `NetworkService`.
1412		let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
1413		self.num_connected.store(num_connected_peers, Ordering::Relaxed);
1414
1415		if let Some(metrics) = self.metrics.as_ref() {
1416			if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
1417				for (lower_ilog2_bucket_bound, num_entries) in buckets {
1418					metrics
1419						.kbuckets_num_nodes
1420						.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
1421						.set(num_entries as u64);
1422				}
1423			}
1424			if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
1425				metrics.kademlia_records_count.set(num_entries as u64);
1426			}
1427			if let Some(num_entries) =
1428				self.network_service.behaviour_mut().kademlia_records_total_size()
1429			{
1430				metrics.kademlia_records_sizes_total.set(num_entries as u64);
1431			}
1432
1433			metrics.pending_connections.set(
1434				Swarm::network_info(&self.network_service).connection_counters().num_pending()
1435					as u64,
1436			);
1437		}
1438
1439		true
1440	}
1441
1442	/// Process the next message coming from the `NetworkService`.
1443	fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
1444		match msg {
1445			ServiceToWorkerMsg::FindClosestPeers(target) =>
1446				self.network_service.behaviour_mut().find_closest_peers(target),
1447			ServiceToWorkerMsg::GetValue(key) =>
1448				self.network_service.behaviour_mut().get_value(key.into()),
1449			ServiceToWorkerMsg::PutValue(key, value) =>
1450				self.network_service.behaviour_mut().put_value(key.into(), value),
1451			ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
1452				.network_service
1453				.behaviour_mut()
1454				.put_record_to(record.into(), peers, update_local_storage),
1455			ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
1456				.network_service
1457				.behaviour_mut()
1458				.store_record(key.into(), value, publisher, expires),
1459			ServiceToWorkerMsg::StartProviding(key) =>
1460				self.network_service.behaviour_mut().start_providing(key.into()),
1461			ServiceToWorkerMsg::StopProviding(key) =>
1462				self.network_service.behaviour_mut().stop_providing(&key.into()),
1463			ServiceToWorkerMsg::GetProviders(key) =>
1464				self.network_service.behaviour_mut().get_providers(key.into()),
1465			ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
1466				self.network_service.behaviour_mut().add_known_address(peer_id, addr),
1467			ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
1468			ServiceToWorkerMsg::Request {
1469				target,
1470				protocol,
1471				request,
1472				fallback_request,
1473				pending_response,
1474				connect,
1475			} => {
1476				self.network_service.behaviour_mut().send_request(
1477					&target,
1478					protocol,
1479					request,
1480					fallback_request,
1481					pending_response,
1482					connect,
1483				);
1484			},
1485			ServiceToWorkerMsg::NetworkStatus { pending_response } => {
1486				let _ = pending_response.send(Ok(self.status()));
1487			},
1488			ServiceToWorkerMsg::NetworkState { pending_response } => {
1489				let _ = pending_response.send(Ok(self.network_state()));
1490			},
1491			ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
1492				.network_service
1493				.behaviour_mut()
1494				.user_protocol_mut()
1495				.disconnect_peer(&who, protocol_name),
1496		}
1497	}
1498
1499	/// Process the next event coming from `Swarm`.
1500	fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut>) {
1501		match event {
1502			SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1503				if let Some(metrics) = self.metrics.as_ref() {
1504					match result {
1505						Ok(serve_time) => {
1506							metrics
1507								.requests_in_success_total
1508								.with_label_values(&[&protocol])
1509								.observe(serve_time.as_secs_f64());
1510						},
1511						Err(err) => {
1512							let reason = match err {
1513								ResponseFailure::Network(InboundFailure::Timeout) =>
1514									Some("timeout"),
1515								ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1516								// `UnsupportedProtocols` is reported for every single
1517								// inbound request whenever a request with an unsupported
1518								// protocol is received. This is not reported in order to
1519								// avoid confusions.
1520									None,
1521								ResponseFailure::Network(InboundFailure::ResponseOmission) =>
1522									Some("busy-omitted"),
1523								ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
1524									Some("connection-closed"),
1525								ResponseFailure::Network(InboundFailure::Io(_)) => Some("io"),
1526							};
1527
1528							if let Some(reason) = reason {
1529								metrics
1530									.requests_in_failure_total
1531									.with_label_values(&[&protocol, reason])
1532									.inc();
1533							}
1534						},
1535					}
1536				}
1537			},
1538			SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1539				protocol,
1540				duration,
1541				result,
1542				..
1543			}) =>
1544				if let Some(metrics) = self.metrics.as_ref() {
1545					match result {
1546						Ok(_) => {
1547							metrics
1548								.requests_out_success_total
1549								.with_label_values(&[&protocol])
1550								.observe(duration.as_secs_f64());
1551						},
1552						Err(err) => {
1553							let reason = match err {
1554								RequestFailure::NotConnected => "not-connected",
1555								RequestFailure::UnknownProtocol => "unknown-protocol",
1556								RequestFailure::Refused => "refused",
1557								RequestFailure::Obsolete => "obsolete",
1558								RequestFailure::Network(OutboundFailure::DialFailure) =>
1559									"dial-failure",
1560								RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1561								RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
1562									"connection-closed",
1563								RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
1564									"unsupported",
1565								RequestFailure::Network(OutboundFailure::Io(_)) => "io",
1566							};
1567
1568							metrics
1569								.requests_out_failure_total
1570								.with_label_values(&[&protocol, reason])
1571								.inc();
1572						},
1573					}
1574				},
1575			SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1576				for change in changes {
1577					self.peer_store_handle.report_peer(peer.into(), change);
1578				}
1579			},
1580			SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1581				peer_id,
1582				info:
1583					IdentifyInfo {
1584						protocol_version, agent_version, mut listen_addrs, protocols, ..
1585					},
1586			}) => {
1587				if listen_addrs.len() > 30 {
1588					debug!(
1589						target: LOG_TARGET,
1590						"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1591						peer_id, protocol_version, agent_version
1592					);
1593					listen_addrs.truncate(30);
1594				}
1595				for addr in listen_addrs {
1596					self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1597						&peer_id,
1598						&protocols,
1599						addr.clone(),
1600					);
1601				}
1602				self.peer_store_handle.add_known_peer(peer_id.into());
1603			},
1604			SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1605				self.peer_store_handle.add_known_peer(peer_id.into());
1606			},
1607			SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1608				if let Some(metrics) = self.metrics.as_ref() {
1609					metrics.kademlia_random_queries_total.inc();
1610				}
1611			},
1612			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1613				remote,
1614				set_id,
1615				direction,
1616				negotiated_fallback,
1617				notifications_sink,
1618				received_handshake,
1619			}) => {
1620				let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1621					remote,
1622					direction,
1623					received_handshake,
1624					negotiated_fallback,
1625					notifications_sink,
1626				);
1627			},
1628			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1629				remote,
1630				set_id,
1631				notifications_sink,
1632			}) => {
1633				let _ = self.notif_protocol_handles[usize::from(set_id)]
1634					.report_notification_sink_replaced(remote, notifications_sink);
1635
1636				// TODO: Notifications might have been lost as a result of the previous
1637				// connection being dropped, and as a result it would be preferable to notify
1638				// the users of this fact by simulating the substream being closed then
1639				// reopened.
1640				// The code below doesn't compile because `role` is unknown. Propagating the
1641				// handshake of the secondary connections is quite an invasive change and
1642				// would conflict with https://github.com/paritytech/substrate/issues/6403.
1643				// Considering that dropping notifications is generally regarded as
1644				// acceptable, this bug is at the moment intentionally left there and is
1645				// intended to be fixed at the same time as
1646				// https://github.com/paritytech/substrate/issues/6403.
1647				// self.event_streams.send(Event::NotificationStreamClosed {
1648				// remote,
1649				// protocol,
1650				// });
1651				// self.event_streams.send(Event::NotificationStreamOpened {
1652				// remote,
1653				// protocol,
1654				// role,
1655				// });
1656			},
1657			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1658				let _ = self.notif_protocol_handles[usize::from(set_id)]
1659					.report_substream_closed(remote);
1660			},
1661			SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1662				remote,
1663				set_id,
1664				notification,
1665			}) => {
1666				let _ = self.notif_protocol_handles[usize::from(set_id)]
1667					.report_notification_received(remote, notification);
1668			},
1669			SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1670				match (self.metrics.as_ref(), duration) {
1671					(Some(metrics), Some(duration)) => {
1672						let query_type = match event {
1673							DhtEvent::ClosestPeersFound(_, _) => "peers-found",
1674							DhtEvent::ClosestPeersNotFound(_) => "peers-not-found",
1675							DhtEvent::ValueFound(_) => "value-found",
1676							DhtEvent::ValueNotFound(_) => "value-not-found",
1677							DhtEvent::ValuePut(_) => "value-put",
1678							DhtEvent::ValuePutFailed(_) => "value-put-failed",
1679							DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1680							DhtEvent::StartedProviding(_) => "started-providing",
1681							DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
1682							DhtEvent::ProvidersFound(_, _) => "providers-found",
1683							DhtEvent::NoMoreProviders(_) => "no-more-providers",
1684							DhtEvent::ProvidersNotFound(_) => "providers-not-found",
1685						};
1686						metrics
1687							.kademlia_query_duration
1688							.with_label_values(&[query_type])
1689							.observe(duration.as_secs_f64());
1690					},
1691					_ => {},
1692				}
1693
1694				self.event_streams.send(Event::Dht(event));
1695			},
1696			SwarmEvent::Behaviour(BehaviourOut::None) => {
1697				// Ignored event from lower layers.
1698			},
1699			SwarmEvent::ConnectionEstablished {
1700				peer_id,
1701				endpoint,
1702				num_established,
1703				concurrent_dial_errors,
1704				..
1705			} => {
1706				if let Some(errors) = concurrent_dial_errors {
1707					debug!(target: LOG_TARGET, "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1708				} else {
1709					debug!(target: LOG_TARGET, "Libp2p => Connected({:?})", peer_id);
1710				}
1711
1712				if let Some(metrics) = self.metrics.as_ref() {
1713					let direction = match endpoint {
1714						ConnectedPoint::Dialer { .. } => "out",
1715						ConnectedPoint::Listener { .. } => "in",
1716					};
1717					metrics.connections_opened_total.with_label_values(&[direction]).inc();
1718
1719					if num_established.get() == 1 {
1720						metrics.distinct_peers_connections_opened_total.inc();
1721					}
1722				}
1723			},
1724			SwarmEvent::ConnectionClosed {
1725				connection_id,
1726				peer_id,
1727				cause,
1728				endpoint,
1729				num_established,
1730			} => {
1731				debug!(target: LOG_TARGET, "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1732				if let Some(metrics) = self.metrics.as_ref() {
1733					let direction = match endpoint {
1734						ConnectedPoint::Dialer { .. } => "out",
1735						ConnectedPoint::Listener { .. } => "in",
1736					};
1737					let reason = match cause {
1738						Some(ConnectionError::IO(_)) => "transport-error",
1739						Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1740						None => "actively-closed",
1741					};
1742					metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1743
1744					// `num_established` represents the number of *remaining* connections.
1745					if num_established == 0 {
1746						metrics.distinct_peers_connections_closed_total.inc();
1747					}
1748				}
1749			},
1750			SwarmEvent::NewListenAddr { address, .. } => {
1751				trace!(target: LOG_TARGET, "Libp2p => NewListenAddr({})", address);
1752				if let Some(metrics) = self.metrics.as_ref() {
1753					metrics.listeners_local_addresses.inc();
1754				}
1755				self.listen_addresses.lock().insert(address.clone());
1756			},
1757			SwarmEvent::ExpiredListenAddr { address, .. } => {
1758				info!(target: LOG_TARGET, "๐Ÿ“ช No longer listening on {}", address);
1759				if let Some(metrics) = self.metrics.as_ref() {
1760					metrics.listeners_local_addresses.dec();
1761				}
1762				self.listen_addresses.lock().remove(&address);
1763			},
1764			SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1765				if let Some(peer_id) = peer_id {
1766					trace!(
1767						target: LOG_TARGET,
1768						"Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1769					);
1770
1771					let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1772
1773					if let Some(addresses) =
1774						not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1775					{
1776						if let DialError::WrongPeerId { obtained, endpoint } = &error {
1777							if let ConnectedPoint::Dialer {
1778								address,
1779								role_override: _,
1780								port_use: _,
1781							} = endpoint
1782							{
1783								let address_without_peer_id = parse_addr(address.clone().into())
1784									.map_or_else(|_| address.clone(), |r| r.1.into());
1785
1786								// Only report for address of boot node that was added at startup of
1787								// the node and not for any address that the node learned of the
1788								// boot node.
1789								if addresses.iter().any(|a| address_without_peer_id == *a) {
1790									warn!(
1791										"๐Ÿ’” The bootnode you want to connect to at `{address}` provided a \
1792										 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1793									);
1794
1795									self.reported_invalid_boot_nodes.insert(peer_id);
1796								}
1797							}
1798						}
1799					}
1800				}
1801
1802				if let Some(metrics) = self.metrics.as_ref() {
1803					let reason = match error {
1804						DialError::Denied { cause } =>
1805							if cause.downcast::<Exceeded>().is_ok() {
1806								Some("limit-reached")
1807							} else {
1808								None
1809							},
1810						DialError::LocalPeerId { .. } => Some("local-peer-id"),
1811						DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1812						DialError::Transport(_) => Some("transport-error"),
1813						DialError::NoAddresses |
1814						DialError::DialPeerConditionFalse(_) |
1815						DialError::Aborted => None, // ignore them
1816					};
1817					if let Some(reason) = reason {
1818						metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1819					}
1820				}
1821			},
1822			SwarmEvent::Dialing { connection_id, peer_id } => {
1823				trace!(target: LOG_TARGET, "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1824			},
1825			SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1826				trace!(target: LOG_TARGET, "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1827				if let Some(metrics) = self.metrics.as_ref() {
1828					metrics.incoming_connections_total.inc();
1829				}
1830			},
1831			SwarmEvent::IncomingConnectionError {
1832				connection_id,
1833				local_addr,
1834				send_back_addr,
1835				error,
1836			} => {
1837				debug!(
1838					target: LOG_TARGET,
1839					"Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1840				);
1841				if let Some(metrics) = self.metrics.as_ref() {
1842					let reason = match error {
1843						ListenError::Denied { cause } =>
1844							if cause.downcast::<Exceeded>().is_ok() {
1845								Some("limit-reached")
1846							} else {
1847								None
1848							},
1849						ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } =>
1850							Some("invalid-peer-id"),
1851						ListenError::Transport(_) => Some("transport-error"),
1852						ListenError::Aborted => None, // ignore it
1853					};
1854
1855					if let Some(reason) = reason {
1856						metrics
1857							.incoming_connections_errors_total
1858							.with_label_values(&[reason])
1859							.inc();
1860					}
1861				}
1862			},
1863			SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1864				if let Some(metrics) = self.metrics.as_ref() {
1865					metrics.listeners_local_addresses.sub(addresses.len() as u64);
1866				}
1867				let mut listen_addresses = self.listen_addresses.lock();
1868				for addr in &addresses {
1869					listen_addresses.remove(addr);
1870				}
1871				drop(listen_addresses);
1872
1873				let addrs =
1874					addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1875				match reason {
1876					Ok(()) => error!(
1877						target: LOG_TARGET,
1878						"๐Ÿ“ช Libp2p listener ({}) closed gracefully",
1879						addrs
1880					),
1881					Err(e) => error!(
1882						target: LOG_TARGET,
1883						"๐Ÿ“ช Libp2p listener ({}) closed: {}",
1884						addrs, e
1885					),
1886				}
1887			},
1888			SwarmEvent::ListenerError { error, .. } => {
1889				debug!(target: LOG_TARGET, "Libp2p => ListenerError: {}", error);
1890				if let Some(metrics) = self.metrics.as_ref() {
1891					metrics.listeners_errors_total.inc();
1892				}
1893			},
1894			SwarmEvent::NewExternalAddrCandidate { address } => {
1895				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrCandidate: {address:?}");
1896			},
1897			SwarmEvent::ExternalAddrConfirmed { address } => {
1898				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrConfirmed: {address:?}");
1899			},
1900			SwarmEvent::ExternalAddrExpired { address } => {
1901				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrExpired: {address:?}");
1902			},
1903			SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
1904				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrOfPeer({peer_id:?}): {address:?}")
1905			},
1906			event => {
1907				warn!(target: LOG_TARGET, "New unknown SwarmEvent libp2p event: {event:?}");
1908			},
1909		}
1910	}
1911}
1912
1913impl<B, H> Unpin for NetworkWorker<B, H>
1914where
1915	B: BlockT + 'static,
1916	H: ExHashT,
1917{
1918}
1919
1920pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1921	addresses: impl Iterator<Item = &'a sc_network_types::multiaddr::Multiaddr>,
1922	transport: &TransportConfig,
1923) -> Result<(), Error> {
1924	use sc_network_types::multiaddr::Protocol;
1925
1926	if matches!(transport, TransportConfig::MemoryOnly) {
1927		let addresses: Vec<_> = addresses
1928			.filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1929			.cloned()
1930			.collect();
1931
1932		if !addresses.is_empty() {
1933			return Err(Error::AddressesForAnotherTransport {
1934				transport: transport.clone(),
1935				addresses,
1936			})
1937		}
1938	} else {
1939		let addresses: Vec<_> = addresses
1940			.filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1941			.cloned()
1942			.collect();
1943
1944		if !addresses.is_empty() {
1945			return Err(Error::AddressesForAnotherTransport {
1946				transport: transport.clone(),
1947				addresses,
1948			})
1949		}
1950	}
1951
1952	Ok(())
1953}