1use crate::{
22 config::{NetworkConfiguration, ProtocolId},
23 peer_store::PeerStoreProvider,
24};
25
26use array_bytes::bytes2hex;
27use futures::{FutureExt, Stream};
28use futures_timer::Delay;
29use ip_network::IpNetwork;
30use libp2p::kad::record::Key as KademliaKey;
31use litep2p::{
32 protocol::{
33 libp2p::{
34 identify::{Config as IdentifyConfig, IdentifyEvent},
35 kademlia::{
36 Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder,
37 IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
38 Record, RecordKey, RecordsType,
39 },
40 ping::{Config as PingConfig, PingEvent},
41 },
42 mdns::{Config as MdnsConfig, MdnsEvent},
43 },
44 types::multiaddr::{Multiaddr, Protocol},
45 PeerId, ProtocolName,
46};
47use parking_lot::RwLock;
48use schnellru::{ByLength, LruMap};
49
50use std::{
51 cmp,
52 collections::{HashMap, HashSet, VecDeque},
53 num::NonZeroUsize,
54 pin::Pin,
55 sync::Arc,
56 task::{Context, Poll},
57 time::{Duration, Instant},
58};
59
60const LOG_TARGET: &str = "sub-libp2p::discovery";
62
63const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);
65
66const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);
68
69const GET_RECORD_REDUNDANCY_FACTOR: usize = 4;
71
72const MAX_EXTERNAL_ADDRESSES: u32 = 32;
74
75const MIN_ADDRESS_CONFIRMATIONS: usize = 2;
80
81#[derive(Debug)]
83pub enum DiscoveryEvent {
84 Ping {
86 peer: PeerId,
88
89 rtt: Duration,
91 },
92
93 Identified {
95 peer: PeerId,
97
98 listen_addresses: Vec<Multiaddr>,
100
101 supported_protocols: HashSet<ProtocolName>,
103 },
104
105 Discovered {
107 addresses: Vec<Multiaddr>,
109 },
110
111 RoutingTableUpdate {
113 peers: HashSet<PeerId>,
115 },
116
117 ExternalAddressDiscovered {
119 address: Multiaddr,
121 },
122
123 GetRecordSuccess {
125 query_id: QueryId,
127
128 records: RecordsType,
130 },
131
132 PutRecordSuccess {
134 query_id: QueryId,
136 },
137
138 QueryFailed {
140 query_id: QueryId,
142 },
143
144 IncomingRecord {
146 record: Record,
148 },
149
150 RandomKademliaStarted,
152}
153
154pub struct Discovery {
156 ping_event_stream: Box<dyn Stream<Item = PingEvent> + Send + Unpin>,
158
159 identify_event_stream: Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
161
162 mdns_event_stream: Option<Box<dyn Stream<Item = MdnsEvent> + Send + Unpin>>,
164
165 kademlia_handle: KademliaHandle,
167
168 _peerstore_handle: Arc<dyn PeerStoreProvider>,
170
171 next_kad_query: Option<Delay>,
175
176 find_node_query_id: Option<QueryId>,
178
179 pending_events: VecDeque<DiscoveryEvent>,
181
182 allow_non_global_addresses: bool,
184
185 local_protocols: HashSet<ProtocolName>,
187
188 public_addresses: HashSet<Multiaddr>,
190
191 listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
193
194 address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,
196
197 duration_to_next_find_query: Duration,
199}
200
201fn legacy_kademlia_protocol_name(id: &ProtocolId) -> ProtocolName {
203 ProtocolName::from(format!("/{}/kad", id.as_ref()))
204}
205
206fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
208 genesis_hash: Hash,
209 fork_id: Option<&str>,
210) -> ProtocolName {
211 let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
212 let protocol = if let Some(fork_id) = fork_id {
213 format!("/{}/{}/kad", genesis_hash_hex, fork_id)
214 } else {
215 format!("/{}/kad", genesis_hash_hex)
216 };
217
218 ProtocolName::from(protocol)
219}
220
221impl Discovery {
222 pub fn new<Hash: AsRef<[u8]> + Clone>(
227 config: &NetworkConfiguration,
228 genesis_hash: Hash,
229 fork_id: Option<&str>,
230 protocol_id: &ProtocolId,
231 known_peers: HashMap<PeerId, Vec<Multiaddr>>,
232 listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
233 _peerstore_handle: Arc<dyn PeerStoreProvider>,
234 ) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
235 let (ping_config, ping_event_stream) = PingConfig::default();
236 let user_agent = format!("{} ({})", config.client_version, config.node_name);
237
238 let (identify_config, identify_event_stream) =
239 IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));
240
241 let (mdns_config, mdns_event_stream) = match config.transport {
242 crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
243 true => {
244 let (mdns_config, mdns_event_stream) = MdnsConfig::new(MDNS_QUERY_INTERVAL);
245 (Some(mdns_config), Some(mdns_event_stream))
246 },
247 false => (None, None),
248 },
249 _ => panic!("memory transport not supported"),
250 };
251
252 let (kademlia_config, kademlia_handle) = {
253 let protocol_names = vec![
254 kademlia_protocol_name(genesis_hash.clone(), fork_id),
255 legacy_kademlia_protocol_name(protocol_id),
256 ];
257
258 KademliaConfigBuilder::new()
259 .with_known_peers(known_peers)
260 .with_protocol_names(protocol_names)
261 .with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual)
262 .build()
263 };
264
265 (
266 Self {
267 ping_event_stream,
268 identify_event_stream,
269 mdns_event_stream,
270 kademlia_handle,
271 _peerstore_handle,
272 listen_addresses,
273 find_node_query_id: None,
274 pending_events: VecDeque::new(),
275 duration_to_next_find_query: Duration::from_secs(1),
276 address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
277 allow_non_global_addresses: config.allow_non_globals_in_dht,
278 public_addresses: config.public_addresses.iter().cloned().map(Into::into).collect(),
279 next_kad_query: Some(Delay::new(KADEMLIA_QUERY_INTERVAL)),
280 local_protocols: HashSet::from_iter([kademlia_protocol_name(
281 genesis_hash,
282 fork_id,
283 )]),
284 },
285 ping_config,
286 identify_config,
287 kademlia_config,
288 mdns_config,
289 )
290 }
291
292 #[allow(unused)]
294 pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
295 self.kademlia_handle.add_known_peer(peer, addresses).await;
296 }
297
298 pub async fn add_self_reported_address(
301 &mut self,
302 peer: PeerId,
303 supported_protocols: HashSet<ProtocolName>,
304 addresses: Vec<Multiaddr>,
305 ) {
306 if self.local_protocols.is_disjoint(&supported_protocols) {
307 log::trace!(
308 target: LOG_TARGET,
309 "Ignoring self-reported address of peer {peer} as remote node is not part of the \
310 Kademlia DHT supported by the local node.",
311 );
312 return
313 }
314
315 let addresses = addresses
316 .into_iter()
317 .filter_map(|address| {
318 if !self.allow_non_global_addresses && !Discovery::can_add_to_dht(&address) {
319 log::trace!(
320 target: LOG_TARGET,
321 "ignoring self-reported non-global address {address} from {peer}."
322 );
323
324 return None
325 }
326
327 Some(address)
328 })
329 .collect();
330
331 log::trace!(
332 target: LOG_TARGET,
333 "add self-reported addresses for {peer:?}: {addresses:?}",
334 );
335
336 self.kademlia_handle.add_known_peer(peer, addresses).await;
337 }
338
339 pub async fn get_value(&mut self, key: KademliaKey) -> QueryId {
341 self.kademlia_handle
342 .get_record(
343 RecordKey::new(&key.to_vec()),
344 Quorum::N(NonZeroUsize::new(GET_RECORD_REDUNDANCY_FACTOR).unwrap()),
345 )
346 .await
347 }
348
349 pub async fn put_value(&mut self, key: KademliaKey, value: Vec<u8>) -> QueryId {
351 self.kademlia_handle
352 .put_record(Record::new(RecordKey::new(&key.to_vec()), value))
353 .await
354 }
355
356 pub async fn put_value_to_peers(
358 &mut self,
359 record: Record,
360 peers: Vec<sc_network_types::PeerId>,
361 update_local_storage: bool,
362 ) -> QueryId {
363 self.kademlia_handle
364 .put_record_to_peers(
365 record,
366 peers.into_iter().map(|peer| peer.into()).collect(),
367 update_local_storage,
368 )
369 .await
370 }
371
372 pub async fn store_record(
374 &mut self,
375 key: KademliaKey,
376 value: Vec<u8>,
377 publisher: Option<sc_network_types::PeerId>,
378 expires: Option<Instant>,
379 ) {
380 log::debug!(
381 target: LOG_TARGET,
382 "Storing DHT record with key {key:?}, originally published by {publisher:?}, \
383 expires {expires:?}.",
384 );
385
386 self.kademlia_handle
387 .store_record(Record {
388 key: RecordKey::new(&key.to_vec()),
389 value,
390 publisher: publisher.map(Into::into),
391 expires,
392 })
393 .await;
394 }
395
396 fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool {
398 let mut known = known.iter();
399 let mut observed = observed.iter();
400
401 loop {
402 match (known.next(), observed.next()) {
403 (None, None) => return true,
404 (None, Some(Protocol::P2p(_))) => return true,
405 (Some(Protocol::P2p(_)), None) => return true,
406 (known, observed) if known != observed => return false,
407 _ => {},
408 }
409 }
410 }
411
412 fn can_add_to_dht(address: &Multiaddr) -> bool {
414 let ip = match address.iter().next() {
415 Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
416 Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
417 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
418 return true,
419 _ => return false,
420 };
421
422 ip.is_global()
423 }
424
425 fn is_new_external_address(&mut self, address: &Multiaddr, peer: PeerId) -> bool {
427 log::trace!(target: LOG_TARGET, "verify new external address: {address}");
428
429 if self
431 .listen_addresses
432 .read()
433 .iter()
434 .chain(self.public_addresses.iter())
435 .any(|known_address| Discovery::is_known_address(&known_address, &address))
436 {
437 return true
438 }
439
440 match self.address_confirmations.get(address) {
441 Some(confirmations) => {
442 confirmations.insert(peer);
443
444 if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
445 return true
446 }
447 },
448 None => {
449 self.address_confirmations.insert(address.clone(), Default::default());
450 },
451 }
452
453 false
454 }
455}
456
457impl Stream for Discovery {
458 type Item = DiscoveryEvent;
459
460 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
461 let this = Pin::into_inner(self);
462
463 if let Some(event) = this.pending_events.pop_front() {
464 return Poll::Ready(Some(event))
465 }
466
467 if let Some(mut delay) = this.next_kad_query.take() {
468 match delay.poll_unpin(cx) {
469 Poll::Pending => {
470 this.next_kad_query = Some(delay);
471 },
472 Poll::Ready(()) => {
473 let peer = PeerId::random();
474
475 log::trace!(target: LOG_TARGET, "start next kademlia query for {peer:?}");
476
477 match this.kademlia_handle.try_find_node(peer) {
478 Ok(query_id) => {
479 this.find_node_query_id = Some(query_id);
480 return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
481 },
482 Err(()) => {
483 this.duration_to_next_find_query = cmp::min(
484 this.duration_to_next_find_query * 2,
485 Duration::from_secs(60),
486 );
487 this.next_kad_query =
488 Some(Delay::new(this.duration_to_next_find_query));
489 },
490 }
491 },
492 }
493 }
494
495 match Pin::new(&mut this.kademlia_handle).poll_next(cx) {
496 Poll::Pending => {},
497 Poll::Ready(None) => return Poll::Ready(None),
498 Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, .. })) => {
499 log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers", peers.len());
503
504 this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL));
505
506 return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
507 peers: peers.into_iter().map(|(peer, _)| peer).collect(),
508 }))
509 },
510 Poll::Ready(Some(KademliaEvent::RoutingTableUpdate { peers })) => {
511 log::trace!(target: LOG_TARGET, "routing table update, discovered {} peers", peers.len());
512
513 return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
514 peers: peers.into_iter().collect(),
515 }))
516 },
517 Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, records })) => {
518 log::trace!(
519 target: LOG_TARGET,
520 "`GET_RECORD` succeeded for {query_id:?}: {records:?}",
521 );
522
523 return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
524 },
525 Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) =>
526 return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
527 Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => {
528 match this.find_node_query_id == Some(query_id) {
529 true => {
530 this.find_node_query_id = None;
531 this.duration_to_next_find_query =
532 cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60));
533 this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));
534 },
535 false => return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })),
536 }
537 },
538 Poll::Ready(Some(KademliaEvent::IncomingRecord { record })) => {
539 log::trace!(
540 target: LOG_TARGET,
541 "incoming `PUT_RECORD` request with key {:?} from publisher {:?}",
542 record.key,
543 record.publisher,
544 );
545
546 return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
547 },
548 }
549
550 match Pin::new(&mut this.identify_event_stream).poll_next(cx) {
551 Poll::Pending => {},
552 Poll::Ready(None) => return Poll::Ready(None),
553 Poll::Ready(Some(IdentifyEvent::PeerIdentified {
554 peer,
555 listen_addresses,
556 supported_protocols,
557 observed_address,
558 ..
559 })) => {
560 if this.is_new_external_address(&observed_address, peer) {
561 this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
562 address: observed_address.clone(),
563 });
564 }
565
566 return Poll::Ready(Some(DiscoveryEvent::Identified {
567 peer,
568 listen_addresses,
569 supported_protocols,
570 }));
571 },
572 }
573
574 match Pin::new(&mut this.ping_event_stream).poll_next(cx) {
575 Poll::Pending => {},
576 Poll::Ready(None) => return Poll::Ready(None),
577 Poll::Ready(Some(PingEvent::Ping { peer, ping })) =>
578 return Poll::Ready(Some(DiscoveryEvent::Ping { peer, rtt: ping })),
579 }
580
581 if let Some(ref mut mdns_event_stream) = &mut this.mdns_event_stream {
582 match Pin::new(mdns_event_stream).poll_next(cx) {
583 Poll::Pending => {},
584 Poll::Ready(None) => return Poll::Ready(None),
585 Poll::Ready(Some(MdnsEvent::Discovered(addresses))) =>
586 return Poll::Ready(Some(DiscoveryEvent::Discovered { addresses })),
587 }
588 }
589
590 Poll::Pending
591 }
592}