sc_network/
service.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Main entry point of the sc-network crate.
20//!
21//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
22//! The [`NetworkWorker`] *is* the network. Network is driven by [`NetworkWorker::run`] future that
23//! terminates only when all instances of the control handles [`NetworkService`] were dropped.
24//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
25//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
26//!
27//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
28//! which is then processed by [`NetworkWorker::next_action`].
29
30use crate::{
31	behaviour::{self, Behaviour, BehaviourOut},
32	bitswap::BitswapRequestHandler,
33	config::{
34		parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
35		NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
36	},
37	discovery::DiscoveryConfig,
38	error::Error,
39	event::{DhtEvent, Event},
40	network_state::{
41		NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
42	},
43	peer_store::{PeerStore, PeerStoreProvider},
44	protocol::{self, Protocol, Ready},
45	protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
46	request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
47	service::{
48		signature::{Signature, SigningError},
49		traits::{
50			BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
51			NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
52			NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
53			NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
54		},
55	},
56	transport,
57	types::ProtocolName,
58	NotificationService, ReputationChange,
59};
60
61use codec::DecodeAll;
62use futures::{channel::oneshot, prelude::*};
63use libp2p::{
64	connection_limits::{ConnectionLimits, Exceeded},
65	core::{upgrade, ConnectedPoint, Endpoint},
66	identify::Info as IdentifyInfo,
67	identity::ed25519,
68	multiaddr::{self, Multiaddr},
69	swarm::{
70		Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
71		NetworkBehaviour, Swarm, SwarmEvent,
72	},
73	PeerId,
74};
75use log::{debug, error, info, trace, warn};
76use metrics::{Histogram, MetricSources, Metrics};
77use parking_lot::Mutex;
78use prometheus_endpoint::Registry;
79use sc_network_types::kad::{Key as KademliaKey, Record};
80
81use sc_client_api::BlockBackend;
82use sc_network_common::{
83	role::{ObservedRole, Roles},
84	ExHashT,
85};
86use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
87use sp_runtime::traits::Block as BlockT;
88
89pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
90pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
91pub use metrics::NotificationMetrics;
92pub use protocol::NotificationsSink;
93use std::{
94	collections::{HashMap, HashSet},
95	fs, iter,
96	marker::PhantomData,
97	num::NonZeroUsize,
98	pin::Pin,
99	str,
100	sync::{
101		atomic::{AtomicUsize, Ordering},
102		Arc,
103	},
104	time::{Duration, Instant},
105};
106
107pub(crate) mod metrics;
108pub(crate) mod out_events;
109
110pub mod signature;
111pub mod traits;
112
113/// Logging target for the file.
114const LOG_TARGET: &str = "sub-libp2p";
115
116struct Libp2pBandwidthSink {
117	#[allow(deprecated)]
118	sink: Arc<transport::BandwidthSinks>,
119}
120
121impl BandwidthSink for Libp2pBandwidthSink {
122	fn total_inbound(&self) -> u64 {
123		self.sink.total_inbound()
124	}
125
126	fn total_outbound(&self) -> u64 {
127		self.sink.total_outbound()
128	}
129}
130
131/// Substrate network service. Handles network IO and manages connectivity.
132pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
133	/// Number of peers we're connected to.
134	num_connected: Arc<AtomicUsize>,
135	/// The local external addresses.
136	external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
137	/// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`.
138	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
139	/// Local copy of the `PeerId` of the local node.
140	local_peer_id: PeerId,
141	/// The `KeyPair` that defines the `PeerId` of the local node.
142	local_identity: Keypair,
143	/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
144	bandwidth: Arc<dyn BandwidthSink>,
145	/// Channel that sends messages to the actual worker.
146	to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
147	/// Protocol name -> `SetId` mapping for notification protocols. The map never changes after
148	/// initialization.
149	notification_protocol_ids: HashMap<ProtocolName, SetId>,
150	/// Handles to manage peer connections on notification protocols. The vector never changes
151	/// after initialization.
152	protocol_handles: Vec<protocol_controller::ProtocolHandle>,
153	/// Shortcut to sync protocol handle (`protocol_handles[0]`).
154	sync_protocol_handle: protocol_controller::ProtocolHandle,
155	/// Handle to `PeerStore`.
156	peer_store_handle: Arc<dyn PeerStoreProvider>,
157	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
158	/// compatibility.
159	_marker: PhantomData<H>,
160	/// Marker for block type
161	_block: PhantomData<B>,
162}
163
164#[async_trait::async_trait]
165impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
166where
167	B: BlockT + 'static,
168	H: ExHashT,
169{
170	type NotificationProtocolConfig = NonDefaultSetConfig;
171	type RequestResponseProtocolConfig = RequestResponseConfig;
172	type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
173	type PeerStore = PeerStore;
174	type BitswapConfig = RequestResponseConfig;
175
176	fn new(params: Params<B, H, Self>) -> Result<Self, Error>
177	where
178		Self: Sized,
179	{
180		NetworkWorker::new(params)
181	}
182
183	/// Get handle to `NetworkService` of the `NetworkBackend`.
184	fn network_service(&self) -> Arc<dyn NetworkServiceT> {
185		self.service.clone()
186	}
187
188	/// Create `PeerStore`.
189	fn peer_store(
190		bootnodes: Vec<sc_network_types::PeerId>,
191		metrics_registry: Option<Registry>,
192	) -> Self::PeerStore {
193		PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
194	}
195
196	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
197		NotificationMetrics::new(registry)
198	}
199
200	fn bitswap_server(
201		client: Arc<dyn BlockBackend<B> + Send + Sync>,
202		_metrics_registry: Option<Registry>,
203	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
204		let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
205
206		(Box::pin(async move { handler.run().await }), protocol_config)
207	}
208
209	/// Create notification protocol configuration.
210	fn notification_config(
211		protocol_name: ProtocolName,
212		fallback_names: Vec<ProtocolName>,
213		max_notification_size: u64,
214		handshake: Option<NotificationHandshake>,
215		set_config: SetConfig,
216		_metrics: NotificationMetrics,
217		_peerstore_handle: Arc<dyn PeerStoreProvider>,
218	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
219		NonDefaultSetConfig::new(
220			protocol_name,
221			fallback_names,
222			max_notification_size,
223			handshake,
224			set_config,
225		)
226	}
227
228	/// Create request-response protocol configuration.
229	fn request_response_config(
230		protocol_name: ProtocolName,
231		fallback_names: Vec<ProtocolName>,
232		max_request_size: u64,
233		max_response_size: u64,
234		request_timeout: Duration,
235		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
236	) -> Self::RequestResponseProtocolConfig {
237		Self::RequestResponseProtocolConfig {
238			name: protocol_name,
239			fallback_names,
240			max_request_size,
241			max_response_size,
242			request_timeout,
243			inbound_queue,
244		}
245	}
246
247	/// Start [`NetworkBackend`] event loop.
248	async fn run(mut self) {
249		self.run().await
250	}
251}
252
253impl<B, H> NetworkWorker<B, H>
254where
255	B: BlockT + 'static,
256	H: ExHashT,
257{
258	/// Creates the network service.
259	///
260	/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
261	/// for the network processing to advance. From it, you can extract a `NetworkService` using
262	/// `worker.service()`. The `NetworkService` can be shared through the codebase.
263	pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
264		let peer_store_handle = params.network_config.peer_store_handle();
265		let FullNetworkConfiguration {
266			notification_protocols,
267			request_response_protocols,
268			mut network_config,
269			..
270		} = params.network_config;
271
272		// Private and public keys configuration.
273		let local_identity = network_config.node_key.clone().into_keypair()?;
274		let local_public = local_identity.public();
275		let local_peer_id = local_public.to_peer_id();
276
277		// Convert to libp2p types.
278		let local_identity: ed25519::Keypair = local_identity.into();
279		let local_public: ed25519::PublicKey = local_public.into();
280		let local_peer_id: PeerId = local_peer_id.into();
281
282		network_config.boot_nodes = network_config
283			.boot_nodes
284			.into_iter()
285			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
286			.collect();
287		network_config.default_peers_set.reserved_nodes = network_config
288			.default_peers_set
289			.reserved_nodes
290			.into_iter()
291			.filter(|reserved_node| {
292				if reserved_node.peer_id == local_peer_id.into() {
293					warn!(
294						target: LOG_TARGET,
295						"Local peer ID used in reserved node, ignoring: {}",
296						reserved_node,
297					);
298					false
299				} else {
300					true
301				}
302			})
303			.collect();
304
305		// Ensure the listen addresses are consistent with the transport.
306		ensure_addresses_consistent_with_transport(
307			network_config.listen_addresses.iter(),
308			&network_config.transport,
309		)?;
310		ensure_addresses_consistent_with_transport(
311			network_config.boot_nodes.iter().map(|x| &x.multiaddr),
312			&network_config.transport,
313		)?;
314		ensure_addresses_consistent_with_transport(
315			network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
316			&network_config.transport,
317		)?;
318		for notification_protocol in &notification_protocols {
319			ensure_addresses_consistent_with_transport(
320				notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
321				&network_config.transport,
322			)?;
323		}
324		ensure_addresses_consistent_with_transport(
325			network_config.public_addresses.iter(),
326			&network_config.transport,
327		)?;
328
329		let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);
330
331		if let Some(path) = &network_config.net_config_path {
332			fs::create_dir_all(path)?;
333		}
334
335		info!(
336			target: LOG_TARGET,
337			"🏷  Local node identity is: {}",
338			local_peer_id.to_base58(),
339		);
340		info!(target: LOG_TARGET, "Running libp2p network backend");
341
342		let (transport, bandwidth) = {
343			let config_mem = match network_config.transport {
344				TransportConfig::MemoryOnly => true,
345				TransportConfig::Normal { .. } => false,
346			};
347
348			transport::build_transport(local_identity.clone().into(), config_mem)
349		};
350
351		let (to_notifications, from_protocol_controllers) =
352			tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);
353
354		// We must prepend a hardcoded default peer set to notification protocols.
355		let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
356			.chain(notification_protocols.iter().map(|protocol| protocol.set_config()));
357
358		let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
359			.enumerate()
360			.map(|(set_id, set_config)| {
361				let proto_set_config = ProtoSetConfig {
362					in_peers: set_config.in_peers,
363					out_peers: set_config.out_peers,
364					reserved_nodes: set_config
365						.reserved_nodes
366						.iter()
367						.map(|node| node.peer_id.into())
368						.collect(),
369					reserved_only: set_config.non_reserved_mode.is_reserved_only(),
370				};
371
372				ProtocolController::new(
373					SetId::from(set_id),
374					proto_set_config,
375					to_notifications.clone(),
376					Arc::clone(&peer_store_handle),
377				)
378			})
379			.unzip();
380
381		// Shortcut to default (sync) peer set protocol handle.
382		let sync_protocol_handle = protocol_handles[0].clone();
383
384		// Spawn `ProtocolController` runners.
385		protocol_controllers
386			.into_iter()
387			.for_each(|controller| (params.executor)(controller.run().boxed()));
388
389		// Protocol name to protocol id mapping. The first protocol is always block announce (sync)
390		// protocol, aka default (hardcoded) peer set.
391		let notification_protocol_ids: HashMap<ProtocolName, SetId> =
392			iter::once(&params.block_announce_config)
393				.chain(notification_protocols.iter())
394				.enumerate()
395				.map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
396				.collect();
397
398		let known_addresses = {
399			// Collect all reserved nodes and bootnodes addresses.
400			let mut addresses: Vec<_> = network_config
401				.default_peers_set
402				.reserved_nodes
403				.iter()
404				.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
405				.chain(notification_protocols.iter().flat_map(|protocol| {
406					protocol
407						.set_config()
408						.reserved_nodes
409						.iter()
410						.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
411				}))
412				.chain(
413					network_config
414						.boot_nodes
415						.iter()
416						.map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
417				)
418				.collect();
419
420			// Remove possible duplicates.
421			addresses.sort();
422			addresses.dedup();
423
424			addresses
425		};
426
427		// Check for duplicate bootnodes.
428		network_config.boot_nodes.iter().try_for_each(|bootnode| {
429			if let Some(other) = network_config
430				.boot_nodes
431				.iter()
432				.filter(|o| o.multiaddr == bootnode.multiaddr)
433				.find(|o| o.peer_id != bootnode.peer_id)
434			{
435				Err(Error::DuplicateBootnode {
436					address: bootnode.multiaddr.clone().into(),
437					first_id: bootnode.peer_id.into(),
438					second_id: other.peer_id.into(),
439				})
440			} else {
441				Ok(())
442			}
443		})?;
444
445		// List of bootnode multiaddresses.
446		let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();
447
448		for bootnode in network_config.boot_nodes.iter() {
449			boot_node_ids
450				.entry(bootnode.peer_id.into())
451				.or_default()
452				.push(bootnode.multiaddr.clone().into());
453		}
454
455		let boot_node_ids = Arc::new(boot_node_ids);
456
457		let num_connected = Arc::new(AtomicUsize::new(0));
458		let external_addresses = Arc::new(Mutex::new(HashSet::new()));
459
460		let (protocol, notif_protocol_handles) = Protocol::new(
461			From::from(&params.role),
462			params.notification_metrics,
463			notification_protocols,
464			params.block_announce_config,
465			Arc::clone(&peer_store_handle),
466			protocol_handles.clone(),
467			from_protocol_controllers,
468		)?;
469
470		// Build the swarm.
471		let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
472			let user_agent =
473				format!("{} ({})", network_config.client_version, network_config.node_name);
474
475			let discovery_config = {
476				let mut config = DiscoveryConfig::new(local_peer_id);
477				config.with_permanent_addresses(
478					known_addresses
479						.iter()
480						.map(|(peer, address)| (peer.into(), address.clone().into()))
481						.collect::<Vec<_>>(),
482				);
483				config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
484				config.with_kademlia(
485					params.genesis_hash,
486					params.fork_id.as_deref(),
487					&params.protocol_id,
488				);
489				config.with_dht_random_walk(network_config.enable_dht_random_walk);
490				config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
491				config.use_kademlia_disjoint_query_paths(
492					network_config.kademlia_disjoint_query_paths,
493				);
494				config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);
495
496				match network_config.transport {
497					TransportConfig::MemoryOnly => {
498						config.with_mdns(false);
499						config.allow_private_ip(false);
500					},
501					TransportConfig::Normal {
502						enable_mdns,
503						allow_private_ip: allow_private_ipv4,
504						..
505					} => {
506						config.with_mdns(enable_mdns);
507						config.allow_private_ip(allow_private_ipv4);
508					},
509				}
510
511				config
512			};
513
514			let behaviour = {
515				let result = Behaviour::new(
516					protocol,
517					user_agent,
518					local_public.into(),
519					discovery_config,
520					request_response_protocols,
521					Arc::clone(&peer_store_handle),
522					external_addresses.clone(),
523					network_config.public_addresses.iter().cloned().map(Into::into).collect(),
524					ConnectionLimits::default()
525						.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
526						.with_max_established_incoming(Some(
527							crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
528						)),
529				);
530
531				match result {
532					Ok(b) => b,
533					Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
534						return Err(Error::DuplicateRequestResponseProtocol { protocol: proto })
535					},
536				}
537			};
538
539			let swarm = {
540				struct SpawnImpl<F>(F);
541				impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
542					fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
543						(self.0)(f)
544					}
545				}
546
547				let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
548					.with_substream_upgrade_protocol_override(upgrade::Version::V1)
549					.with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
550					// NOTE: 24 is somewhat arbitrary and should be tuned in the future if
551					// necessary. See <https://github.com/paritytech/substrate/pull/6080>
552					.with_per_connection_event_buffer_size(24)
553					.with_max_negotiating_inbound_streams(2048)
554					.with_idle_connection_timeout(network_config.idle_connection_timeout);
555
556				Swarm::new(transport, behaviour, local_peer_id, config)
557			};
558
559			(swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
560		};
561
562		// Initialize the metrics.
563		let metrics = match &params.metrics_registry {
564			Some(registry) => Some(metrics::register(
565				registry,
566				MetricSources {
567					bandwidth: bandwidth.clone(),
568					connected_peers: num_connected.clone(),
569				},
570			)?),
571			None => None,
572		};
573
574		// Listen on multiaddresses.
575		for addr in &network_config.listen_addresses {
576			if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
577				warn!(target: LOG_TARGET, "Can't listen on {} because: {:?}", addr, err)
578			}
579		}
580
581		// Add external addresses.
582		for addr in &network_config.public_addresses {
583			Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
584		}
585
586		let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));
587
588		let service = Arc::new(NetworkService {
589			bandwidth,
590			external_addresses,
591			listen_addresses: listen_addresses_set.clone(),
592			num_connected: num_connected.clone(),
593			local_peer_id,
594			local_identity: local_identity.into(),
595			to_worker,
596			notification_protocol_ids,
597			protocol_handles,
598			sync_protocol_handle,
599			peer_store_handle: Arc::clone(&peer_store_handle),
600			_marker: PhantomData,
601			_block: Default::default(),
602		});
603
604		Ok(NetworkWorker {
605			listen_addresses: listen_addresses_set,
606			num_connected,
607			network_service: swarm,
608			service,
609			from_service,
610			event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
611			metrics,
612			boot_node_ids,
613			reported_invalid_boot_nodes: Default::default(),
614			peer_store_handle: Arc::clone(&peer_store_handle),
615			notif_protocol_handles,
616			_marker: Default::default(),
617			_block: Default::default(),
618		})
619	}
620
621	/// High-level network status information.
622	pub fn status(&self) -> NetworkStatus {
623		NetworkStatus {
624			num_connected_peers: self.num_connected_peers(),
625			total_bytes_inbound: self.total_bytes_inbound(),
626			total_bytes_outbound: self.total_bytes_outbound(),
627		}
628	}
629
630	/// Returns the total number of bytes received so far.
631	pub fn total_bytes_inbound(&self) -> u64 {
632		self.service.bandwidth.total_inbound()
633	}
634
635	/// Returns the total number of bytes sent so far.
636	pub fn total_bytes_outbound(&self) -> u64 {
637		self.service.bandwidth.total_outbound()
638	}
639
640	/// Returns the number of peers we're connected to.
641	pub fn num_connected_peers(&self) -> usize {
642		self.network_service.behaviour().user_protocol().num_sync_peers()
643	}
644
645	/// Adds an address for a node.
646	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
647		self.network_service.behaviour_mut().add_known_address(peer_id, addr);
648	}
649
650	/// Return a `NetworkService` that can be shared through the code base and can be used to
651	/// manipulate the worker.
652	pub fn service(&self) -> &Arc<NetworkService<B, H>> {
653		&self.service
654	}
655
656	/// Returns the local `PeerId`.
657	pub fn local_peer_id(&self) -> &PeerId {
658		Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
659	}
660
661	/// Returns the list of addresses we are listening on.
662	///
663	/// Does **NOT** include a trailing `/p2p/` with our `PeerId`.
664	pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
665		Swarm::<Behaviour<B>>::listeners(&self.network_service)
666	}
667
668	/// Get network state.
669	///
670	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
671	/// everywhere about this. Please don't use this function to retrieve actual information.
672	pub fn network_state(&mut self) -> NetworkState {
673		let swarm = &mut self.network_service;
674		let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
675		let connected_peers = {
676			let swarm = &mut *swarm;
677			open.iter()
678				.filter_map(move |peer_id| {
679					let known_addresses = if let Ok(addrs) =
680						NetworkBehaviour::handle_pending_outbound_connection(
681							swarm.behaviour_mut(),
682							ConnectionId::new_unchecked(0), // dummy value
683							Some(*peer_id),
684							&vec![],
685							Endpoint::Listener,
686						) {
687						addrs.into_iter().collect()
688					} else {
689						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
690						return None;
691					};
692
693					let endpoint = if let Some(e) =
694						swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
695					{
696						e.clone().into()
697					} else {
698						error!(target: LOG_TARGET, "Found state inconsistency between custom protocol \
699						and debug information about {:?}", peer_id);
700						return None;
701					};
702
703					Some((
704						peer_id.to_base58(),
705						NetworkStatePeer {
706							endpoint,
707							version_string: swarm
708								.behaviour_mut()
709								.node(peer_id)
710								.and_then(|i| i.client_version().map(|s| s.to_owned())),
711							latest_ping_time: swarm
712								.behaviour_mut()
713								.node(peer_id)
714								.and_then(|i| i.latest_ping()),
715							known_addresses,
716						},
717					))
718				})
719				.collect()
720		};
721
722		let not_connected_peers = {
723			let swarm = &mut *swarm;
724			swarm
725				.behaviour_mut()
726				.known_peers()
727				.into_iter()
728				.filter(|p| open.iter().all(|n| n != p))
729				.map(move |peer_id| {
730					let known_addresses = if let Ok(addrs) =
731						NetworkBehaviour::handle_pending_outbound_connection(
732							swarm.behaviour_mut(),
733							ConnectionId::new_unchecked(0), // dummy value
734							Some(peer_id),
735							&vec![],
736							Endpoint::Listener,
737						) {
738						addrs.into_iter().collect()
739					} else {
740						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
741						Default::default()
742					};
743
744					(
745						peer_id.to_base58(),
746						NetworkStateNotConnectedPeer {
747							version_string: swarm
748								.behaviour_mut()
749								.node(&peer_id)
750								.and_then(|i| i.client_version().map(|s| s.to_owned())),
751							latest_ping_time: swarm
752								.behaviour_mut()
753								.node(&peer_id)
754								.and_then(|i| i.latest_ping()),
755							known_addresses,
756						},
757					)
758				})
759				.collect()
760		};
761
762		let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
763		let listened_addresses = swarm.listeners().cloned().collect();
764		let external_addresses = swarm.external_addresses().cloned().collect();
765
766		NetworkState {
767			peer_id,
768			listened_addresses,
769			external_addresses,
770			connected_peers,
771			not_connected_peers,
772			// TODO: Check what info we can include here.
773			//       Issue reference: https://github.com/paritytech/substrate/issues/14160.
774			peerset: serde_json::json!(
775				"Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
776			),
777		}
778	}
779
780	/// Removes a `PeerId` from the list of reserved peers.
781	pub fn remove_reserved_peer(&self, peer: PeerId) {
782		self.service.remove_reserved_peer(peer.into());
783	}
784
785	/// Adds a `PeerId` and its `Multiaddr` as reserved.
786	pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
787		self.service.add_reserved_peer(peer)
788	}
789}
790
791impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
792	/// Get network state.
793	///
794	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
795	/// everywhere about this. Please don't use this function to retrieve actual information.
796	///
797	/// Returns an error if the `NetworkWorker` is no longer running.
798	pub async fn network_state(&self) -> Result<NetworkState, ()> {
799		let (tx, rx) = oneshot::channel();
800
801		let _ = self
802			.to_worker
803			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
804
805		match rx.await {
806			Ok(v) => v.map_err(|_| ()),
807			// The channel can only be closed if the network worker no longer exists.
808			Err(_) => Err(()),
809		}
810	}
811
812	/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
813	///
814	/// Returns an `Err` if one of the given addresses is invalid or contains an
815	/// invalid peer ID (which includes the local peer ID).
816	fn split_multiaddr_and_peer_id(
817		&self,
818		peers: HashSet<Multiaddr>,
819	) -> Result<Vec<(PeerId, Multiaddr)>, String> {
820		peers
821			.into_iter()
822			.map(|mut addr| {
823				let peer = match addr.pop() {
824					Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
825					_ => return Err("Missing PeerId from address".to_string()),
826				};
827
828				// Make sure the local peer ID is never added to the PSM
829				// or added as a "known address", even if given.
830				if peer == self.local_peer_id {
831					Err("Local peer ID in peer set.".to_string())
832				} else {
833					Ok((peer, addr))
834				}
835			})
836			.collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
837	}
838}
839
840impl<B, H> NetworkStateInfo for NetworkService<B, H>
841where
842	B: sp_runtime::traits::Block,
843	H: ExHashT,
844{
845	/// Returns the local external addresses.
846	fn external_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
847		self.external_addresses.lock().iter().cloned().map(Into::into).collect()
848	}
849
850	/// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`).
851	fn listen_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
852		self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
853	}
854
855	/// Returns the local Peer ID.
856	fn local_peer_id(&self) -> sc_network_types::PeerId {
857		self.local_peer_id.into()
858	}
859}
860
861impl<B, H> NetworkSigner for NetworkService<B, H>
862where
863	B: sp_runtime::traits::Block,
864	H: ExHashT,
865{
866	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
867		let public_key = self.local_identity.public();
868		let bytes = self.local_identity.sign(msg.as_ref())?;
869
870		Ok(Signature {
871			public_key: crate::service::signature::PublicKey::Libp2p(public_key),
872			bytes,
873		})
874	}
875
876	fn verify(
877		&self,
878		peer_id: sc_network_types::PeerId,
879		public_key: &Vec<u8>,
880		signature: &Vec<u8>,
881		message: &Vec<u8>,
882	) -> Result<bool, String> {
883		let public_key =
884			PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
885		let peer_id: PeerId = peer_id.into();
886		let remote: libp2p::PeerId = public_key.to_peer_id();
887
888		Ok(peer_id == remote && public_key.verify(message, signature))
889	}
890}
891
892impl<B, H> NetworkDHTProvider for NetworkService<B, H>
893where
894	B: BlockT + 'static,
895	H: ExHashT,
896{
897	/// Start finding closest peerst to the target peer ID in the DHT.
898	///
899	/// This will generate either a `ClosestPeersFound` or a `ClosestPeersNotFound` event and pass
900	/// it as an item on the [`NetworkWorker`] stream.
901	fn find_closest_peers(&self, target: sc_network_types::PeerId) {
902		let _ = self
903			.to_worker
904			.unbounded_send(ServiceToWorkerMsg::FindClosestPeers(target.into()));
905	}
906
907	/// Start getting a value from the DHT.
908	///
909	/// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an
910	/// item on the [`NetworkWorker`] stream.
911	fn get_value(&self, key: &KademliaKey) {
912		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
913	}
914
915	/// Start putting a value in the DHT.
916	///
917	/// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an
918	/// item on the [`NetworkWorker`] stream.
919	fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
920		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
921	}
922
923	fn put_record_to(
924		&self,
925		record: Record,
926		peers: HashSet<sc_network_types::PeerId>,
927		update_local_storage: bool,
928	) {
929		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
930			record,
931			peers,
932			update_local_storage,
933		});
934	}
935
936	fn store_record(
937		&self,
938		key: KademliaKey,
939		value: Vec<u8>,
940		publisher: Option<sc_network_types::PeerId>,
941		expires: Option<Instant>,
942	) {
943		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
944			key,
945			value,
946			publisher.map(Into::into),
947			expires,
948		));
949	}
950
951	fn start_providing(&self, key: KademliaKey) {
952		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key));
953	}
954
955	fn stop_providing(&self, key: KademliaKey) {
956		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
957	}
958
959	fn get_providers(&self, key: KademliaKey) {
960		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key));
961	}
962}
963
964#[async_trait::async_trait]
965impl<B, H> NetworkStatusProvider for NetworkService<B, H>
966where
967	B: BlockT + 'static,
968	H: ExHashT,
969{
970	async fn status(&self) -> Result<NetworkStatus, ()> {
971		let (tx, rx) = oneshot::channel();
972
973		let _ = self
974			.to_worker
975			.unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
976
977		match rx.await {
978			Ok(v) => v.map_err(|_| ()),
979			// The channel can only be closed if the network worker no longer exists.
980			Err(_) => Err(()),
981		}
982	}
983
984	async fn network_state(&self) -> Result<NetworkState, ()> {
985		let (tx, rx) = oneshot::channel();
986
987		let _ = self
988			.to_worker
989			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
990
991		match rx.await {
992			Ok(v) => v.map_err(|_| ()),
993			// The channel can only be closed if the network worker no longer exists.
994			Err(_) => Err(()),
995		}
996	}
997}
998
999#[async_trait::async_trait]
1000impl<B, H> NetworkPeers for NetworkService<B, H>
1001where
1002	B: BlockT + 'static,
1003	H: ExHashT,
1004{
1005	fn set_authorized_peers(&self, peers: HashSet<sc_network_types::PeerId>) {
1006		self.sync_protocol_handle
1007			.set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
1008	}
1009
1010	fn set_authorized_only(&self, reserved_only: bool) {
1011		self.sync_protocol_handle.set_reserved_only(reserved_only);
1012	}
1013
1014	fn add_known_address(
1015		&self,
1016		peer_id: sc_network_types::PeerId,
1017		addr: sc_network_types::multiaddr::Multiaddr,
1018	) {
1019		let _ = self
1020			.to_worker
1021			.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
1022	}
1023
1024	fn report_peer(&self, peer_id: sc_network_types::PeerId, cost_benefit: ReputationChange) {
1025		self.peer_store_handle.report_peer(peer_id, cost_benefit);
1026	}
1027
1028	fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
1029		self.peer_store_handle.peer_reputation(peer_id)
1030	}
1031
1032	fn disconnect_peer(&self, peer_id: sc_network_types::PeerId, protocol: ProtocolName) {
1033		let _ = self
1034			.to_worker
1035			.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
1036	}
1037
1038	fn accept_unreserved_peers(&self) {
1039		self.sync_protocol_handle.set_reserved_only(false);
1040	}
1041
1042	fn deny_unreserved_peers(&self) {
1043		self.sync_protocol_handle.set_reserved_only(true);
1044	}
1045
1046	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
1047		// Make sure the local peer ID is never added as a reserved peer.
1048		if peer.peer_id == self.local_peer_id.into() {
1049			return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1050		}
1051
1052		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
1053			peer.peer_id.into(),
1054			peer.multiaddr.into(),
1055		));
1056		self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());
1057
1058		Ok(())
1059	}
1060
1061	fn remove_reserved_peer(&self, peer_id: sc_network_types::PeerId) {
1062		self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
1063	}
1064
1065	fn set_reserved_peers(
1066		&self,
1067		protocol: ProtocolName,
1068		peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1069	) -> Result<(), String> {
1070		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1071			return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol));
1072		};
1073
1074		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1075		let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;
1076
1077		let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());
1078
1079		for (peer_id, addr) in peers_addrs.into_iter() {
1080			// Make sure the local peer ID is never added to the PSM.
1081			if peer_id == self.local_peer_id {
1082				return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1083			}
1084
1085			peers.insert(peer_id.into());
1086
1087			if !addr.is_empty() {
1088				let _ = self
1089					.to_worker
1090					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1091			}
1092		}
1093
1094		self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);
1095
1096		Ok(())
1097	}
1098
1099	fn add_peers_to_reserved_set(
1100		&self,
1101		protocol: ProtocolName,
1102		peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1103	) -> Result<(), String> {
1104		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1105			return Err(format!(
1106				"Cannot add peers to reserved set of unknown protocol: {}",
1107				protocol
1108			));
1109		};
1110
1111		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1112		let peers = self.split_multiaddr_and_peer_id(peers)?;
1113
1114		for (peer_id, addr) in peers.into_iter() {
1115			// Make sure the local peer ID is never added to the PSM.
1116			if peer_id == self.local_peer_id {
1117				return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1118			}
1119
1120			if !addr.is_empty() {
1121				let _ = self
1122					.to_worker
1123					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1124			}
1125
1126			self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
1127		}
1128
1129		Ok(())
1130	}
1131
1132	fn remove_peers_from_reserved_set(
1133		&self,
1134		protocol: ProtocolName,
1135		peers: Vec<sc_network_types::PeerId>,
1136	) -> Result<(), String> {
1137		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1138			return Err(format!(
1139				"Cannot remove peers from reserved set of unknown protocol: {}",
1140				protocol
1141			));
1142		};
1143
1144		for peer_id in peers.into_iter() {
1145			self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
1146		}
1147
1148		Ok(())
1149	}
1150
1151	fn sync_num_connected(&self) -> usize {
1152		self.num_connected.load(Ordering::Relaxed)
1153	}
1154
1155	fn peer_role(
1156		&self,
1157		peer_id: sc_network_types::PeerId,
1158		handshake: Vec<u8>,
1159	) -> Option<ObservedRole> {
1160		match Roles::decode_all(&mut &handshake[..]) {
1161			Ok(role) => Some(role.into()),
1162			Err(_) => {
1163				log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
1164				self.peer_store_handle.peer_role(&(peer_id.into()))
1165			},
1166		}
1167	}
1168
1169	/// Get the list of reserved peers.
1170	///
1171	/// Returns an error if the `NetworkWorker` is no longer running.
1172	async fn reserved_peers(&self) -> Result<Vec<sc_network_types::PeerId>, ()> {
1173		let (tx, rx) = oneshot::channel();
1174
1175		self.sync_protocol_handle.reserved_peers(tx);
1176
1177		// The channel can only be closed if `ProtocolController` no longer exists.
1178		rx.await
1179			.map(|peers| peers.into_iter().map(From::from).collect())
1180			.map_err(|_| ())
1181	}
1182}
1183
1184impl<B, H> NetworkEventStream for NetworkService<B, H>
1185where
1186	B: BlockT + 'static,
1187	H: ExHashT,
1188{
1189	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1190		let (tx, rx) = out_events::channel(name, 100_000);
1191		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1192		Box::pin(rx)
1193	}
1194}
1195
1196#[async_trait::async_trait]
1197impl<B, H> NetworkRequest for NetworkService<B, H>
1198where
1199	B: BlockT + 'static,
1200	H: ExHashT,
1201{
1202	async fn request(
1203		&self,
1204		target: sc_network_types::PeerId,
1205		protocol: ProtocolName,
1206		request: Vec<u8>,
1207		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1208		connect: IfDisconnected,
1209	) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1210		let (tx, rx) = oneshot::channel();
1211
1212		self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1213
1214		match rx.await {
1215			Ok(v) => v,
1216			// The channel can only be closed if the network worker no longer exists. If the
1217			// network worker no longer exists, then all connections to `target` are necessarily
1218			// closed, and we legitimately report this situation as a "ConnectionClosed".
1219			Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1220		}
1221	}
1222
1223	fn start_request(
1224		&self,
1225		target: sc_network_types::PeerId,
1226		protocol: ProtocolName,
1227		request: Vec<u8>,
1228		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1229		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1230		connect: IfDisconnected,
1231	) {
1232		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1233			target: target.into(),
1234			protocol: protocol.into(),
1235			request,
1236			fallback_request,
1237			pending_response: tx,
1238			connect,
1239		});
1240	}
1241}
1242
1243/// A `NotificationSender` allows for sending notifications to a peer with a chosen protocol.
1244#[must_use]
1245pub struct NotificationSender {
1246	sink: NotificationsSink,
1247
1248	/// Name of the protocol on the wire.
1249	protocol_name: ProtocolName,
1250
1251	/// Field extracted from the [`Metrics`] struct and necessary to report the
1252	/// notifications-related metrics.
1253	notification_size_metric: Option<Histogram>,
1254}
1255
1256#[async_trait::async_trait]
1257impl NotificationSenderT for NotificationSender {
1258	async fn ready(
1259		&self,
1260	) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1261		Ok(Box::new(NotificationSenderReady {
1262			ready: match self.sink.reserve_notification().await {
1263				Ok(r) => Some(r),
1264				Err(()) => return Err(NotificationSenderError::Closed),
1265			},
1266			peer_id: self.sink.peer_id(),
1267			protocol_name: &self.protocol_name,
1268			notification_size_metric: self.notification_size_metric.clone(),
1269		}))
1270	}
1271}
1272
1273/// Reserved slot in the notifications buffer, ready to accept data.
1274#[must_use]
1275pub struct NotificationSenderReady<'a> {
1276	ready: Option<Ready<'a>>,
1277
1278	/// Target of the notification.
1279	peer_id: &'a PeerId,
1280
1281	/// Name of the protocol on the wire.
1282	protocol_name: &'a ProtocolName,
1283
1284	/// Field extracted from the [`Metrics`] struct and necessary to report the
1285	/// notifications-related metrics.
1286	notification_size_metric: Option<Histogram>,
1287}
1288
1289impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1290	fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1291		if let Some(notification_size_metric) = &self.notification_size_metric {
1292			notification_size_metric.observe(notification.len() as f64);
1293		}
1294
1295		trace!(
1296			target: LOG_TARGET,
1297			"External API => Notification({:?}, {}, {} bytes)",
1298			self.peer_id, self.protocol_name, notification.len(),
1299		);
1300		trace!(target: LOG_TARGET, "Handler({:?}) <= Async notification", self.peer_id);
1301
1302		self.ready
1303			.take()
1304			.ok_or(NotificationSenderError::Closed)?
1305			.send(notification)
1306			.map_err(|()| NotificationSenderError::Closed)
1307	}
1308}
1309
1310/// Messages sent from the `NetworkService` to the `NetworkWorker`.
1311///
1312/// Each entry corresponds to a method of `NetworkService`.
1313enum ServiceToWorkerMsg {
1314	FindClosestPeers(PeerId),
1315	GetValue(KademliaKey),
1316	PutValue(KademliaKey, Vec<u8>),
1317	PutRecordTo {
1318		record: Record,
1319		peers: HashSet<sc_network_types::PeerId>,
1320		update_local_storage: bool,
1321	},
1322	StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
1323	StartProviding(KademliaKey),
1324	StopProviding(KademliaKey),
1325	GetProviders(KademliaKey),
1326	AddKnownAddress(PeerId, Multiaddr),
1327	EventStream(out_events::Sender),
1328	Request {
1329		target: PeerId,
1330		protocol: ProtocolName,
1331		request: Vec<u8>,
1332		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1333		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1334		connect: IfDisconnected,
1335	},
1336	NetworkStatus {
1337		pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
1338	},
1339	NetworkState {
1340		pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
1341	},
1342	DisconnectPeer(PeerId, ProtocolName),
1343}
1344
1345/// Main network worker. Must be polled in order for the network to advance.
1346///
1347/// You are encouraged to poll this in a separate background thread or task.
1348#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
1349pub struct NetworkWorker<B, H>
1350where
1351	B: BlockT + 'static,
1352	H: ExHashT,
1353{
1354	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
1355	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
1356	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
1357	num_connected: Arc<AtomicUsize>,
1358	/// The network service that can be extracted and shared through the codebase.
1359	service: Arc<NetworkService<B, H>>,
1360	/// The *actual* network.
1361	network_service: Swarm<Behaviour<B>>,
1362	/// Messages from the [`NetworkService`] that must be processed.
1363	from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
1364	/// Senders for events that happen on the network.
1365	event_streams: out_events::OutChannels,
1366	/// Prometheus network metrics.
1367	metrics: Option<Metrics>,
1368	/// The `PeerId`'s of all boot nodes mapped to the registered addresses.
1369	boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
1370	/// Boot nodes that we already have reported as invalid.
1371	reported_invalid_boot_nodes: HashSet<PeerId>,
1372	/// Peer reputation store handle.
1373	peer_store_handle: Arc<dyn PeerStoreProvider>,
1374	/// Notification protocol handles.
1375	notif_protocol_handles: Vec<protocol::ProtocolHandle>,
1376	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
1377	/// compatibility.
1378	_marker: PhantomData<H>,
1379	/// Marker for block type
1380	_block: PhantomData<B>,
1381}
1382
1383impl<B, H> NetworkWorker<B, H>
1384where
1385	B: BlockT + 'static,
1386	H: ExHashT,
1387{
1388	/// Run the network.
1389	pub async fn run(mut self) {
1390		while self.next_action().await {}
1391	}
1392
1393	/// Perform one action on the network.
1394	///
1395	/// Returns `false` when the worker should be shutdown.
1396	/// Use in tests only.
1397	pub async fn next_action(&mut self) -> bool {
1398		futures::select! {
1399			// Next message from the service.
1400			msg = self.from_service.next() => {
1401				if let Some(msg) = msg {
1402					self.handle_worker_message(msg);
1403				} else {
1404					return false
1405				}
1406			},
1407			// Next event from `Swarm` (the stream guaranteed to never terminate).
1408			event = self.network_service.select_next_some() => {
1409				self.handle_swarm_event(event);
1410			},
1411		};
1412
1413		// Update the `num_connected` count shared with the `NetworkService`.
1414		let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
1415		self.num_connected.store(num_connected_peers, Ordering::Relaxed);
1416
1417		if let Some(metrics) = self.metrics.as_ref() {
1418			if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
1419				for (lower_ilog2_bucket_bound, num_entries) in buckets {
1420					metrics
1421						.kbuckets_num_nodes
1422						.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
1423						.set(num_entries as u64);
1424				}
1425			}
1426			if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
1427				metrics.kademlia_records_count.set(num_entries as u64);
1428			}
1429			if let Some(num_entries) =
1430				self.network_service.behaviour_mut().kademlia_records_total_size()
1431			{
1432				metrics.kademlia_records_sizes_total.set(num_entries as u64);
1433			}
1434
1435			metrics.pending_connections.set(
1436				Swarm::network_info(&self.network_service).connection_counters().num_pending()
1437					as u64,
1438			);
1439		}
1440
1441		true
1442	}
1443
1444	/// Process the next message coming from the `NetworkService`.
1445	fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
1446		match msg {
1447			ServiceToWorkerMsg::FindClosestPeers(target) => {
1448				self.network_service.behaviour_mut().find_closest_peers(target)
1449			},
1450			ServiceToWorkerMsg::GetValue(key) => {
1451				self.network_service.behaviour_mut().get_value(key.into())
1452			},
1453			ServiceToWorkerMsg::PutValue(key, value) => {
1454				self.network_service.behaviour_mut().put_value(key.into(), value)
1455			},
1456			ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
1457				.network_service
1458				.behaviour_mut()
1459				.put_record_to(record.into(), peers, update_local_storage),
1460			ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
1461				.network_service
1462				.behaviour_mut()
1463				.store_record(key.into(), value, publisher, expires),
1464			ServiceToWorkerMsg::StartProviding(key) => {
1465				self.network_service.behaviour_mut().start_providing(key.into())
1466			},
1467			ServiceToWorkerMsg::StopProviding(key) => {
1468				self.network_service.behaviour_mut().stop_providing(&key.into())
1469			},
1470			ServiceToWorkerMsg::GetProviders(key) => {
1471				self.network_service.behaviour_mut().get_providers(key.into())
1472			},
1473			ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => {
1474				self.network_service.behaviour_mut().add_known_address(peer_id, addr)
1475			},
1476			ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
1477			ServiceToWorkerMsg::Request {
1478				target,
1479				protocol,
1480				request,
1481				fallback_request,
1482				pending_response,
1483				connect,
1484			} => {
1485				self.network_service.behaviour_mut().send_request(
1486					&target,
1487					protocol,
1488					request,
1489					fallback_request,
1490					pending_response,
1491					connect,
1492				);
1493			},
1494			ServiceToWorkerMsg::NetworkStatus { pending_response } => {
1495				let _ = pending_response.send(Ok(self.status()));
1496			},
1497			ServiceToWorkerMsg::NetworkState { pending_response } => {
1498				let _ = pending_response.send(Ok(self.network_state()));
1499			},
1500			ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
1501				.network_service
1502				.behaviour_mut()
1503				.user_protocol_mut()
1504				.disconnect_peer(&who, protocol_name),
1505		}
1506	}
1507
1508	/// Process the next event coming from `Swarm`.
1509	fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut>) {
1510		match event {
1511			SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1512				if let Some(metrics) = self.metrics.as_ref() {
1513					match result {
1514						Ok(serve_time) => {
1515							metrics
1516								.requests_in_success_total
1517								.with_label_values(&[&protocol])
1518								.observe(serve_time.as_secs_f64());
1519						},
1520						Err(err) => {
1521							let reason = match err {
1522								ResponseFailure::Network(InboundFailure::Timeout) => {
1523									Some("timeout")
1524								},
1525								ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1526								// `UnsupportedProtocols` is reported for every single
1527								// inbound request whenever a request with an unsupported
1528								// protocol is received. This is not reported in order to
1529								// avoid confusions.
1530								{
1531									None
1532								},
1533								ResponseFailure::Network(InboundFailure::ResponseOmission) => {
1534									Some("busy-omitted")
1535								},
1536								ResponseFailure::Network(InboundFailure::ConnectionClosed) => {
1537									Some("connection-closed")
1538								},
1539								ResponseFailure::Network(InboundFailure::Io(_)) => Some("io"),
1540							};
1541
1542							if let Some(reason) = reason {
1543								metrics
1544									.requests_in_failure_total
1545									.with_label_values(&[&protocol, reason])
1546									.inc();
1547							}
1548						},
1549					}
1550				}
1551			},
1552			SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1553				protocol,
1554				duration,
1555				result,
1556				..
1557			}) => {
1558				if let Some(metrics) = self.metrics.as_ref() {
1559					match result {
1560						Ok(_) => {
1561							metrics
1562								.requests_out_success_total
1563								.with_label_values(&[&protocol])
1564								.observe(duration.as_secs_f64());
1565						},
1566						Err(err) => {
1567							let reason = match err {
1568								RequestFailure::NotConnected => "not-connected",
1569								RequestFailure::UnknownProtocol => "unknown-protocol",
1570								RequestFailure::InvalidRequest => "invalid-request",
1571								RequestFailure::Refused => "refused",
1572								RequestFailure::Obsolete => "obsolete",
1573								RequestFailure::Network(OutboundFailure::DialFailure) => {
1574									"dial-failure"
1575								},
1576								RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1577								RequestFailure::Network(OutboundFailure::ConnectionClosed) => {
1578									"connection-closed"
1579								},
1580								RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1581									"unsupported"
1582								},
1583								RequestFailure::Network(OutboundFailure::Io(_)) => "io",
1584							};
1585
1586							metrics
1587								.requests_out_failure_total
1588								.with_label_values(&[&protocol, reason])
1589								.inc();
1590						},
1591					}
1592				}
1593			},
1594			SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1595				for change in changes {
1596					self.peer_store_handle.report_peer(peer.into(), change);
1597				}
1598			},
1599			SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1600				peer_id,
1601				info:
1602					IdentifyInfo {
1603						protocol_version, agent_version, mut listen_addrs, protocols, ..
1604					},
1605			}) => {
1606				if listen_addrs.len() > 30 {
1607					debug!(
1608						target: LOG_TARGET,
1609						"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1610						peer_id, protocol_version, agent_version
1611					);
1612					listen_addrs.truncate(30);
1613				}
1614				for addr in listen_addrs {
1615					self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1616						&peer_id,
1617						&protocols,
1618						addr.clone(),
1619					);
1620				}
1621				self.peer_store_handle.add_known_peer(peer_id.into());
1622			},
1623			SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1624				self.peer_store_handle.add_known_peer(peer_id.into());
1625			},
1626			SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1627				if let Some(metrics) = self.metrics.as_ref() {
1628					metrics.kademlia_random_queries_total.inc();
1629				}
1630			},
1631			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1632				remote,
1633				set_id,
1634				direction,
1635				negotiated_fallback,
1636				notifications_sink,
1637				received_handshake,
1638			}) => {
1639				let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1640					remote,
1641					direction,
1642					received_handshake,
1643					negotiated_fallback,
1644					notifications_sink,
1645				);
1646			},
1647			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1648				remote,
1649				set_id,
1650				notifications_sink,
1651			}) => {
1652				let _ = self.notif_protocol_handles[usize::from(set_id)]
1653					.report_notification_sink_replaced(remote, notifications_sink);
1654
1655				// TODO: Notifications might have been lost as a result of the previous
1656				// connection being dropped, and as a result it would be preferable to notify
1657				// the users of this fact by simulating the substream being closed then
1658				// reopened.
1659				// The code below doesn't compile because `role` is unknown. Propagating the
1660				// handshake of the secondary connections is quite an invasive change and
1661				// would conflict with https://github.com/paritytech/substrate/issues/6403.
1662				// Considering that dropping notifications is generally regarded as
1663				// acceptable, this bug is at the moment intentionally left there and is
1664				// intended to be fixed at the same time as
1665				// https://github.com/paritytech/substrate/issues/6403.
1666				// self.event_streams.send(Event::NotificationStreamClosed {
1667				// remote,
1668				// protocol,
1669				// });
1670				// self.event_streams.send(Event::NotificationStreamOpened {
1671				// remote,
1672				// protocol,
1673				// role,
1674				// });
1675			},
1676			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1677				let _ = self.notif_protocol_handles[usize::from(set_id)]
1678					.report_substream_closed(remote);
1679			},
1680			SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1681				remote,
1682				set_id,
1683				notification,
1684			}) => {
1685				let _ = self.notif_protocol_handles[usize::from(set_id)]
1686					.report_notification_received(remote, notification);
1687			},
1688			SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1689				match (self.metrics.as_ref(), duration) {
1690					(Some(metrics), Some(duration)) => {
1691						let query_type = match event {
1692							DhtEvent::ClosestPeersFound(_, _) => "peers-found",
1693							DhtEvent::ClosestPeersNotFound(_) => "peers-not-found",
1694							DhtEvent::ValueFound(_) => "value-found",
1695							DhtEvent::ValueNotFound(_) => "value-not-found",
1696							DhtEvent::ValuePut(_) => "value-put",
1697							DhtEvent::ValuePutFailed(_) => "value-put-failed",
1698							DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1699							DhtEvent::StartedProviding(_) => "started-providing",
1700							DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
1701							DhtEvent::ProvidersFound(_, _) => "providers-found",
1702							DhtEvent::NoMoreProviders(_) => "no-more-providers",
1703							DhtEvent::ProvidersNotFound(_) => "providers-not-found",
1704						};
1705						metrics
1706							.kademlia_query_duration
1707							.with_label_values(&[query_type])
1708							.observe(duration.as_secs_f64());
1709					},
1710					_ => {},
1711				}
1712
1713				self.event_streams.send(Event::Dht(event));
1714			},
1715			SwarmEvent::Behaviour(BehaviourOut::None) => {
1716				// Ignored event from lower layers.
1717			},
1718			SwarmEvent::ConnectionEstablished {
1719				peer_id,
1720				endpoint,
1721				num_established,
1722				concurrent_dial_errors,
1723				..
1724			} => {
1725				if let Some(errors) = concurrent_dial_errors {
1726					debug!(target: LOG_TARGET, "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1727				} else {
1728					debug!(target: LOG_TARGET, "Libp2p => Connected({:?})", peer_id);
1729				}
1730
1731				if let Some(metrics) = self.metrics.as_ref() {
1732					let direction = match endpoint {
1733						ConnectedPoint::Dialer { .. } => "out",
1734						ConnectedPoint::Listener { .. } => "in",
1735					};
1736					metrics.connections_opened_total.with_label_values(&[direction]).inc();
1737
1738					if num_established.get() == 1 {
1739						metrics.distinct_peers_connections_opened_total.inc();
1740					}
1741				}
1742			},
1743			SwarmEvent::ConnectionClosed {
1744				connection_id,
1745				peer_id,
1746				cause,
1747				endpoint,
1748				num_established,
1749			} => {
1750				debug!(target: LOG_TARGET, "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1751				if let Some(metrics) = self.metrics.as_ref() {
1752					let direction = match endpoint {
1753						ConnectedPoint::Dialer { .. } => "out",
1754						ConnectedPoint::Listener { .. } => "in",
1755					};
1756					let reason = match cause {
1757						Some(ConnectionError::IO(_)) => "transport-error",
1758						Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1759						None => "actively-closed",
1760					};
1761					metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1762
1763					// `num_established` represents the number of *remaining* connections.
1764					if num_established == 0 {
1765						metrics.distinct_peers_connections_closed_total.inc();
1766					}
1767				}
1768			},
1769			SwarmEvent::NewListenAddr { address, .. } => {
1770				trace!(target: LOG_TARGET, "Libp2p => NewListenAddr({})", address);
1771				if let Some(metrics) = self.metrics.as_ref() {
1772					metrics.listeners_local_addresses.inc();
1773				}
1774				self.listen_addresses.lock().insert(address.clone());
1775			},
1776			SwarmEvent::ExpiredListenAddr { address, .. } => {
1777				info!(target: LOG_TARGET, "📪 No longer listening on {}", address);
1778				if let Some(metrics) = self.metrics.as_ref() {
1779					metrics.listeners_local_addresses.dec();
1780				}
1781				self.listen_addresses.lock().remove(&address);
1782			},
1783			SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1784				if let Some(peer_id) = peer_id {
1785					trace!(
1786						target: LOG_TARGET,
1787						"Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1788					);
1789
1790					let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1791
1792					if let Some(addresses) =
1793						not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1794					{
1795						if let DialError::WrongPeerId { obtained, endpoint } = &error {
1796							if let ConnectedPoint::Dialer {
1797								address,
1798								role_override: _,
1799								port_use: _,
1800							} = endpoint
1801							{
1802								let address_without_peer_id = parse_addr(address.clone().into())
1803									.map_or_else(|_| address.clone(), |r| r.1.into());
1804
1805								// Only report for address of boot node that was added at startup of
1806								// the node and not for any address that the node learned of the
1807								// boot node.
1808								if addresses.iter().any(|a| address_without_peer_id == *a) {
1809									warn!(
1810										"💔 The bootnode you want to connect to at `{address}` provided a \
1811										 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1812									);
1813
1814									self.reported_invalid_boot_nodes.insert(peer_id);
1815								}
1816							}
1817						}
1818					}
1819				}
1820
1821				if let Some(metrics) = self.metrics.as_ref() {
1822					let reason = match error {
1823						DialError::Denied { cause } => {
1824							if cause.downcast::<Exceeded>().is_ok() {
1825								Some("limit-reached")
1826							} else {
1827								None
1828							}
1829						},
1830						DialError::LocalPeerId { .. } => Some("local-peer-id"),
1831						DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1832						DialError::Transport(_) => Some("transport-error"),
1833						DialError::NoAddresses |
1834						DialError::DialPeerConditionFalse(_) |
1835						DialError::Aborted => None, // ignore them
1836					};
1837					if let Some(reason) = reason {
1838						metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1839					}
1840				}
1841			},
1842			SwarmEvent::Dialing { connection_id, peer_id } => {
1843				trace!(target: LOG_TARGET, "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1844			},
1845			SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1846				trace!(target: LOG_TARGET, "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1847				if let Some(metrics) = self.metrics.as_ref() {
1848					metrics.incoming_connections_total.inc();
1849				}
1850			},
1851			SwarmEvent::IncomingConnectionError {
1852				connection_id,
1853				local_addr,
1854				send_back_addr,
1855				error,
1856			} => {
1857				debug!(
1858					target: LOG_TARGET,
1859					"Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1860				);
1861				if let Some(metrics) = self.metrics.as_ref() {
1862					let reason = match error {
1863						ListenError::Denied { cause } => {
1864							if cause.downcast::<Exceeded>().is_ok() {
1865								Some("limit-reached")
1866							} else {
1867								None
1868							}
1869						},
1870						ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } => {
1871							Some("invalid-peer-id")
1872						},
1873						ListenError::Transport(_) => Some("transport-error"),
1874						ListenError::Aborted => None, // ignore it
1875					};
1876
1877					if let Some(reason) = reason {
1878						metrics
1879							.incoming_connections_errors_total
1880							.with_label_values(&[reason])
1881							.inc();
1882					}
1883				}
1884			},
1885			SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1886				if let Some(metrics) = self.metrics.as_ref() {
1887					metrics.listeners_local_addresses.sub(addresses.len() as u64);
1888				}
1889				let mut listen_addresses = self.listen_addresses.lock();
1890				for addr in &addresses {
1891					listen_addresses.remove(addr);
1892				}
1893				drop(listen_addresses);
1894
1895				let addrs =
1896					addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1897				match reason {
1898					Ok(()) => error!(
1899						target: LOG_TARGET,
1900						"📪 Libp2p listener ({}) closed gracefully",
1901						addrs
1902					),
1903					Err(e) => error!(
1904						target: LOG_TARGET,
1905						"📪 Libp2p listener ({}) closed: {}",
1906						addrs, e
1907					),
1908				}
1909			},
1910			SwarmEvent::ListenerError { error, .. } => {
1911				debug!(target: LOG_TARGET, "Libp2p => ListenerError: {}", error);
1912				if let Some(metrics) = self.metrics.as_ref() {
1913					metrics.listeners_errors_total.inc();
1914				}
1915			},
1916			SwarmEvent::NewExternalAddrCandidate { address } => {
1917				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrCandidate: {address:?}");
1918			},
1919			SwarmEvent::ExternalAddrConfirmed { address } => {
1920				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrConfirmed: {address:?}");
1921			},
1922			SwarmEvent::ExternalAddrExpired { address } => {
1923				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrExpired: {address:?}");
1924			},
1925			SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
1926				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrOfPeer({peer_id:?}): {address:?}")
1927			},
1928			event => {
1929				warn!(target: LOG_TARGET, "New unknown SwarmEvent libp2p event: {event:?}");
1930			},
1931		}
1932	}
1933}
1934
1935impl<B, H> Unpin for NetworkWorker<B, H>
1936where
1937	B: BlockT + 'static,
1938	H: ExHashT,
1939{
1940}
1941
1942pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1943	addresses: impl Iterator<Item = &'a sc_network_types::multiaddr::Multiaddr>,
1944	transport: &TransportConfig,
1945) -> Result<(), Error> {
1946	use sc_network_types::multiaddr::Protocol;
1947
1948	if matches!(transport, TransportConfig::MemoryOnly) {
1949		let addresses: Vec<_> = addresses
1950			.filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1951			.cloned()
1952			.collect();
1953
1954		if !addresses.is_empty() {
1955			return Err(Error::AddressesForAnotherTransport {
1956				transport: transport.clone(),
1957				addresses,
1958			});
1959		}
1960	} else {
1961		let addresses: Vec<_> = addresses
1962			.filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1963			.cloned()
1964			.collect();
1965
1966		if !addresses.is_empty() {
1967			return Err(Error::AddressesForAnotherTransport {
1968				transport: transport.clone(),
1969				addresses,
1970			});
1971		}
1972	}
1973
1974	Ok(())
1975}