sc_network/litep2p/
discovery.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! libp2p-related discovery code for litep2p backend.
20
21use crate::{
22	config::{NetworkConfiguration, ProtocolId},
23	peer_store::PeerStoreProvider,
24};
25
26use array_bytes::bytes2hex;
27use futures::{FutureExt, Stream};
28use futures_timer::Delay;
29use ip_network::IpNetwork;
30use libp2p::kad::record::Key as KademliaKey;
31use litep2p::{
32	protocol::{
33		libp2p::{
34			identify::{Config as IdentifyConfig, IdentifyEvent},
35			kademlia::{
36				Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder,
37				IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
38				Record, RecordKey, RecordsType,
39			},
40			ping::{Config as PingConfig, PingEvent},
41		},
42		mdns::{Config as MdnsConfig, MdnsEvent},
43	},
44	types::multiaddr::{Multiaddr, Protocol},
45	PeerId, ProtocolName,
46};
47use parking_lot::RwLock;
48use schnellru::{ByLength, LruMap};
49
50use std::{
51	cmp,
52	collections::{HashMap, HashSet, VecDeque},
53	num::NonZeroUsize,
54	pin::Pin,
55	sync::Arc,
56	task::{Context, Poll},
57	time::{Duration, Instant},
58};
59
/// Target under which every log line of this file is emitted.
const LOG_TARGET: &str = "sub-libp2p::discovery";

/// Delay before the next random Kademlia `FIND_NODE` query is started.
const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);

/// Interval handed to the mDNS configuration.
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);

/// Quorum used for `GET_VALUE` queries: the number of peers we expect an answer from before
/// the request is terminated.
const GET_RECORD_REDUNDANCY_FACTOR: usize = 4;

/// Upper bound on the number of external address candidates tracked at the same time.
const MAX_EXTERNAL_ADDRESSES: u32 = 32;

/// Number of confirmations an address needs before it is treated as verified.
///
/// Note: all addresses are confirmed by libp2p on the first encounter. This aims to make
/// addresses a bit more robust.
const MIN_ADDRESS_CONFIRMATIONS: usize = 2;
80
81/// Discovery events.
82#[derive(Debug)]
83pub enum DiscoveryEvent {
84	/// Ping RTT measured for peer.
85	Ping {
86		/// Remote peer ID.
87		peer: PeerId,
88
89		/// Ping round-trip time.
90		rtt: Duration,
91	},
92
93	/// Peer identified over `/ipfs/identify/1.0.0` protocol.
94	Identified {
95		/// Peer ID.
96		peer: PeerId,
97
98		/// Listen addresses.
99		listen_addresses: Vec<Multiaddr>,
100
101		/// Supported protocols.
102		supported_protocols: HashSet<ProtocolName>,
103	},
104
105	/// One or more addresses discovered.
106	Discovered {
107		/// Discovered addresses.
108		addresses: Vec<Multiaddr>,
109	},
110
111	/// Routing table has been updated.
112	RoutingTableUpdate {
113		/// Peers that were added to routing table.
114		peers: HashSet<PeerId>,
115	},
116
117	/// New external address discovered.
118	ExternalAddressDiscovered {
119		/// Discovered addresses.
120		address: Multiaddr,
121	},
122
123	/// Record was found from the DHT.
124	GetRecordSuccess {
125		/// Query ID.
126		query_id: QueryId,
127
128		/// Records.
129		records: RecordsType,
130	},
131
132	/// Record was successfully stored on the DHT.
133	PutRecordSuccess {
134		/// Query ID.
135		query_id: QueryId,
136	},
137
138	/// Query failed.
139	QueryFailed {
140		/// Query ID.
141		query_id: QueryId,
142	},
143
144	/// Incoming record to store.
145	IncomingRecord {
146		/// Record.
147		record: Record,
148	},
149
150	/// Started a random Kademlia query.
151	RandomKademliaStarted,
152}
153
154/// Discovery.
155pub struct Discovery {
156	/// Ping event stream.
157	ping_event_stream: Box<dyn Stream<Item = PingEvent> + Send + Unpin>,
158
159	/// Identify event stream.
160	identify_event_stream: Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
161
162	/// mDNS event stream, if enabled.
163	mdns_event_stream: Option<Box<dyn Stream<Item = MdnsEvent> + Send + Unpin>>,
164
165	/// Kademlia handle.
166	kademlia_handle: KademliaHandle,
167
168	/// `Peerstore` handle.
169	_peerstore_handle: Arc<dyn PeerStoreProvider>,
170
171	/// Next Kademlia query for a random peer ID.
172	///
173	/// If `None`, there is currently a query pending.
174	next_kad_query: Option<Delay>,
175
176	/// Active `FIND_NODE` query if it exists.
177	find_node_query_id: Option<QueryId>,
178
179	/// Pending events.
180	pending_events: VecDeque<DiscoveryEvent>,
181
182	/// Allow non-global addresses in the DHT.
183	allow_non_global_addresses: bool,
184
185	/// Protocols supported by the local node.
186	local_protocols: HashSet<ProtocolName>,
187
188	/// Public addresses.
189	public_addresses: HashSet<Multiaddr>,
190
191	/// Listen addresses.
192	listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
193
194	/// External address confirmations.
195	address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,
196
197	/// Delay to next `FIND_NODE` query.
198	duration_to_next_find_query: Duration,
199}
200
201/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
202fn legacy_kademlia_protocol_name(id: &ProtocolId) -> ProtocolName {
203	ProtocolName::from(format!("/{}/kad", id.as_ref()))
204}
205
206/// Kademlia protocol name based on `genesis_hash` and `fork_id`.
207fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
208	genesis_hash: Hash,
209	fork_id: Option<&str>,
210) -> ProtocolName {
211	let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
212	let protocol = if let Some(fork_id) = fork_id {
213		format!("/{}/{}/kad", genesis_hash_hex, fork_id)
214	} else {
215		format!("/{}/kad", genesis_hash_hex)
216	};
217
218	ProtocolName::from(protocol)
219}
220
221impl Discovery {
222	/// Create new [`Discovery`].
223	///
224	/// Enables `/ipfs/ping/1.0.0` and `/ipfs/identify/1.0.0` by default and starts
225	/// the mDNS peer discovery if it was enabled.
226	pub fn new<Hash: AsRef<[u8]> + Clone>(
227		config: &NetworkConfiguration,
228		genesis_hash: Hash,
229		fork_id: Option<&str>,
230		protocol_id: &ProtocolId,
231		known_peers: HashMap<PeerId, Vec<Multiaddr>>,
232		listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
233		_peerstore_handle: Arc<dyn PeerStoreProvider>,
234	) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
235		let (ping_config, ping_event_stream) = PingConfig::default();
236		let user_agent = format!("{} ({})", config.client_version, config.node_name);
237
238		let (identify_config, identify_event_stream) =
239			IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));
240
241		let (mdns_config, mdns_event_stream) = match config.transport {
242			crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
243				true => {
244					let (mdns_config, mdns_event_stream) = MdnsConfig::new(MDNS_QUERY_INTERVAL);
245					(Some(mdns_config), Some(mdns_event_stream))
246				},
247				false => (None, None),
248			},
249			_ => panic!("memory transport not supported"),
250		};
251
252		let (kademlia_config, kademlia_handle) = {
253			let protocol_names = vec![
254				kademlia_protocol_name(genesis_hash.clone(), fork_id),
255				legacy_kademlia_protocol_name(protocol_id),
256			];
257
258			KademliaConfigBuilder::new()
259				.with_known_peers(known_peers)
260				.with_protocol_names(protocol_names)
261				.with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual)
262				.build()
263		};
264
265		(
266			Self {
267				ping_event_stream,
268				identify_event_stream,
269				mdns_event_stream,
270				kademlia_handle,
271				_peerstore_handle,
272				listen_addresses,
273				find_node_query_id: None,
274				pending_events: VecDeque::new(),
275				duration_to_next_find_query: Duration::from_secs(1),
276				address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
277				allow_non_global_addresses: config.allow_non_globals_in_dht,
278				public_addresses: config.public_addresses.iter().cloned().map(Into::into).collect(),
279				next_kad_query: Some(Delay::new(KADEMLIA_QUERY_INTERVAL)),
280				local_protocols: HashSet::from_iter([kademlia_protocol_name(
281					genesis_hash,
282					fork_id,
283				)]),
284			},
285			ping_config,
286			identify_config,
287			kademlia_config,
288			mdns_config,
289		)
290	}
291
292	/// Add known peer to `Kademlia`.
293	#[allow(unused)]
294	pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
295		self.kademlia_handle.add_known_peer(peer, addresses).await;
296	}
297
298	/// Add self-reported addresses to routing table if `peer` supports
299	/// at least one of the locally supported DHT protocol.
300	pub async fn add_self_reported_address(
301		&mut self,
302		peer: PeerId,
303		supported_protocols: HashSet<ProtocolName>,
304		addresses: Vec<Multiaddr>,
305	) {
306		if self.local_protocols.is_disjoint(&supported_protocols) {
307			log::trace!(
308				target: LOG_TARGET,
309				"Ignoring self-reported address of peer {peer} as remote node is not part of the \
310				 Kademlia DHT supported by the local node.",
311			);
312			return
313		}
314
315		let addresses = addresses
316			.into_iter()
317			.filter_map(|address| {
318				if !self.allow_non_global_addresses && !Discovery::can_add_to_dht(&address) {
319					log::trace!(
320						target: LOG_TARGET,
321						"ignoring self-reported non-global address {address} from {peer}."
322					);
323
324					return None
325				}
326
327				Some(address)
328			})
329			.collect();
330
331		log::trace!(
332			target: LOG_TARGET,
333			"add self-reported addresses for {peer:?}: {addresses:?}",
334		);
335
336		self.kademlia_handle.add_known_peer(peer, addresses).await;
337	}
338
339	/// Start Kademlia `GET_VALUE` query for `key`.
340	pub async fn get_value(&mut self, key: KademliaKey) -> QueryId {
341		self.kademlia_handle
342			.get_record(
343				RecordKey::new(&key.to_vec()),
344				Quorum::N(NonZeroUsize::new(GET_RECORD_REDUNDANCY_FACTOR).unwrap()),
345			)
346			.await
347	}
348
349	/// Publish value on the DHT using Kademlia `PUT_VALUE`.
350	pub async fn put_value(&mut self, key: KademliaKey, value: Vec<u8>) -> QueryId {
351		self.kademlia_handle
352			.put_record(Record::new(RecordKey::new(&key.to_vec()), value))
353			.await
354	}
355
356	/// Put record to given peers.
357	pub async fn put_value_to_peers(
358		&mut self,
359		record: Record,
360		peers: Vec<sc_network_types::PeerId>,
361		update_local_storage: bool,
362	) -> QueryId {
363		self.kademlia_handle
364			.put_record_to_peers(
365				record,
366				peers.into_iter().map(|peer| peer.into()).collect(),
367				update_local_storage,
368			)
369			.await
370	}
371
372	/// Store record in the local DHT store.
373	pub async fn store_record(
374		&mut self,
375		key: KademliaKey,
376		value: Vec<u8>,
377		publisher: Option<sc_network_types::PeerId>,
378		expires: Option<Instant>,
379	) {
380		log::debug!(
381			target: LOG_TARGET,
382			"Storing DHT record with key {key:?}, originally published by {publisher:?}, \
383			 expires {expires:?}.",
384		);
385
386		self.kademlia_handle
387			.store_record(Record {
388				key: RecordKey::new(&key.to_vec()),
389				value,
390				publisher: publisher.map(Into::into),
391				expires,
392			})
393			.await;
394	}
395
396	/// Check if the observed address is a known address.
397	fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool {
398		let mut known = known.iter();
399		let mut observed = observed.iter();
400
401		loop {
402			match (known.next(), observed.next()) {
403				(None, None) => return true,
404				(None, Some(Protocol::P2p(_))) => return true,
405				(Some(Protocol::P2p(_)), None) => return true,
406				(known, observed) if known != observed => return false,
407				_ => {},
408			}
409		}
410	}
411
412	/// Can `address` be added to DHT.
413	fn can_add_to_dht(address: &Multiaddr) -> bool {
414		let ip = match address.iter().next() {
415			Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
416			Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
417			Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
418				return true,
419			_ => return false,
420		};
421
422		ip.is_global()
423	}
424
425	/// Check if `address` can be considered a new external address.
426	fn is_new_external_address(&mut self, address: &Multiaddr, peer: PeerId) -> bool {
427		log::trace!(target: LOG_TARGET, "verify new external address: {address}");
428
429		// is the address one of our known addresses
430		if self
431			.listen_addresses
432			.read()
433			.iter()
434			.chain(self.public_addresses.iter())
435			.any(|known_address| Discovery::is_known_address(&known_address, &address))
436		{
437			return true
438		}
439
440		match self.address_confirmations.get(address) {
441			Some(confirmations) => {
442				confirmations.insert(peer);
443
444				if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
445					return true
446				}
447			},
448			None => {
449				self.address_confirmations.insert(address.clone(), Default::default());
450			},
451		}
452
453		false
454	}
455}
456
457impl Stream for Discovery {
458	type Item = DiscoveryEvent;
459
460	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
461		let this = Pin::into_inner(self);
462
463		if let Some(event) = this.pending_events.pop_front() {
464			return Poll::Ready(Some(event))
465		}
466
467		if let Some(mut delay) = this.next_kad_query.take() {
468			match delay.poll_unpin(cx) {
469				Poll::Pending => {
470					this.next_kad_query = Some(delay);
471				},
472				Poll::Ready(()) => {
473					let peer = PeerId::random();
474
475					log::trace!(target: LOG_TARGET, "start next kademlia query for {peer:?}");
476
477					match this.kademlia_handle.try_find_node(peer) {
478						Ok(query_id) => {
479							this.find_node_query_id = Some(query_id);
480							return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
481						},
482						Err(()) => {
483							this.duration_to_next_find_query = cmp::min(
484								this.duration_to_next_find_query * 2,
485								Duration::from_secs(60),
486							);
487							this.next_kad_query =
488								Some(Delay::new(this.duration_to_next_find_query));
489						},
490					}
491				},
492			}
493		}
494
495		match Pin::new(&mut this.kademlia_handle).poll_next(cx) {
496			Poll::Pending => {},
497			Poll::Ready(None) => return Poll::Ready(None),
498			Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, .. })) => {
499				// the addresses are already inserted into the DHT and in `TransportManager` so
500				// there is no need to add them again. The found peers must be registered to
501				// `Peerstore` so other protocols are aware of them through `Peerset`.
502				log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers", peers.len());
503
504				this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL));
505
506				return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
507					peers: peers.into_iter().map(|(peer, _)| peer).collect(),
508				}))
509			},
510			Poll::Ready(Some(KademliaEvent::RoutingTableUpdate { peers })) => {
511				log::trace!(target: LOG_TARGET, "routing table update, discovered {} peers", peers.len());
512
513				return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
514					peers: peers.into_iter().collect(),
515				}))
516			},
517			Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, records })) => {
518				log::trace!(
519					target: LOG_TARGET,
520					"`GET_RECORD` succeeded for {query_id:?}: {records:?}",
521				);
522
523				return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
524			},
525			Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) =>
526				return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
527			Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => {
528				match this.find_node_query_id == Some(query_id) {
529					true => {
530						this.find_node_query_id = None;
531						this.duration_to_next_find_query =
532							cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60));
533						this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));
534					},
535					false => return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })),
536				}
537			},
538			Poll::Ready(Some(KademliaEvent::IncomingRecord { record })) => {
539				log::trace!(
540					target: LOG_TARGET,
541					"incoming `PUT_RECORD` request with key {:?} from publisher {:?}",
542					record.key,
543					record.publisher,
544				);
545
546				return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
547			},
548		}
549
550		match Pin::new(&mut this.identify_event_stream).poll_next(cx) {
551			Poll::Pending => {},
552			Poll::Ready(None) => return Poll::Ready(None),
553			Poll::Ready(Some(IdentifyEvent::PeerIdentified {
554				peer,
555				listen_addresses,
556				supported_protocols,
557				observed_address,
558				..
559			})) => {
560				if this.is_new_external_address(&observed_address, peer) {
561					this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
562						address: observed_address.clone(),
563					});
564				}
565
566				return Poll::Ready(Some(DiscoveryEvent::Identified {
567					peer,
568					listen_addresses,
569					supported_protocols,
570				}));
571			},
572		}
573
574		match Pin::new(&mut this.ping_event_stream).poll_next(cx) {
575			Poll::Pending => {},
576			Poll::Ready(None) => return Poll::Ready(None),
577			Poll::Ready(Some(PingEvent::Ping { peer, ping })) =>
578				return Poll::Ready(Some(DiscoveryEvent::Ping { peer, rtt: ping })),
579		}
580
581		if let Some(ref mut mdns_event_stream) = &mut this.mdns_event_stream {
582			match Pin::new(mdns_event_stream).poll_next(cx) {
583				Poll::Pending => {},
584				Poll::Ready(None) => return Poll::Ready(None),
585				Poll::Ready(Some(MdnsEvent::Discovered(addresses))) =>
586					return Poll::Ready(Some(DiscoveryEvent::Discovered { addresses })),
587			}
588		}
589
590		Poll::Pending
591	}
592}