// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! libp2p-related discovery code for litep2p backend.
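//!
//! Combines the `/ipfs/ping/1.0.0` and `/ipfs/identify/1.0.0` protocols, Kademlia and,
//! optionally, mDNS into a single [`Discovery`] object that is driven as a [`Stream`] of
//! [`DiscoveryEvent`]s.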

use crate::{
	config::{
		NetworkConfiguration, ProtocolId, KADEMLIA_MAX_PROVIDER_KEYS, KADEMLIA_PROVIDER_RECORD_TTL,
		KADEMLIA_PROVIDER_REPUBLISH_INTERVAL,
	},
	peer_store::PeerStoreProvider,
};

use array_bytes::bytes2hex;
use futures::{FutureExt, Stream};
use futures_timer::Delay;
use ip_network::IpNetwork;
use litep2p::{
	protocol::{
		libp2p::{
			identify::{Config as IdentifyConfig, IdentifyEvent},
			kademlia::{
				Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
				IncomingRecordValidationMode, KademliaEvent, KademliaHandle, PeerRecord, QueryId,
				Quorum, Record, RecordKey,
			},
			ping::{Config as PingConfig, PingEvent},
		},
		mdns::{Config as MdnsConfig, MdnsEvent},
	},
	types::multiaddr::{Multiaddr, Protocol},
	PeerId, ProtocolName,
};
use parking_lot::RwLock;
use sc_network_types::kad::Key as KademliaKey;
use schnellru::{ByLength, LruMap};

use std::{
	cmp,
	collections::{HashMap, HashSet, VecDeque},
	iter,
	num::NonZeroUsize,
	pin::Pin,
	sync::Arc,
	task::{Context, Poll},
	time::{Duration, Instant},
};

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::discovery";

/// Kademlia query interval.
const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);

/// mDNS query interval.
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);

/// The minimum number of peers from which we expect an answer before the request is terminated.
const GET_RECORD_REDUNDANCY_FACTOR: usize = 4;

/// The maximum number of tracked external addresses we allow.
const MAX_EXTERNAL_ADDRESSES: u32 = 32;

/// Number of times an observed address must be reported by different peers before it is
/// confirmed as external.
const MIN_ADDRESS_CONFIRMATIONS: usize = 3;

/// Discovery events.
#[derive(Debug)]
pub enum DiscoveryEvent {
	/// Ping RTT measured for peer.
	Ping {
		/// Remote peer ID.
		peer: PeerId,

		/// Ping round-trip time.
		rtt: Duration,
	},

	/// Peer identified over `/ipfs/identify/1.0.0` protocol.
	Identified {
		/// Peer ID.
		peer: PeerId,

		/// Listen addresses.
		listen_addresses: Vec<Multiaddr>,

		/// Supported protocols.
		supported_protocols: HashSet<ProtocolName>,
	},

	/// One or more addresses discovered.
	///
	/// This event is emitted when a new peer is discovered over mDNS.
	Discovered {
		/// Discovered addresses.
		addresses: Vec<Multiaddr>,
	},

	/// Routing table has been updated.
	RoutingTableUpdate {
		/// Peers that were added to routing table.
		peers: HashSet<PeerId>,
	},

	/// New external address discovered.
	ExternalAddressDiscovered {
		/// Discovered address.
		address: Multiaddr,
	},

	/// The external address has expired.
	///
	/// This happens when the internal buffers exceed the maximum number of external addresses,
	/// and this address is the oldest one.
	ExternalAddressExpired {
		/// Expired address.
		address: Multiaddr,
	},

	/// `FIND_NODE` query succeeded.
	FindNodeSuccess {
		/// Query ID.
		query_id: QueryId,

		/// Target.
		target: PeerId,

		/// Found peers.
		peers: Vec<(PeerId, Vec<Multiaddr>)>,
	},

	/// `GetRecord` query succeeded.
	GetRecordSuccess {
		/// Query ID.
		query_id: QueryId,
	},

	/// A record was found in the DHT.
	GetRecordPartialResult {
		/// Query ID.
		query_id: QueryId,

		/// Record.
		record: PeerRecord,
	},

	/// Record was successfully stored on the DHT.
	PutRecordSuccess {
		/// Query ID.
		query_id: QueryId,
	},

	/// Providers were successfully retrieved.
	GetProvidersSuccess {
		/// Query ID.
		query_id: QueryId,
		/// Found providers sorted by distance to provided key.
		providers: Vec<ContentProvider>,
	},

	/// Query failed.
	QueryFailed {
		/// Query ID.
		query_id: QueryId,
	},

	/// Incoming record to store.
	IncomingRecord {
		/// Record.
		record: Record,
	},

	/// Started a random Kademlia query.
	RandomKademliaStarted,
}

/// Discovery.
pub struct Discovery {
	/// Local peer ID.
	local_peer_id: litep2p::PeerId,

	/// Ping event stream.
	ping_event_stream: Box<dyn Stream<Item = PingEvent> + Send + Unpin>,

	/// Identify event stream.
	identify_event_stream: Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,

	/// mDNS event stream, if enabled.
	mdns_event_stream: Option<Box<dyn Stream<Item = MdnsEvent> + Send + Unpin>>,

	/// Kademlia handle.
	kademlia_handle: KademliaHandle,

	/// `Peerstore` handle.
	_peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Next Kademlia query for a random peer ID.
	///
	/// If `None`, there is currently a query pending.
	next_kad_query: Option<Delay>,

	/// Active `FIND_NODE` query if it exists.
	random_walk_query_id: Option<QueryId>,

	/// Pending events.
	pending_events: VecDeque<DiscoveryEvent>,

	/// Allow non-global addresses in the DHT.
	allow_non_global_addresses: bool,

	/// Protocols supported by the local node.
	local_protocols: HashSet<ProtocolName>,

	/// Public addresses.
	public_addresses: HashSet<Multiaddr>,

	/// Listen addresses.
	listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,

	/// External address confirmations.
	address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,

	/// Delay to next `FIND_NODE` query.
	duration_to_next_find_query: Duration,
}

/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
fn legacy_kademlia_protocol_name(id: &ProtocolId) -> ProtocolName {
	ProtocolName::from(format!("/{}/kad", id.as_ref()))
}

/// Kademlia protocol name based on `genesis_hash` and `fork_id`.
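///
/// For illustration, with hypothetical values: a genesis hash of `abcd..` combined with
/// `fork_id = Some("fork")` yields the protocol name `/abcd../fork/kad`, while without a
/// fork id the name is `/abcd../kad`.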
fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
	genesis_hash: Hash,
	fork_id: Option<&str>,
) -> ProtocolName {
	let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
	let protocol = if let Some(fork_id) = fork_id {
		format!("/{}/{}/kad", genesis_hash_hex, fork_id)
	} else {
		format!("/{}/kad", genesis_hash_hex)
	};

	ProtocolName::from(protocol)
}

impl Discovery {
	/// Create new [`Discovery`].
	///
	/// Enables `/ipfs/ping/1.0.0` and `/ipfs/identify/1.0.0` by default and starts
	/// the mDNS peer discovery if it was enabled.
	pub fn new<Hash: AsRef<[u8]> + Clone>(
		local_peer_id: litep2p::PeerId,
		config: &NetworkConfiguration,
		genesis_hash: Hash,
		fork_id: Option<&str>,
		protocol_id: &ProtocolId,
		known_peers: HashMap<PeerId, Vec<Multiaddr>>,
		listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
		_peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
		let (ping_config, ping_event_stream) = PingConfig::default();
		let user_agent = format!("{} ({}) (litep2p)", config.client_version, config.node_name);

		let (identify_config, identify_event_stream) =
			IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));

		let (mdns_config, mdns_event_stream) = match config.transport {
			crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
				true => {
					let (mdns_config, mdns_event_stream) = MdnsConfig::new(MDNS_QUERY_INTERVAL);
					(Some(mdns_config), Some(mdns_event_stream))
				},
				false => (None, None),
			},
			_ => panic!("memory transport not supported"),
		};

		let (kademlia_config, kademlia_handle) = {
			let protocol_names = vec![
				kademlia_protocol_name(genesis_hash.clone(), fork_id),
				legacy_kademlia_protocol_name(protocol_id),
			];

			KademliaConfigBuilder::new()
				.with_known_peers(known_peers)
				.with_protocol_names(protocol_names)
				.with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual)
				.with_provider_record_ttl(KADEMLIA_PROVIDER_RECORD_TTL)
				.with_provider_refresh_interval(KADEMLIA_PROVIDER_REPUBLISH_INTERVAL)
				.with_max_provider_keys(KADEMLIA_MAX_PROVIDER_KEYS)
				.build()
		};

		(
			Self {
				local_peer_id,
				ping_event_stream,
				identify_event_stream,
				mdns_event_stream,
				kademlia_handle,
				_peerstore_handle,
				listen_addresses,
				random_walk_query_id: None,
				pending_events: VecDeque::new(),
				duration_to_next_find_query: Duration::from_secs(1),
				address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
				allow_non_global_addresses: config.allow_non_globals_in_dht,
				public_addresses: config.public_addresses.iter().cloned().map(Into::into).collect(),
				next_kad_query: Some(Delay::new(KADEMLIA_QUERY_INTERVAL)),
				local_protocols: HashSet::from_iter([kademlia_protocol_name(
					genesis_hash,
					fork_id,
				)]),
			},
			ping_config,
			identify_config,
			kademlia_config,
			mdns_config,
		)
	}

	/// Add known peer to `Kademlia`.
	#[allow(unused)]
	pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
		self.kademlia_handle.add_known_peer(peer, addresses).await;
	}

	/// Add self-reported addresses to routing table if `peer` supports
	/// at least one of the locally supported DHT protocols.
	pub async fn add_self_reported_address(
		&mut self,
		peer: PeerId,
		supported_protocols: HashSet<ProtocolName>,
		addresses: Vec<Multiaddr>,
	) {
		if self.local_protocols.is_disjoint(&supported_protocols) {
			log::trace!(
				target: LOG_TARGET,
				"Ignoring self-reported address of peer {peer} as remote node is not part of the \
				 Kademlia DHT supported by the local node.",
			);
			return
		}

		let addresses = addresses
			.into_iter()
			.filter_map(|address| {
				if !self.allow_non_global_addresses && !Discovery::can_add_to_dht(&address) {
					log::trace!(
						target: LOG_TARGET,
						"ignoring self-reported non-global address {address} from {peer}."
					);

					return None
				}

				Some(address)
			})
			.collect();

		log::trace!(
			target: LOG_TARGET,
			"add self-reported addresses for {peer:?}: {addresses:?}",
		);

		self.kademlia_handle.add_known_peer(peer, addresses).await;
	}

	/// Start Kademlia `FIND_NODE` query for `target`.
	pub async fn find_node(&mut self, target: PeerId) -> QueryId {
		self.kademlia_handle.find_node(target).await
	}

	/// Start Kademlia `GET_VALUE` query for `key`.
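	///
	/// The query runs with a quorum of [`GET_RECORD_REDUNDANCY_FACTOR`] peers, i.e. answers
	/// from at least that many peers are expected before the query concludes successfully.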
	pub async fn get_value(&mut self, key: KademliaKey) -> QueryId {
		self.kademlia_handle
			.get_record(
				RecordKey::new(&key.to_vec()),
				Quorum::N(NonZeroUsize::new(GET_RECORD_REDUNDANCY_FACTOR).unwrap()),
			)
			.await
	}

	/// Publish value on the DHT using Kademlia `PUT_VALUE`.
	pub async fn put_value(&mut self, key: KademliaKey, value: Vec<u8>) -> QueryId {
		self.kademlia_handle
			.put_record(Record::new(RecordKey::new(&key.to_vec()), value))
			.await
	}

	/// Put record to given peers.
	pub async fn put_value_to_peers(
		&mut self,
		record: Record,
		peers: Vec<sc_network_types::PeerId>,
		update_local_storage: bool,
	) -> QueryId {
		self.kademlia_handle
			.put_record_to_peers(
				record,
				peers.into_iter().map(|peer| peer.into()).collect(),
				update_local_storage,
			)
			.await
	}

	/// Store record in the local DHT store.
	pub async fn store_record(
		&mut self,
		key: KademliaKey,
		value: Vec<u8>,
		publisher: Option<sc_network_types::PeerId>,
		expires: Option<Instant>,
	) {
		log::debug!(
			target: LOG_TARGET,
			"Storing DHT record with key {key:?}, originally published by {publisher:?}, \
			 expires {expires:?}.",
		);

		self.kademlia_handle
			.store_record(Record {
				key: RecordKey::new(&key.to_vec()),
				value,
				publisher: publisher.map(Into::into),
				expires,
			})
			.await;
	}

	/// Start providing `key`.
	pub async fn start_providing(&mut self, key: KademliaKey) {
		self.kademlia_handle.start_providing(key.into()).await;
	}

	/// Stop providing `key`.
	pub async fn stop_providing(&mut self, key: KademliaKey) {
		self.kademlia_handle.stop_providing(key.into()).await;
	}

	/// Get providers for `key`.
	pub async fn get_providers(&mut self, key: KademliaKey) -> QueryId {
		self.kademlia_handle.get_providers(key.into()).await
	}

	/// Check if the observed address is a known address.
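	///
	/// Addresses are compared component by component; a trailing `/p2p/<peer id>` on
	/// either address is ignored. For example (hypothetical addresses),
	/// `/ip4/1.2.3.4/tcp/30333` matches `/ip4/1.2.3.4/tcp/30333/p2p/<peer id>`.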
	fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool {
		let mut known = known.iter();
		let mut observed = observed.iter();

		loop {
			match (known.next(), observed.next()) {
				(None, None) => return true,
				(None, Some(Protocol::P2p(_))) => return true,
				(Some(Protocol::P2p(_)), None) => return true,
				(known, observed) if known != observed => return false,
				_ => {},
			}
		}
	}

	/// Can `address` be added to the DHT?
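	///
	/// Only addresses that start with a DNS name or a global IP address qualify;
	/// callers skip this check when `allow_non_global_addresses` is enabled.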
	fn can_add_to_dht(address: &Multiaddr) -> bool {
		let ip = match address.iter().next() {
			Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
			Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
			Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
				return true,
			_ => return false,
		};

		ip.is_global()
	}

	/// Check if `address` can be considered a new external address.
	///
	/// If this address replaces an older address, the expired address is returned.
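	///
	/// An address is accepted immediately if it matches one of our listen or public
	/// addresses; otherwise it must be reported by at least [`MIN_ADDRESS_CONFIRMATIONS`]
	/// distinct peers before it is considered external.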
	fn is_new_external_address(
		&mut self,
		address: &Multiaddr,
		peer: PeerId,
	) -> (bool, Option<Multiaddr>) {
		log::trace!(target: LOG_TARGET, "verify new external address: {address}");

		if !self.allow_non_global_addresses && !Discovery::can_add_to_dht(&address) {
			log::trace!(
				target: LOG_TARGET,
				"ignoring externally reported non-global address {address} from {peer}."
			);

			return (false, None);
		}

		// Is the address one of our known addresses?
		if self
			.listen_addresses
			.read()
			.iter()
			.chain(self.public_addresses.iter())
			.any(|known_address| Discovery::is_known_address(&known_address, &address))
		{
			return (true, None)
		}

		match self.address_confirmations.get(address) {
			Some(confirmations) => {
				confirmations.insert(peer);

				if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
					return (true, None)
				}
			},
			None => {
				let oldest = (self.address_confirmations.len() >=
					self.address_confirmations.limiter().max_length() as usize)
					.then(|| {
						self.address_confirmations.pop_oldest().map(|(address, peers)| {
							if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
								return Some(address)
							} else {
								None
							}
						})
					})
					.flatten()
					.flatten();

				self.address_confirmations.insert(address.clone(), iter::once(peer).collect());

				return (false, oldest)
			},
		}

		(false, None)
	}
}

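// Event sources are polled in a fixed order: buffered events first, then the random walk
// timer, the Kademlia handle, identify, ping and finally mDNS.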
impl Stream for Discovery {
	type Item = DiscoveryEvent;

	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
		let this = Pin::into_inner(self);

		if let Some(event) = this.pending_events.pop_front() {
			return Poll::Ready(Some(event))
		}

		if let Some(mut delay) = this.next_kad_query.take() {
			match delay.poll_unpin(cx) {
				Poll::Pending => {
					this.next_kad_query = Some(delay);
				},
				Poll::Ready(()) => {
					let peer = PeerId::random();

					log::trace!(target: LOG_TARGET, "start next kademlia query for {peer:?}");

					match this.kademlia_handle.try_find_node(peer) {
						Ok(query_id) => {
							this.random_walk_query_id = Some(query_id);
							return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
						},
						Err(()) => {
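							// The query could not be started; retry later, doubling the
							// delay on each failure up to a cap of 60 seconds.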
							this.duration_to_next_find_query = cmp::min(
								this.duration_to_next_find_query * 2,
								Duration::from_secs(60),
							);
							this.next_kad_query =
								Some(Delay::new(this.duration_to_next_find_query));
						},
					}
				},
			}
		}

		match Pin::new(&mut this.kademlia_handle).poll_next(cx) {
			Poll::Pending => {},
			Poll::Ready(None) => return Poll::Ready(None),
			Poll::Ready(Some(KademliaEvent::FindNodeSuccess { query_id, peers, .. }))
				if Some(query_id) == this.random_walk_query_id =>
			{
				// The addresses are already inserted into the DHT and in `TransportManager` so
				// there is no need to add them again. The found peers must be registered to
				// `Peerstore` so other protocols are aware of them through `Peerset`.
				log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers", peers.len());

				this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL));

				return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
					peers: peers.into_iter().map(|(peer, _)| peer).collect(),
				}))
			},
			Poll::Ready(Some(KademliaEvent::FindNodeSuccess { query_id, target, peers })) => {
				log::trace!(target: LOG_TARGET, "find node query yielded {} peers", peers.len());

				return Poll::Ready(Some(DiscoveryEvent::FindNodeSuccess {
					query_id,
					target,
					peers,
				}))
			},
			Poll::Ready(Some(KademliaEvent::RoutingTableUpdate { peers })) => {
				log::trace!(target: LOG_TARGET, "routing table update, discovered {} peers", peers.len());

				return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
					peers: peers.into_iter().collect(),
				}))
			},
			Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id })) => {
				log::trace!(
					target: LOG_TARGET,
					"`GET_RECORD` succeeded for {query_id:?}",
				);

				return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id }));
			},
			Poll::Ready(Some(KademliaEvent::GetRecordPartialResult { query_id, record })) => {
				log::trace!(
					target: LOG_TARGET,
					"`GET_RECORD` intermediary succeeded for {query_id:?}: {record:?}",
				);

				return Poll::Ready(Some(DiscoveryEvent::GetRecordPartialResult {
					query_id,
					record,
				}));
			},
			Poll::Ready(Some(KademliaEvent::PutRecordSuccess { query_id, key: _ })) =>
				return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
			Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => {
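				// A failed random walk is not reported to the caller; it only schedules the
				// next walk with exponential backoff. Failures of user-initiated queries are
				// propagated as `DiscoveryEvent::QueryFailed`.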
				match this.random_walk_query_id == Some(query_id) {
					true => {
						this.random_walk_query_id = None;
						this.duration_to_next_find_query =
							cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60));
						this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));
					},
					false => return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })),
				}
			},
			Poll::Ready(Some(KademliaEvent::IncomingRecord { record })) => {
				log::trace!(
					target: LOG_TARGET,
					"incoming `PUT_RECORD` request with key {:?} from publisher {:?}",
					record.key,
					record.publisher,
				);

				return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
			},
			Poll::Ready(Some(KademliaEvent::GetProvidersSuccess {
				provided_key,
				providers,
				query_id,
			})) => {
				log::trace!(
					target: LOG_TARGET,
					"`GET_PROVIDERS` for {query_id:?} with {provided_key:?} yielded {providers:?}",
				);

				return Poll::Ready(Some(DiscoveryEvent::GetProvidersSuccess {
					query_id,
					providers,
				}))
			},
			// We do not validate incoming providers.
			Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {},
		}

		match Pin::new(&mut this.identify_event_stream).poll_next(cx) {
			Poll::Pending => {},
			Poll::Ready(None) => return Poll::Ready(None),
			Poll::Ready(Some(IdentifyEvent::PeerIdentified {
				peer,
				listen_addresses,
				supported_protocols,
				observed_address,
				..
			})) => {
				let observed_address =
					if let Some(Protocol::P2p(peer_id)) = observed_address.iter().last() {
						if peer_id != *this.local_peer_id.as_ref() {
							log::warn!(
								target: LOG_TARGET,
								"Discovered external address for a peer that is not us: {observed_address}",
							);
							None
						} else {
							Some(observed_address)
						}
					} else {
						Some(observed_address.with(Protocol::P2p(this.local_peer_id.into())))
					};

				// Ensure that an external address with a different peer ID does not have
				// side effects of evicting other external addresses via `ExternalAddressExpired`.
				if let Some(observed_address) = observed_address {
					let (is_new, expired_address) =
						this.is_new_external_address(&observed_address, peer);

					if let Some(expired_address) = expired_address {
						log::trace!(
							target: LOG_TARGET,
							"Removing expired external address expired={expired_address} is_new={is_new} observed={observed_address}",
						);

						this.pending_events.push_back(DiscoveryEvent::ExternalAddressExpired {
							address: expired_address,
						});
					}

					if is_new {
						this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
							address: observed_address.clone(),
						});
					}
				}

				return Poll::Ready(Some(DiscoveryEvent::Identified {
					peer,
					listen_addresses,
					supported_protocols,
				}));
			},
		}

		match Pin::new(&mut this.ping_event_stream).poll_next(cx) {
			Poll::Pending => {},
			Poll::Ready(None) => return Poll::Ready(None),
			Poll::Ready(Some(PingEvent::Ping { peer, ping })) =>
				return Poll::Ready(Some(DiscoveryEvent::Ping { peer, rtt: ping })),
		}

		if let Some(ref mut mdns_event_stream) = &mut this.mdns_event_stream {
			match Pin::new(mdns_event_stream).poll_next(cx) {
				Poll::Pending => {},
				Poll::Ready(None) => return Poll::Ready(None),
				Poll::Ready(Some(MdnsEvent::Discovered(addresses))) =>
					return Poll::Ready(Some(DiscoveryEvent::Discovered { addresses })),
			}
		}

		Poll::Pending
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	use std::sync::atomic::AtomicU32;

	use crate::{
		config::ProtocolId,
		peer_store::{PeerStore, PeerStoreProvider},
	};
	use futures::{stream::FuturesUnordered, StreamExt};
	use sp_core::H256;
	use sp_tracing::tracing_subscriber;

	use litep2p::{
		config::ConfigBuilder as Litep2pConfigBuilder, transport::tcp::config::Config as TcpConfig,
		Litep2p,
	};

	#[tokio::test]
	async fn litep2p_discovery_works() {
		let _ = tracing_subscriber::fmt()
			.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
			.try_init();

		let mut known_peers = HashMap::new();
		let genesis_hash = H256::from_low_u64_be(1);
		let fork_id = Some("test-fork-id");
		let protocol_id = ProtocolId::from("dot");

		// Build backends such that the first peer is known to all other peers.
		let backends = (0..10)
			.map(|i| {
				let keypair = litep2p::crypto::ed25519::Keypair::generate();
				let peer_id: PeerId = keypair.public().to_peer_id().into();

				let listen_addresses = Arc::new(RwLock::new(HashSet::new()));

				let peer_store = PeerStore::new(vec![], None);
				let peer_store_handle: Arc<dyn PeerStoreProvider> = Arc::new(peer_store.handle());

				let (discovery, ping_config, identify_config, kademlia_config, _mdns) =
					Discovery::new(
						peer_id,
						&NetworkConfiguration::new_local(),
						genesis_hash,
						fork_id,
						&protocol_id,
						known_peers.clone(),
						listen_addresses.clone(),
						peer_store_handle,
					);

				let config = Litep2pConfigBuilder::new()
					.with_keypair(keypair)
					.with_tcp(TcpConfig {
						listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
						..Default::default()
					})
					.with_libp2p_ping(ping_config)
					.with_libp2p_identify(identify_config)
					.with_libp2p_kademlia(kademlia_config)
					.build();

				let mut litep2p = Litep2p::new(config).unwrap();

				let addresses = litep2p.listen_addresses().cloned().collect::<Vec<_>>();
				// Propagate addresses to discovery.
				addresses.iter().for_each(|address| {
					listen_addresses.write().insert(address.clone());
				});

				// Except for the first peer, all other peers know the first peer's addresses.
				if i == 0 {
					log::info!(target: LOG_TARGET, "First peer is {peer_id:?} with addresses {addresses:?}");
					known_peers.insert(peer_id, addresses.clone());
				} else {
					let (peer, addresses) = known_peers.iter().next().unwrap();

					let result = litep2p.add_known_address(*peer, addresses.into_iter().cloned());

					log::info!(target: LOG_TARGET, "{peer_id:?}: Adding known peer {peer:?} with addresses {addresses:?} result={result:?}");
				}

				(peer_id, litep2p, discovery)
			})
			.collect::<Vec<_>>();

		let total_peers = backends.len() as u32;
		let remaining_peers =
			backends.iter().map(|(peer_id, _, _)| *peer_id).collect::<HashSet<_>>();

		let first_peer = *known_peers.iter().next().unwrap().0;

		// Each backend must discover the whole network.
		let mut futures = FuturesUnordered::new();
		let num_finished = Arc::new(AtomicU32::new(0));

		for (peer_id, mut litep2p, mut discovery) in backends {
			// Remove the local peer id from the set.
			let mut remaining_peers = remaining_peers.clone();
			remaining_peers.remove(&peer_id);

			let num_finished = num_finished.clone();

			let future = async move {
				log::info!(target: LOG_TARGET, "{peer_id:?} starting loop");

				if peer_id != first_peer {
					log::info!(target: LOG_TARGET, "{peer_id:?} dialing {first_peer:?}");
					litep2p.dial(&first_peer).await.unwrap();
				}

				loop {
					// We need to keep the network alive until all peers are discovered.
					if num_finished.load(std::sync::atomic::Ordering::Relaxed) == total_peers {
						log::info!(target: LOG_TARGET, "{peer_id:?} all peers discovered");
						break
					}

					tokio::select! {
						// Drive litep2p backend forward.
						event = litep2p.next_event() => {
							log::info!(target: LOG_TARGET, "{peer_id:?} Litep2p event: {event:?}");
						},

						// Detect discovery events.
						event = discovery.next() => {
							match event.unwrap() {
								// We have discovered the peer via kademlia and established
								// a connection on the identify protocol.
								DiscoveryEvent::Identified { peer, .. } => {
									log::info!(target: LOG_TARGET, "{peer_id:?} Peer {peer} identified");

									remaining_peers.remove(&peer);

									if remaining_peers.is_empty() {
										log::info!(target: LOG_TARGET, "{peer_id:?} All peers discovered");

										num_finished.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
									}
								},

								event => {
									log::info!(target: LOG_TARGET, "{peer_id:?} Discovery event: {event:?}");
								}
							}
						}
					}
				}
			};

			futures.push(future);
		}

		// Futures will exit when all peers are discovered.
		tokio::time::timeout(Duration::from_secs(60), futures.next())
			.await
			.expect("All peers should finish within 60 seconds");
	}
}
925}