// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Discovery mechanisms of Substrate.
//!
//! The `DiscoveryBehaviour` struct implements the `NetworkBehaviour` trait of libp2p and is
//! responsible for discovering other nodes that are part of the network.
//!
//! Substrate uses the following mechanisms in order to discover nodes that are part of the
//! network:
//!
//! - Bootstrap nodes. These are hard-coded node identities and addresses passed in the
//!   constructor of the `DiscoveryBehaviour`. You can also call `add_known_address` later to add
//!   an entry.
//!
//! - mDNS. Discovers nodes on the local network by broadcasting UDP packets.
//!
//! - Kademlia random walk. Once connected, we perform random Kademlia `FIND_NODE` requests on the
//!   configured Kademlia DHTs in order for nodes to propagate to us their view of the network.
//!   This is performed automatically by the `DiscoveryBehaviour`.
//!
//! Additionally, the `DiscoveryBehaviour` is also capable of storing and loading values in the
//! configured DHTs.
//!
//! ## Usage
//!
//! The `DiscoveryBehaviour` generates events of type `DiscoveryOut`, most notably
//! `DiscoveryOut::Discovered`, which is generated whenever we discover a node.
//! Only the identity of the node is returned. The node's addresses are stored within the
//! `DiscoveryBehaviour` and can be queried through the `NetworkBehaviour` trait.
//!
//! **Important**: In order for the discovery mechanism to work properly, there needs to be an
//! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn
//! of a node's address, you must call `add_self_reported_address`.
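//!
//! ## Example
//!
//! A minimal configuration sketch (illustrative only, not compiled as a doc test; the
//! `local_peer_id`, `genesis_hash` and `protocol_id` values are placeholders):
//!
//! ```ignore
//! let mut config = DiscoveryConfig::new(local_peer_id);
//! config
//! 	.with_mdns(true)
//! 	.allow_private_ip(false)
//! 	.discovery_limit(25)
//! 	.with_kademlia(genesis_hash, Some("fork-id"), &protocol_id);
//! let behaviour: DiscoveryBehaviour = config.finish();
//! // `behaviour` is then handed to the libp2p `Swarm`, which drives it and yields
//! // `DiscoveryOut` events via `SwarmEvent::Behaviour`.
//! ```
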
use crate::{
	config::{
		ProtocolId, KADEMLIA_MAX_PROVIDER_KEYS, KADEMLIA_PROVIDER_RECORD_TTL,
		KADEMLIA_PROVIDER_REPUBLISH_INTERVAL,
	},
	utils::LruHashSet,
};

use array_bytes::bytes2hex;
use futures::prelude::*;
use futures_timer::Delay;
use ip_network::IpNetwork;
use libp2p::{
	core::{transport::PortUse, Endpoint, Multiaddr},
	kad::{
		self,
		store::{MemoryStore, MemoryStoreConfig, RecordStore},
		Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
		Event, GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk,
		GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, RecordKey,
	},
	mdns::{self, tokio::Behaviour as TokioMdns},
	multiaddr::Protocol,
	swarm::{
		behaviour::{
			toggle::{Toggle, ToggleConnectionHandler},
			DialFailure, ExternalAddrConfirmed, FromSwarm,
		},
		ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
		THandlerInEvent, THandlerOutEvent, ToSwarm,
	},
	PeerId,
};
use linked_hash_set::LinkedHashSet;
use log::{debug, error, info, trace, warn};
use sp_core::hexdisplay::HexDisplay;
use std::{
	cmp,
	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
	num::NonZeroUsize,
	task::{Context, Poll},
	time::{Duration, Instant},
};

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::discovery";

/// Maximum number of known external addresses that we will cache.
/// This only affects whether we will log whenever we (re-)discover
/// a given address.
const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;

/// Default value for the Kademlia replication factor, which determines how many of the closest
/// peers a record is replicated to.
pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;

/// The minimum number of peers from which we expect an answer before we terminate a
/// `GET_RECORD` request.
const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;

/// Query timeout for Kademlia requests. We need to increase this so that record/provider
/// publishing does not time out most of the time.
const KAD_QUERY_TIMEOUT: Duration = Duration::from_secs(300);

/// `DiscoveryBehaviour` configuration.
///
/// Note: In order to discover nodes or load and store values via Kademlia one has to add the
///       Kademlia protocol via [`DiscoveryConfig::with_kademlia`].
pub struct DiscoveryConfig {
	local_peer_id: PeerId,
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	dht_random_walk: bool,
	allow_private_ip: bool,
	allow_non_globals_in_dht: bool,
	discovery_only_if_under_num: u64,
	enable_mdns: bool,
	kademlia_disjoint_query_paths: bool,
	kademlia_protocol: Option<StreamProtocol>,
	kademlia_legacy_protocol: Option<StreamProtocol>,
	kademlia_replication_factor: NonZeroUsize,
}

impl DiscoveryConfig {
	/// Create a default configuration with the given public key.
	pub fn new(local_peer_id: PeerId) -> Self {
		Self {
			local_peer_id,
			permanent_addresses: Vec::new(),
			dht_random_walk: true,
			allow_private_ip: true,
			allow_non_globals_in_dht: false,
			discovery_only_if_under_num: u64::MAX,
			enable_mdns: false,
			kademlia_disjoint_query_paths: false,
			kademlia_protocol: None,
			kademlia_legacy_protocol: None,
			kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
				.expect("value is a constant; constant is non-zero; qed."),
		}
	}

	/// Set the number of active connections at which we pause discovery.
	pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
		self.discovery_only_if_under_num = limit;
		self
	}

	/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
	pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
	where
		I: IntoIterator<Item = (PeerId, Multiaddr)>,
	{
		self.permanent_addresses.extend(permanent_addresses);
		self
	}

	/// Whether the discovery behaviour should periodically perform a random
	/// walk on the DHT to discover peers.
	pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
		self.dht_random_walk = value;
		self
	}

	/// Should private IPv4/IPv6 addresses be reported?
	pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
		self.allow_private_ip = value;
		self
	}

	/// Should non-global addresses be inserted into the DHT?
	pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
		self.allow_non_globals_in_dht = value;
		self
	}

	/// Should mDNS discovery be supported?
	pub fn with_mdns(&mut self, value: bool) -> &mut Self {
		self.enable_mdns = value;
		self
	}

	/// Add discovery via Kademlia for the given protocol.
	///
	/// Currently accepts `protocol_id`. This should be removed once all the nodes
	/// are upgraded to the genesis hash- and fork ID-based Kademlia protocol name.
	pub fn with_kademlia<Hash: AsRef<[u8]>>(
		&mut self,
		genesis_hash: Hash,
		fork_id: Option<&str>,
		protocol_id: &ProtocolId,
	) -> &mut Self {
		self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
		self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
		self
	}

	/// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in
	/// the presence of potentially adversarial nodes.
	pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
		self.kademlia_disjoint_query_paths = value;
		self
	}

	/// Sets the Kademlia replication factor.
	pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
		self.kademlia_replication_factor = value;
		self
	}

	/// Create a `DiscoveryBehaviour` from this config.
	pub fn finish(self) -> DiscoveryBehaviour {
		let Self {
			local_peer_id,
			permanent_addresses,
			dht_random_walk,
			allow_private_ip,
			allow_non_globals_in_dht,
			discovery_only_if_under_num,
			enable_mdns,
			kademlia_disjoint_query_paths,
			kademlia_protocol,
			kademlia_legacy_protocol: _,
			kademlia_replication_factor,
		} = self;

		let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
			let mut config = KademliaConfig::new(kademlia_protocol.clone());

			config.set_replication_factor(kademlia_replication_factor);

			config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);

			config.set_query_timeout(KAD_QUERY_TIMEOUT);

			// By default Kademlia attempts to insert all peers into its routing table once a
			// dialing attempt succeeds. In order to control which peers are added, disable the
			// auto-insertion and instead add peers manually.
			config.set_kbucket_inserts(BucketInserts::Manual);
			config.disjoint_query_paths(kademlia_disjoint_query_paths);

			config.set_provider_record_ttl(Some(KADEMLIA_PROVIDER_RECORD_TTL));
			config.set_provider_publication_interval(Some(KADEMLIA_PROVIDER_REPUBLISH_INTERVAL));

			let store = MemoryStore::with_config(
				local_peer_id,
				MemoryStoreConfig {
					max_provided_keys: KADEMLIA_MAX_PROVIDER_KEYS,
					..Default::default()
				},
			);

			let mut kad = Kademlia::with_config(local_peer_id, store, config);
			kad.set_mode(Some(kad::Mode::Server));

			for (peer_id, addr) in &permanent_addresses {
				kad.add_address(peer_id, addr.clone());
			}

			Some(kad)
		} else {
			None
		};

		DiscoveryBehaviour {
			permanent_addresses,
			ephemeral_addresses: HashMap::new(),
			kademlia: Toggle::from(kademlia),
			next_kad_random_query: if dht_random_walk {
				Some(Delay::new(Duration::new(0, 0)))
			} else {
				None
			},
			duration_to_next_kad: Duration::from_secs(1),
			pending_events: VecDeque::new(),
			local_peer_id,
			num_connections: 0,
			allow_private_ip,
			discovery_only_if_under_num,
			mdns: if enable_mdns {
				match TokioMdns::new(mdns::Config::default(), local_peer_id) {
					Ok(mdns) => Toggle::from(Some(mdns)),
					Err(err) => {
						warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
						Toggle::from(None)
					},
				}
			} else {
				Toggle::from(None)
			},
			allow_non_globals_in_dht,
			known_external_addresses: LruHashSet::new(
				NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
					.expect("value is a constant; constant is non-zero; qed."),
			),
			records_to_publish: Default::default(),
			kademlia_protocol,
			provider_keys_requested: HashMap::new(),
		}
	}
}

/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour {
	/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
	/// reserved nodes.
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	/// Same as `permanent_addresses`, except that addresses that fail to reach a peer are
	/// removed.
	ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
	/// Handles Kademlia requests and answers. Even though it's wrapped in `Toggle`, currently
	/// it's always enabled in `NetworkWorker::new()`.
	kademlia: Toggle<Kademlia<MemoryStore>>,
	/// Discovers nodes on the local network.
	mdns: Toggle<TokioMdns>,
	/// Stream that fires when we need to perform the next random Kademlia query. `None` if
	/// random walking is disabled.
	next_kad_random_query: Option<Delay>,
	/// After `next_kad_random_query` triggers, the next one triggers after this duration.
	duration_to_next_kad: Duration,
	/// Events to return in priority when polled.
	pending_events: VecDeque<DiscoveryOut>,
	/// Identity of our local node.
	local_peer_id: PeerId,
	/// Number of nodes we're currently connected to.
	num_connections: u64,
	/// If false, `addresses_of_peer` won't return any private IPv4/IPv6 address, except for the
	/// ones stored in `permanent_addresses` or `ephemeral_addresses`.
	allow_private_ip: bool,
	/// Number of active connections over which we interrupt the discovery process.
	discovery_only_if_under_num: u64,
	/// Should non-global addresses be added to the DHT?
	allow_non_globals_in_dht: bool,
	/// A cache of discovered external addresses. Only used for logging purposes.
	known_external_addresses: LruHashSet<Multiaddr>,
	/// Records to publish per `QueryId`.
	///
	/// After finishing a Kademlia query, libp2p will return us a list of the closest peers that
	/// did not return the record (in `FinishedWithNoAdditionalRecord`). We will then put the
	/// record to these peers.
	records_to_publish: HashMap<QueryId, Record>,
	/// The chain-based Kademlia protocol name (including genesis hash and fork id).
	///
	/// Remove when all nodes are upgraded to genesis hash- and fork ID-based Kademlia:
	/// <https://github.com/paritytech/polkadot-sdk/issues/504>.
	kademlia_protocol: Option<StreamProtocol>,
	/// Provider keys requested with `GET_PROVIDERS` queries.
	provider_keys_requested: HashMap<QueryId, RecordKey>,
}

impl DiscoveryBehaviour {
	/// Returns the list of nodes that we know exist in the network.
	pub fn known_peers(&mut self) -> HashSet<PeerId> {
		let mut peers = HashSet::new();
		if let Some(k) = self.kademlia.as_mut() {
			for b in k.kbuckets() {
				for e in b.iter() {
					if !peers.contains(e.node.key.preimage()) {
						peers.insert(*e.node.key.preimage());
					}
				}
			}
		}
		peers
	}

	/// Adds a hard-coded address for the given peer that never expires.
	///
	/// This adds an entry to the parameter that was passed to `new`.
	///
	/// If we didn't know this address before, also generates a `Discovered` event.
	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
		let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
		if addrs_list.contains(&addr) {
			return
		}

		if let Some(k) = self.kademlia.as_mut() {
			k.add_address(&peer_id, addr.clone());
		}

		self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
		addrs_list.push(addr);
	}

	/// Add a self-reported address of a remote peer to the k-buckets of the DHT
	/// if it has compatible `supported_protocols`.
	///
	/// **Note**: It is important that you call this method. The discovery mechanism will not
	/// automatically add connecting peers to the Kademlia k-buckets.
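	///
	/// # Example
	///
	/// A sketch of wiring this up from an `identify` event handler (illustrative only; the
	/// surrounding event plumbing is assumed and not part of this module):
	///
	/// ```ignore
	/// // On `identify::Event::Received { peer_id, info, .. }`:
	/// for addr in info.listen_addrs {
	/// 	discovery.add_self_reported_address(&peer_id, &info.protocols, addr);
	/// }
	/// ```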
	pub fn add_self_reported_address(
		&mut self,
		peer_id: &PeerId,
		supported_protocols: &[StreamProtocol],
		addr: Multiaddr,
	) {
		if let Some(kademlia) = self.kademlia.as_mut() {
			if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
				trace!(
					target: LOG_TARGET,
					"Ignoring self-reported non-global address {} from {}.", addr, peer_id
				);
				return
			}

			// The supported protocols must include the chain-based Kademlia protocol.
			//
			// Extract the chain-based Kademlia protocol from `kademlia.protocol_name()`
			// when all nodes are upgraded to genesis hash- and fork ID-based Kademlia:
			// https://github.com/paritytech/polkadot-sdk/issues/504.
			if !supported_protocols.iter().any(|p| {
				p == self
					.kademlia_protocol
					.as_ref()
					.expect("kademlia protocol was checked above to be enabled; qed")
			}) {
				trace!(
					target: LOG_TARGET,
					"Ignoring self-reported address {} from {} as remote node is not part of the \
					 Kademlia DHT supported by the local node.", addr, peer_id,
				);
				return
			}

			trace!(
				target: LOG_TARGET,
				"Adding self-reported address {} from {} to Kademlia DHT.",
				addr, peer_id
			);
			kademlia.add_address(peer_id, addr.clone());
		}
	}

	/// Start finding the closest peers to the given `PeerId`.
	///
	/// A corresponding `ClosestPeersFound` or `ClosestPeersNotFound` event will later be
	/// generated.
	pub fn find_closest_peers(&mut self, target: PeerId) {
		if let Some(k) = self.kademlia.as_mut() {
			k.get_closest_peers(target);
		}
	}

	/// Start fetching a record from the DHT.
	///
	/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
	pub fn get_value(&mut self, key: RecordKey) {
		if let Some(k) = self.kademlia.as_mut() {
			k.get_record(key.clone());
		}
	}

	/// Start putting a record into the DHT. Other nodes can later fetch that value with
	/// `get_value`.
	///
	/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
	pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
		if let Some(k) = self.kademlia.as_mut() {
			if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
				warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
				self.pending_events
					.push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
			}
		}
	}

	/// Puts a record into the DHT on the provided `peers`.
	///
	/// If `update_local_storage` is true, the local storage is updated as well.
	pub fn put_record_to(
		&mut self,
		record: Record,
		peers: HashSet<sc_network_types::PeerId>,
		update_local_storage: bool,
	) {
		if let Some(kad) = self.kademlia.as_mut() {
			if update_local_storage {
				if let Err(_e) = kad.store_mut().put(record.clone()) {
					warn!(target: LOG_TARGET, "Failed to update local storage");
				}
			}

			if !peers.is_empty() {
				kad.put_record_to(
					record,
					peers.into_iter().map(|peer_id| peer_id.into()),
					Quorum::All,
				);
			}
		}
	}

	/// Register as a content provider on the DHT for `key`.
	pub fn start_providing(&mut self, key: RecordKey) {
		if let Some(kad) = self.kademlia.as_mut() {
			if let Err(e) = kad.start_providing(key.clone()) {
				warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
				self.pending_events
					.push_back(DiscoveryOut::StartProvidingFailed(key, Duration::from_secs(0)));
			}
		}
	}

	/// Deregister as a content provider on the DHT for `key`.
	pub fn stop_providing(&mut self, key: &RecordKey) {
		if let Some(kad) = self.kademlia.as_mut() {
			kad.stop_providing(key);
		}
	}

	/// Get content providers for `key` from the DHT.
	pub fn get_providers(&mut self, key: RecordKey) {
		if let Some(kad) = self.kademlia.as_mut() {
			let query_id = kad.get_providers(key.clone());
			self.provider_keys_requested.insert(query_id, key);
		}
	}

	/// Store a record in the Kademlia record store.
	pub fn store_record(
		&mut self,
		record_key: RecordKey,
		record_value: Vec<u8>,
		publisher: Option<PeerId>,
		expires: Option<Instant>,
	) {
		if let Some(k) = self.kademlia.as_mut() {
			if let Err(err) = k.store_mut().put(Record {
				key: record_key,
				value: record_value,
				publisher: publisher.map(|publisher| publisher.into()),
				expires,
			}) {
				debug!(
					target: LOG_TARGET,
					"Failed to store record: {:?}",
					err
				);
			}
		}
	}

	/// Returns the number of nodes in each Kademlia kbucket.
	///
	/// Identifies kbuckets by the base 2 logarithm of their lower bound.
	pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
		self.kademlia.as_mut().map(|kad| {
			kad.kbuckets()
				.map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
				.collect()
		})
	}

	/// Returns the number of records in the Kademlia record store.
	pub fn num_kademlia_records(&mut self) -> Option<usize> {
		// Note that this code is ok only because we use a `MemoryStore`.
		self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
	}

	/// Returns the total size in bytes of all the records in the Kademlia record store.
	pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
		// Note that this code is ok only because we use a `MemoryStore`. If the records were
		// for example stored on disk, this would load every single one of them every single time.
		self.kademlia
			.as_mut()
			.map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
	}

	/// Can the given `Multiaddr` be put into the DHT?
	///
	/// This test is successful only for global IP addresses and DNS names.
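	///
	/// # Example
	///
	/// Illustrative only (not compiled as a doc test):
	///
	/// ```ignore
	/// let global: Multiaddr = "/ip4/8.8.8.8/tcp/30333".parse().unwrap();
	/// let private: Multiaddr = "/ip4/192.168.1.1/tcp/30333".parse().unwrap();
	/// let dns: Multiaddr = "/dns/example.com/tcp/30333".parse().unwrap();
	/// assert!(DiscoveryBehaviour::can_add_to_dht(&global));
	/// assert!(!DiscoveryBehaviour::can_add_to_dht(&private));
	/// assert!(DiscoveryBehaviour::can_add_to_dht(&dns));
	/// ```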
	// NB: Currently all DNS names are allowed and no check for TLD suffixes is done
	// because the set of valid domains is highly dynamic and would require frequent
	// updates, for example by utilising publicsuffix.org or IANA.
	pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
		let ip = match addr.iter().next() {
			Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
			Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
			Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
				return true,
			_ => return false,
		};
		ip.is_global()
	}
}

/// Event generated by the `DiscoveryBehaviour`.
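///
/// A sketch of consuming these events from the swarm loop (illustrative only; just a few of
/// the variants are shown):
///
/// ```ignore
/// match event {
/// 	DiscoveryOut::Discovered(peer_id) => { /* peer is now known and dialable */ },
/// 	DiscoveryOut::ValueFound(record, duration) => { /* DHT record arrived */ },
/// 	DiscoveryOut::ValueNotFound(key, duration) => { /* record lookup failed */ },
/// 	_ => {},
/// }
/// ```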
#[derive(Debug)]
pub enum DiscoveryOut {
	/// We discovered a peer and currently have its addresses stored either in the routing
	/// table or in the ephemeral addresses list, so a connection can be established.
	Discovered(PeerId),

	/// A peer connected to this node for whom no listen address is known.
	///
	/// In order for the peer to be added to the Kademlia routing table, a known
	/// listen address must be added via
	/// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through
	/// the `identify` protocol.
	UnroutablePeer(PeerId),

	/// A `FIND_NODE` query yielded the closest peers with their addresses. This event also
	/// delivers a partial result in case the query timed out, because even a partial result
	/// can contain the target peer's address.
	ClosestPeersFound(PeerId, Vec<(PeerId, Vec<Multiaddr>)>, Duration),

	/// The closest peers to the target `PeerId` have not been found.
	ClosestPeersNotFound(PeerId, Duration),

	/// The DHT yielded results for the record request.
	///
	/// Returns the found record as well as the request duration.
	ValueFound(PeerRecord, Duration),

	/// The DHT received a put record request.
	PutRecordRequest(
		RecordKey,
		Vec<u8>,
		Option<sc_network_types::PeerId>,
		Option<std::time::Instant>,
	),

	/// The record requested was not found in the DHT.
	///
	/// Returns the corresponding key as well as the request duration.
	ValueNotFound(RecordKey, Duration),

	/// The record with a given key was successfully inserted into the DHT.
	///
	/// Returns the corresponding key as well as the request duration.
	ValuePut(RecordKey, Duration),

	/// Inserting a value into the DHT failed.
	///
	/// Returns the corresponding key as well as the request duration.
	ValuePutFailed(RecordKey, Duration),

	/// The content provider for a given key was successfully published.
	StartedProviding(RecordKey, Duration),

	/// Starting to provide a key failed.
	StartProvidingFailed(RecordKey, Duration),

	/// The DHT yielded results for the providers request.
	ProvidersFound(RecordKey, HashSet<PeerId>, Duration),

	/// The DHT yielded no more providers for the key (the `GET_PROVIDERS` query finished).
	NoMoreProviders(RecordKey, Duration),

	/// Providers for the requested key were not found in the DHT.
	ProvidersNotFound(RecordKey, Duration),

	/// Started a random Kademlia query.
	///
	/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
	RandomKademliaStarted,
}

impl NetworkBehaviour for DiscoveryBehaviour {
	type ConnectionHandler =
		ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
	type ToSwarm = DiscoveryOut;

	fn handle_established_inbound_connection(
		&mut self,
		connection_id: ConnectionId,
		peer: PeerId,
		local_addr: &Multiaddr,
		remote_addr: &Multiaddr,
	) -> Result<THandler<Self>, ConnectionDenied> {
		self.kademlia.handle_established_inbound_connection(
			connection_id,
			peer,
			local_addr,
			remote_addr,
		)
	}

	fn handle_established_outbound_connection(
		&mut self,
		connection_id: ConnectionId,
		peer: PeerId,
		addr: &Multiaddr,
		role_override: Endpoint,
		port_use: PortUse,
	) -> Result<THandler<Self>, ConnectionDenied> {
		self.kademlia.handle_established_outbound_connection(
			connection_id,
			peer,
			addr,
			role_override,
			port_use,
		)
	}

	fn handle_pending_inbound_connection(
		&mut self,
		connection_id: ConnectionId,
		local_addr: &Multiaddr,
		remote_addr: &Multiaddr,
	) -> Result<(), ConnectionDenied> {
		self.kademlia
			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
	}

	fn handle_pending_outbound_connection(
		&mut self,
		connection_id: ConnectionId,
		maybe_peer: Option<PeerId>,
		addresses: &[Multiaddr],
		effective_role: Endpoint,
	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
		let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };

		// Collect addresses into a [`LinkedHashSet`] to eliminate duplicate entries while
		// preserving the order of addresses. Give priority to `permanent_addresses` (used with
		// reserved nodes) and `ephemeral_addresses` (used for addresses discovered from other
		// sources, like authority discovery DHT records).
		let mut list: LinkedHashSet<_> = self
			.permanent_addresses
			.iter()
			.filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
			.collect();

		if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
			ephemeral_addresses.iter().for_each(|address| {
				list.insert_if_absent(address.clone());
			});
		}

		{
			let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
				connection_id,
				maybe_peer,
				addresses,
				effective_role,
			)?;

			list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
				connection_id,
				maybe_peer,
				addresses,
				effective_role,
			)?);

			if !self.allow_private_ip {
				list_to_filter.retain(|addr| match addr.iter().next() {
					Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
					Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
					_ => true,
				});
			}

			list_to_filter.into_iter().for_each(|address| {
				list.insert_if_absent(address);
			});
		}

		trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);

		Ok(list.into_iter().collect())
	}

	fn on_swarm_event(&mut self, event: FromSwarm) {
		match event {
			FromSwarm::ConnectionEstablished(e) => {
				self.num_connections += 1;
				self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
			},
			FromSwarm::ConnectionClosed(e) => {
				self.num_connections -= 1;
				self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
			},
			FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
				if let Some(peer_id) = peer_id {
					if let DialError::Transport(errors) = error {
						if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
						{
							for (addr, _error) in errors {
								entry.get_mut().retain(|a| a != addr);
							}
							if entry.get().is_empty() {
								entry.remove();
							}
						}
					}
				}

				self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
			},
			FromSwarm::ListenerClosed(e) => {
				self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
			},
			FromSwarm::ListenFailure(e) => {
				self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
			},
			FromSwarm::ListenerError(e) => {
				self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
			},
			FromSwarm::ExternalAddrExpired(e) => {
				// We intentionally don't remove the element from `known_external_addresses` in
				// order to not print the log line again.

				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
			},
			FromSwarm::NewListener(e) => {
				self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
			},
			FromSwarm::ExpiredListenAddr(e) => {
				self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
			},
			FromSwarm::NewExternalAddrCandidate(e) => {
				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
			},
			FromSwarm::AddressChange(e) => {
				self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
			},
			FromSwarm::NewListenAddr(e) => {
				self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
				self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
			},
			FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
				let mut address = addr.clone();

				if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
					if peer_id != self.local_peer_id {
						warn!(
							target: LOG_TARGET,
							"🔍 Discovered external address for a peer that is not us: {addr}",
						);
						// Ensure this address is not propagated to kademlia.
						return
					}
				} else {
					address.push(Protocol::P2p(self.local_peer_id));
				}

				if Self::can_add_to_dht(&address) {
					// NOTE: we might re-discover the same address multiple times
					// in which case we just want to refrain from logging.
					if self.known_external_addresses.insert(address.clone()) {
						info!(
							target: LOG_TARGET,
							"🔍 Discovered new external address for our node: {address}",
						);
					}
				}

				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
			},
			FromSwarm::NewExternalAddrOfPeer(e) => {
				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
				self.mdns.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
			},
			event => {
				debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
				self.kademlia.on_swarm_event(event);
				self.mdns.on_swarm_event(event);
			},
		}
	}

	fn on_connection_handler_event(
		&mut self,
		peer_id: PeerId,
		connection_id: ConnectionId,
		event: THandlerOutEvent<Self>,
	) {
		self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
	}

	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
		// Immediately process the content of `pending_events`.
		if let Some(ev) = self.pending_events.pop_front() {
			return Poll::Ready(ToSwarm::GenerateEvent(ev))
		}

		// Poll the stream that fires when we need to start a random Kademlia query.
		if let Some(kademlia) = self.kademlia.as_mut() {
			if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
				while next_kad_random_query.poll_unpin(cx).is_ready() {
					let actually_started =
						if self.num_connections < self.discovery_only_if_under_num {
							let random_peer_id = PeerId::random();
							debug!(
								target: LOG_TARGET,
								"Libp2p <= Starting random Kademlia request for {:?}",
								random_peer_id,
							);
							kademlia.get_closest_peers(random_peer_id);
							true
						} else {
							debug!(
								target: LOG_TARGET,
								"Kademlia paused due to high number of connections ({})",
								self.num_connections
							);
							false
						};

					// Schedule the next random query with exponentially increasing delay,
					// capped at 60 seconds.
					*next_kad_random_query = Delay::new(self.duration_to_next_kad);
					self.duration_to_next_kad =
						cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));

					if actually_started {
						let ev = DiscoveryOut::RandomKademliaStarted;
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					}
				}
			}
		}

		while let Poll::Ready(ev) = self.kademlia.poll(cx) {
			match ev {
				ToSwarm::GenerateEvent(ev) => match ev {
					KademliaEvent::RoutingUpdated { peer, .. } => {
						let ev = DiscoveryOut::Discovered(peer);
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::UnroutablePeer { peer, .. } => {
						let ev = DiscoveryOut::UnroutablePeer(peer);
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::RoutablePeer { .. } => {
						// Generate nothing, because the address was not added to the routing
						// table, so we will not be able to connect to the peer.
					},
					KademliaEvent::PendingRoutablePeer { .. } => {
						// We are not interested in this event at the moment.
					},
					KademliaEvent::InboundRequest { request } => match request {
						libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } =>
							return Poll::Ready(ToSwarm::GenerateEvent(
								DiscoveryOut::PutRecordRequest(
									record.key,
									record.value,
									record.publisher.map(Into::into),
									record.expires,
								),
							)),
						_ => {},
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::GetClosestPeers(res),
						stats,
						..
					} => {
						let (key, peers, timeout) = match res {
							Ok(GetClosestPeersOk { key, peers }) => (key, peers, false),
							Err(GetClosestPeersError::Timeout { key, peers }) => (key, peers, true),
						};

						let target = match PeerId::from_bytes(&key.clone()) {
							Ok(peer_id) => peer_id,
							Err(_) => {
								warn!(
									target: LOG_TARGET,
									"Libp2p => FIND_NODE query finished for target that is not \
									 a peer ID: {:?}",
									HexDisplay::from(&key),
								);
								continue
							},
						};

						if timeout {
							debug!(
								target: LOG_TARGET,
								"Libp2p => Query for target {target:?} timed out and yielded {} peers",
								peers.len(),
							);
						} else {
							debug!(
								target: LOG_TARGET,
								"Libp2p => Query for target {target:?} yielded {} peers",
								peers.len(),
							);
						}

						let ev = if peers.is_empty() {
							DiscoveryOut::ClosestPeersNotFound(
								target,
								stats.duration().unwrap_or_default(),
							)
						} else {
							DiscoveryOut::ClosestPeersFound(
								target,
								peers.into_iter().map(|p| (p.peer_id, p.addrs)).collect(),
								stats.duration().unwrap_or_default(),
							)
						};

						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::GetRecord(res),
						stats,
						id,
						..
					} => {
						let ev = match res {
							Ok(GetRecordOk::FoundRecord(r)) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
									r.record.key,
									r.record.value,
									id,
									stats,
								);

								// Let's directly finish the query if we already got answers from
								// more than `GET_RECORD_REDUNDANCY_FACTOR` peers. This number is
								// small enough to make sure we don't unnecessarily flood the
								// network with queries, but high enough to make sure we also
								// touch peers which might have an old record, so that we can
								// update them once we notice they have old records.
								if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
									if let Some(kad) = self.kademlia.as_mut() {
										if let Some(mut query) = kad.query_mut(&id) {
											query.finish();
										}
									}
								}

								// Will be removed below when we receive
								// `FinishedWithNoAdditionalRecord`.
								self.records_to_publish.insert(id, r.record.clone());

								DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
							},
							Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
								cache_candidates,
							}) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
									id,
									stats,
									stats.duration().map(|val| val.as_millis())
								);
								// We always need to remove the record to not leak any data!
								if let Some(record) = self.records_to_publish.remove(&id) {
									if cache_candidates.is_empty() {
										continue
									}

									// Put the record to the `cache_candidates` that are nearest to
									// the record key from our point of view of the network.
									if let Some(kad) = self.kademlia.as_mut() {
										kad.put_record_to(
											record,
											cache_candidates.into_iter().map(|v| v.1),
											Quorum::One,
										);
									}
								}

								continue
							},
							Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
								trace!(
									target: LOG_TARGET,
									"Libp2p => Failed to get record: {:?}",
									e,
								);
								DiscoveryOut::ValueNotFound(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
							Err(e) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to get record: {:?}",
									e,
								);
								DiscoveryOut::ValueNotFound(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::GetProviders(res),
						stats,
						id,
						..
					} => {
						let ev = match res {
							Ok(GetProvidersOk::FoundProviders { key, providers }) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
									providers,
									key,
									id,
									stats,
								);

								DiscoveryOut::ProvidersFound(
									key,
									providers,
									stats.duration().unwrap_or_default(),
								)
							},
							Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
								closest_peers: _,
							}) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
									id,
									stats,
									stats.duration().map(|val| val.as_millis())
								);

								if let Some(key) = self.provider_keys_requested.remove(&id) {
									DiscoveryOut::NoMoreProviders(
										key,
										stats.duration().unwrap_or_default(),
									)
								} else {
									error!(
										target: LOG_TARGET,
										"No key found for `GET_PROVIDERS` query {id:?}. This is a bug.",
									);
									continue
								}
							},
							Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to get providers for {key:?} due to timeout.",
								);

								self.provider_keys_requested.remove(&id);

								DiscoveryOut::ProvidersNotFound(
									key,
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::PutRecord(res),
						stats,
						..
					} => {
						let ev = match res {
							Ok(ok) => {
								trace!(
									target: LOG_TARGET,
									"Libp2p => Put record for key: {:?}",
									ok.key,
								);
								DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default())
							},
							Err(e) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to put record for key {:?}: {:?}",
									e.key(),
									e,
								);
								DiscoveryOut::ValuePutFailed(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::RepublishRecord(res),
						..
					} => match res {
						Ok(ok) => debug!(
							target: LOG_TARGET,
							"Libp2p => Record republished: {:?}",
							ok.key,
						),
						Err(e) => debug!(
							target: LOG_TARGET,
							"Libp2p => Republishing of record {:?} failed with: {:?}",
							e.key(), e,
						),
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::StartProviding(res),
						stats,
						..
					} => {
						let ev = match res {
							Ok(ok) => {
								trace!(
									target: LOG_TARGET,
									"Libp2p => Started providing key {:?}",
									ok.key,
								);
								DiscoveryOut::StartedProviding(
									ok.key,
									stats.duration().unwrap_or_default(),
								)
							},
							Err(e) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to start providing key {:?}: {:?}",
									e.key(),
									e,
								);
								DiscoveryOut::StartProvidingFailed(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::Bootstrap(res),
						..
					} => match res {
						Ok(ok) => debug!(
							target: LOG_TARGET,
							"Libp2p => DHT bootstrap progressed: {ok:?}",
						),
						Err(e) => warn!(
							target: LOG_TARGET,
							"Libp2p => DHT bootstrap error: {e:?}",
						),
					},
					// We never start any other type of query.
					KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
						warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
					},
					Event::ModeChanged { new_mode } => {
						debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
					},
				},
				ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
				event => {
					return Poll::Ready(event.map_out(|_| {
						unreachable!("`GenerateEvent` is handled in a branch above; qed")
					}));
				},
			}
		}

		// Poll mDNS.
		while let Poll::Ready(ev) = self.mdns.poll(cx) {
			match ev {
				ToSwarm::GenerateEvent(event) => match event {
					mdns::Event::Discovered(list) => {
						if self.num_connections >= self.discovery_only_if_under_num {
							continue
						}

						self.pending_events.extend(
							list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
						);
						if let Some(ev) = self.pending_events.pop_front() {
							return Poll::Ready(ToSwarm::GenerateEvent(ev))
						}
					},
					mdns::Event::Expired(_) => {},
				},
				ToSwarm::Dial { .. } => {
					unreachable!("mDNS never dials!");
				},
				// `event` is an enum with no variants, so this arm can never be reached.
				ToSwarm::NotifyHandler { event, .. } => match event {},
				event => {
					return Poll::Ready(
						event
							.map_in(|_| {
								unreachable!("`NotifyHandler` is handled in a branch above; qed")
							})
							.map_out(|_| {
								unreachable!("`GenerateEvent` is handled in a branch above; qed")
							}),
					);
				},
			}
		}

		Poll::Pending
	}
}

/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
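///
/// For example (illustrative, not compiled as a doc test):
///
/// ```ignore
/// let name = legacy_kademlia_protocol_name(&ProtocolId::from("dot"));
/// assert_eq!(name.as_ref(), "/dot/kad");
/// ```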
fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
	let name = format!("/{}/kad", id.as_ref());
	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
}

/// Kademlia protocol name based on `genesis_hash` and `fork_id`.
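///
/// For example (illustrative, not compiled as a doc test; `genesis_hash` is a placeholder):
///
/// ```ignore
/// // A 32-byte genesis hash is hex-encoded, so the resulting name looks like
/// // "/<64 hex chars>/<fork id>/kad" (or "/<64 hex chars>/kad" without a fork id).
/// let name = kademlia_protocol_name(genesis_hash, Some("staging"));
/// ```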
fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
	genesis_hash: Hash,
	fork_id: Option<&str>,
) -> StreamProtocol {
	let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
	let name = if let Some(fork_id) = fork_id {
		format!("/{genesis_hash_hex}/{fork_id}/kad")
	} else {
		format!("/{genesis_hash_hex}/kad")
	};

	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
}

#[cfg(test)]
mod tests {
	use super::{kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig};
	use crate::config::ProtocolId;
	use libp2p::{identity::Keypair, Multiaddr};
	use sp_core::hash::H256;

	#[cfg(ignore_flaky_test)] // https://github.com/paritytech/polkadot-sdk/issues/48
	#[tokio::test]
	async fn discovery_working() {
		use super::DiscoveryOut;
		use futures::prelude::*;
		use libp2p::{
			core::{
				transport::{MemoryTransport, Transport},
				upgrade,
			},
			noise,
			swarm::{Swarm, SwarmEvent},
			yamux,
		};
		use std::{collections::HashSet, task::Poll, time::Duration};

		let mut first_swarm_peer_id_and_addr = None;

		let genesis_hash = H256::from_low_u64_be(1);
		let fork_id = Some("test-fork-id");
		let protocol_id = ProtocolId::from("dot");

		// Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of
		// the first swarm via `with_permanent_addresses`.
		let mut swarms = (0..25)
			.map(|i| {
				let mut swarm = libp2p::SwarmBuilder::with_new_identity()
					.with_tokio()
					.with_other_transport(|keypair| {
						MemoryTransport::new()
							.upgrade(upgrade::Version::V1)
							.authenticate(noise::Config::new(&keypair).unwrap())
							.multiplex(yamux::Config::default())
							.boxed()
					})
					.unwrap()
					.with_behaviour(|keypair| {
						let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
						config
							.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
							.allow_private_ip(true)
							.allow_non_globals_in_dht(true)
							.discovery_limit(50)
							.with_kademlia(genesis_hash, fork_id, &protocol_id);

						config.finish()
					})
					.unwrap()
					.with_swarm_config(|config| {
						// This is taken care of by notification protocols in non-test environment
						config.with_idle_connection_timeout(Duration::from_secs(10))
					})
					.build();

				let listen_addr: Multiaddr =
					format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

				if i == 0 {
					first_swarm_peer_id_and_addr =
						Some((*swarm.local_peer_id(), listen_addr.clone()))
				}

				swarm.listen_on(listen_addr.clone()).unwrap();
				(swarm, listen_addr)
			})
			.collect::<Vec<_>>();

		// Build a `Vec<HashSet<PeerId>>` with the list of nodes remaining to be discovered.
		let mut to_discover = (0..swarms.len())
			.map(|n| {
				(0..swarms.len())
					// Skip the first swarm as all other swarms already know it.
					.skip(1)
					.filter(|p| *p != n)
					.map(|p| *Swarm::local_peer_id(&swarms[p].0))
					.collect::<HashSet<_>>()
			})
			.collect::<Vec<_>>();

		let fut = futures::future::poll_fn(move |cx| {
			'polling: loop {
				for swarm_n in 0..swarms.len() {
					match swarms[swarm_n].0.poll_next_unpin(cx) {
						Poll::Ready(Some(e)) => {
							match e {
								SwarmEvent::Behaviour(behavior) => {
									match behavior {
										DiscoveryOut::UnroutablePeer(other) |
										DiscoveryOut::Discovered(other) => {
											// Call `add_self_reported_address` to simulate identify
											// happening.
											let addr = swarms
												.iter()
												.find_map(|(s, a)| {
													if s.behaviour().local_peer_id == other {
														Some(a.clone())
													} else {
														None
													}
												})
												.unwrap();
											// Test both genesis hash-based and legacy
											// protocol names.
											let protocol_names = if swarm_n % 2 == 0 {
												vec![kademlia_protocol_name(genesis_hash, fork_id)]
											} else {
												vec![
													legacy_kademlia_protocol_name(&protocol_id),
													kademlia_protocol_name(genesis_hash, fork_id),
												]
											};
											swarms[swarm_n]
												.0
												.behaviour_mut()
												.add_self_reported_address(
													&other,
													protocol_names.as_slice(),
													addr,
												);

											to_discover[swarm_n].remove(&other);
										},
										DiscoveryOut::RandomKademliaStarted => {},
										DiscoveryOut::ClosestPeersFound(..) => {},
										// libp2p emits this event when it is not particularly
										// happy, but this doesn't break the discovery.
										DiscoveryOut::ClosestPeersNotFound(..) => {},
										e => {
											panic!("Unexpected event: {:?}", e)
										},
									}
								},
								// Ignore non-`Behaviour` events.
								_ => {},
							}
							continue 'polling
						},
						_ => {},
					}
				}
				break
			}

			if to_discover.iter().all(|l| l.is_empty()) {
				Poll::Ready(())
			} else {
				Poll::Pending
			}
		});

		fut.await
	}

	#[test]
	fn discovery_ignores_peers_with_unknown_protocols() {
		let supported_genesis_hash = H256::from_low_u64_be(1);
		let unsupported_genesis_hash = H256::from_low_u64_be(2);
		let supported_protocol_id = ProtocolId::from("a");
		let unsupported_protocol_id = ProtocolId::from("b");

		let mut discovery = {
			let keypair = Keypair::generate_ed25519();
			let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
			config
				.allow_private_ip(true)
				.allow_non_globals_in_dht(true)
				.discovery_limit(50)
				.with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
			config.finish()
		};

		let predictable_peer_id = |bytes: &[u8; 32]| {
			Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
		};

		let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
		let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
		let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let another_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Try adding remote peers with unsupported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(unsupported_genesis_hash, None)],
			remote_addr.clone(),
		);
		discovery.add_self_reported_address(
			&another_peer_id,
			&[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
			another_addr.clone(),
		);

		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(remote_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with unsupported protocol not to be added."
			);
			assert!(
				kademlia
					.kbucket(another_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with unsupported protocol not to be added."
			);
		}

		// Add remote peers with supported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(supported_genesis_hash, None)],
			remote_addr.clone(),
		);
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				!kademlia
					.kbucket(remote_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with supported protocol to be added."
			);
		}

		let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Check the unsupported peer is not present before and after the call.
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(unsupported_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect unsupported peer not to be added."
			);
		}
		// Note: the legacy protocol is not supported without the genesis hash and fork ID;
		// if the legacy protocol is the only protocol supported, then the peer will not be added.
		discovery.add_self_reported_address(
			&unsupported_peer_id,
			&[legacy_kademlia_protocol_name(&supported_protocol_id)],
			unsupported_peer_addr.clone(),
		);
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(unsupported_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect unsupported peer not to be added."
			);
		}

		// Supported legacy and genesis hash-based protocols are allowed to be added.
		discovery.add_self_reported_address(
			&another_peer_id,
			&[
				legacy_kademlia_protocol_name(&supported_protocol_id),
				kademlia_protocol_name(supported_genesis_hash, None),
			],
			another_addr.clone(),
		);

		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert_eq!(
				2,
				kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
				"Expect peers with supported protocol to be added."
			);
			assert!(
				!kademlia
					.kbucket(another_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with supported protocol to be added."
			);
		}
	}
}