1use crate::{
50 config::{
51 ProtocolId, KADEMLIA_MAX_PROVIDER_KEYS, KADEMLIA_PROVIDER_RECORD_TTL,
52 KADEMLIA_PROVIDER_REPUBLISH_INTERVAL,
53 },
54 utils::LruHashSet,
55};
56
57use array_bytes::bytes2hex;
58use futures::prelude::*;
59use futures_timer::Delay;
60use ip_network::IpNetwork;
61use libp2p::{
62 core::{transport::PortUse, Endpoint, Multiaddr},
63 kad::{
64 self,
65 store::{MemoryStore, MemoryStoreConfig, RecordStore},
66 Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
67 Event, GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk,
68 GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, RecordKey,
69 },
70 mdns::{self, tokio::Behaviour as TokioMdns},
71 multiaddr::Protocol,
72 swarm::{
73 behaviour::{
74 toggle::{Toggle, ToggleConnectionHandler},
75 DialFailure, ExternalAddrConfirmed, FromSwarm,
76 },
77 ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
78 THandlerInEvent, THandlerOutEvent, ToSwarm,
79 },
80 PeerId,
81};
82use linked_hash_set::LinkedHashSet;
83use log::{debug, error, info, trace, warn};
84use sp_core::hexdisplay::HexDisplay;
85use std::{
86 cmp,
87 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
88 num::NonZeroUsize,
89 task::{Context, Poll},
90 time::{Duration, Instant},
91};
92
93const LOG_TARGET: &str = "sub-libp2p::discovery";
95
96const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;
100
101pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;
104
105const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
107
108const KAD_QUERY_TIMEOUT: Duration = Duration::from_secs(300);
111
112pub struct DiscoveryConfig {
118 local_peer_id: PeerId,
119 permanent_addresses: Vec<(PeerId, Multiaddr)>,
120 dht_random_walk: bool,
121 allow_private_ip: bool,
122 allow_non_globals_in_dht: bool,
123 discovery_only_if_under_num: u64,
124 enable_mdns: bool,
125 kademlia_disjoint_query_paths: bool,
126 kademlia_protocol: Option<StreamProtocol>,
127 kademlia_legacy_protocol: Option<StreamProtocol>,
128 kademlia_replication_factor: NonZeroUsize,
129}
130
131impl DiscoveryConfig {
132 pub fn new(local_peer_id: PeerId) -> Self {
134 Self {
135 local_peer_id,
136 permanent_addresses: Vec::new(),
137 dht_random_walk: true,
138 allow_private_ip: true,
139 allow_non_globals_in_dht: false,
140 discovery_only_if_under_num: std::u64::MAX,
141 enable_mdns: false,
142 kademlia_disjoint_query_paths: false,
143 kademlia_protocol: None,
144 kademlia_legacy_protocol: None,
145 kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
146 .expect("value is a constant; constant is non-zero; qed."),
147 }
148 }
149
150 pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
152 self.discovery_only_if_under_num = limit;
153 self
154 }
155
156 pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
158 where
159 I: IntoIterator<Item = (PeerId, Multiaddr)>,
160 {
161 self.permanent_addresses.extend(permanent_addresses);
162 self
163 }
164
165 pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
168 self.dht_random_walk = value;
169 self
170 }
171
172 pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
174 self.allow_private_ip = value;
175 self
176 }
177
178 pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
180 self.allow_non_globals_in_dht = value;
181 self
182 }
183
184 pub fn with_mdns(&mut self, value: bool) -> &mut Self {
186 self.enable_mdns = value;
187 self
188 }
189
190 pub fn with_kademlia<Hash: AsRef<[u8]>>(
195 &mut self,
196 genesis_hash: Hash,
197 fork_id: Option<&str>,
198 protocol_id: &ProtocolId,
199 ) -> &mut Self {
200 self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
201 self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
202 self
203 }
204
205 pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
208 self.kademlia_disjoint_query_paths = value;
209 self
210 }
211
212 pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
214 self.kademlia_replication_factor = value;
215 self
216 }
217
218 pub fn finish(self) -> DiscoveryBehaviour {
220 let Self {
221 local_peer_id,
222 permanent_addresses,
223 dht_random_walk,
224 allow_private_ip,
225 allow_non_globals_in_dht,
226 discovery_only_if_under_num,
227 enable_mdns,
228 kademlia_disjoint_query_paths,
229 kademlia_protocol,
230 kademlia_legacy_protocol: _,
231 kademlia_replication_factor,
232 } = self;
233
234 let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
235 let mut config = KademliaConfig::new(kademlia_protocol.clone());
236
237 config.set_replication_factor(kademlia_replication_factor);
238
239 config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
240
241 config.set_query_timeout(KAD_QUERY_TIMEOUT);
242
243 config.set_kbucket_inserts(BucketInserts::Manual);
247 config.disjoint_query_paths(kademlia_disjoint_query_paths);
248
249 config.set_provider_record_ttl(Some(KADEMLIA_PROVIDER_RECORD_TTL));
250 config.set_provider_publication_interval(Some(KADEMLIA_PROVIDER_REPUBLISH_INTERVAL));
251
252 let store = MemoryStore::with_config(
253 local_peer_id,
254 MemoryStoreConfig {
255 max_provided_keys: KADEMLIA_MAX_PROVIDER_KEYS,
256 ..Default::default()
257 },
258 );
259
260 let mut kad = Kademlia::with_config(local_peer_id, store, config);
261 kad.set_mode(Some(kad::Mode::Server));
262
263 for (peer_id, addr) in &permanent_addresses {
264 kad.add_address(peer_id, addr.clone());
265 }
266
267 Some(kad)
268 } else {
269 None
270 };
271
272 DiscoveryBehaviour {
273 permanent_addresses,
274 ephemeral_addresses: HashMap::new(),
275 kademlia: Toggle::from(kademlia),
276 next_kad_random_query: if dht_random_walk {
277 Some(Delay::new(Duration::new(0, 0)))
278 } else {
279 None
280 },
281 duration_to_next_kad: Duration::from_secs(1),
282 pending_events: VecDeque::new(),
283 local_peer_id,
284 num_connections: 0,
285 allow_private_ip,
286 discovery_only_if_under_num,
287 mdns: if enable_mdns {
288 match TokioMdns::new(mdns::Config::default(), local_peer_id) {
289 Ok(mdns) => Toggle::from(Some(mdns)),
290 Err(err) => {
291 warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
292 Toggle::from(None)
293 },
294 }
295 } else {
296 Toggle::from(None)
297 },
298 allow_non_globals_in_dht,
299 known_external_addresses: LruHashSet::new(
300 NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
301 .expect("value is a constant; constant is non-zero; qed."),
302 ),
303 records_to_publish: Default::default(),
304 kademlia_protocol,
305 provider_keys_requested: HashMap::new(),
306 }
307 }
308}
309
310pub struct DiscoveryBehaviour {
312 permanent_addresses: Vec<(PeerId, Multiaddr)>,
315 ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
318 kademlia: Toggle<Kademlia<MemoryStore>>,
321 mdns: Toggle<TokioMdns>,
323 next_kad_random_query: Option<Delay>,
326 duration_to_next_kad: Duration,
328 pending_events: VecDeque<DiscoveryOut>,
330 local_peer_id: PeerId,
332 num_connections: u64,
334 allow_private_ip: bool,
337 discovery_only_if_under_num: u64,
339 allow_non_globals_in_dht: bool,
341 known_external_addresses: LruHashSet<Multiaddr>,
343 records_to_publish: HashMap<QueryId, Record>,
349 kademlia_protocol: Option<StreamProtocol>,
354 provider_keys_requested: HashMap<QueryId, RecordKey>,
356}
357
358impl DiscoveryBehaviour {
359 pub fn known_peers(&mut self) -> HashSet<PeerId> {
361 let mut peers = HashSet::new();
362 if let Some(k) = self.kademlia.as_mut() {
363 for b in k.kbuckets() {
364 for e in b.iter() {
365 if !peers.contains(e.node.key.preimage()) {
366 peers.insert(*e.node.key.preimage());
367 }
368 }
369 }
370 }
371 peers
372 }
373
374 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
380 let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
381 if addrs_list.contains(&addr) {
382 return
383 }
384
385 if let Some(k) = self.kademlia.as_mut() {
386 k.add_address(&peer_id, addr.clone());
387 }
388
389 self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
390 addrs_list.push(addr);
391 }
392
393 pub fn add_self_reported_address(
399 &mut self,
400 peer_id: &PeerId,
401 supported_protocols: &[StreamProtocol],
402 addr: Multiaddr,
403 ) {
404 if let Some(kademlia) = self.kademlia.as_mut() {
405 if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
406 trace!(
407 target: LOG_TARGET,
408 "Ignoring self-reported non-global address {} from {}.", addr, peer_id
409 );
410 return
411 }
412
413 if !supported_protocols.iter().any(|p| {
419 p == self
420 .kademlia_protocol
421 .as_ref()
422 .expect("kademlia protocol was checked above to be enabled; qed")
423 }) {
424 trace!(
425 target: LOG_TARGET,
426 "Ignoring self-reported address {} from {} as remote node is not part of the \
427 Kademlia DHT supported by the local node.", addr, peer_id,
428 );
429 return
430 }
431
432 trace!(
433 target: LOG_TARGET,
434 "Adding self-reported address {} from {} to Kademlia DHT.",
435 addr, peer_id
436 );
437 kademlia.add_address(peer_id, addr.clone());
438 }
439 }
440
441 pub fn find_closest_peers(&mut self, target: PeerId) {
445 if let Some(k) = self.kademlia.as_mut() {
446 k.get_closest_peers(target);
447 }
448 }
449
450 pub fn get_value(&mut self, key: RecordKey) {
454 if let Some(k) = self.kademlia.as_mut() {
455 k.get_record(key.clone());
456 }
457 }
458
459 pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
464 if let Some(k) = self.kademlia.as_mut() {
465 if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
466 warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
467 self.pending_events
468 .push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
469 }
470 }
471 }
472
473 pub fn put_record_to(
477 &mut self,
478 record: Record,
479 peers: HashSet<sc_network_types::PeerId>,
480 update_local_storage: bool,
481 ) {
482 if let Some(kad) = self.kademlia.as_mut() {
483 if update_local_storage {
484 if let Err(_e) = kad.store_mut().put(record.clone()) {
485 warn!(target: LOG_TARGET, "Failed to update local starage");
486 }
487 }
488
489 if !peers.is_empty() {
490 kad.put_record_to(
491 record,
492 peers.into_iter().map(|peer_id| peer_id.into()),
493 Quorum::All,
494 );
495 }
496 }
497 }
498
499 pub fn start_providing(&mut self, key: RecordKey) {
501 if let Some(kad) = self.kademlia.as_mut() {
502 if let Err(e) = kad.start_providing(key.clone()) {
503 warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
504 self.pending_events
505 .push_back(DiscoveryOut::StartProvidingFailed(key, Duration::from_secs(0)));
506 }
507 }
508 }
509
510 pub fn stop_providing(&mut self, key: &RecordKey) {
512 if let Some(kad) = self.kademlia.as_mut() {
513 kad.stop_providing(key);
514 }
515 }
516
517 pub fn get_providers(&mut self, key: RecordKey) {
519 if let Some(kad) = self.kademlia.as_mut() {
520 let query_id = kad.get_providers(key.clone());
521 self.provider_keys_requested.insert(query_id, key);
522 }
523 }
524
525 pub fn store_record(
527 &mut self,
528 record_key: RecordKey,
529 record_value: Vec<u8>,
530 publisher: Option<PeerId>,
531 expires: Option<Instant>,
532 ) {
533 if let Some(k) = self.kademlia.as_mut() {
534 if let Err(err) = k.store_mut().put(Record {
535 key: record_key,
536 value: record_value,
537 publisher: publisher.map(|publisher| publisher.into()),
538 expires,
539 }) {
540 debug!(
541 target: LOG_TARGET,
542 "Failed to store record with key: {:?}",
543 err
544 );
545 }
546 }
547 }
548
549 pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
554 self.kademlia.as_mut().map(|kad| {
555 kad.kbuckets()
556 .map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
557 .collect()
558 })
559 }
560
561 pub fn num_kademlia_records(&mut self) -> Option<usize> {
563 self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
565 }
566
567 pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
569 self.kademlia
572 .as_mut()
573 .map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
574 }
575
576 pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
583 let ip = match addr.iter().next() {
584 Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
585 Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
586 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
587 return true,
588 _ => return false,
589 };
590 ip.is_global()
591 }
592}
593
594#[derive(Debug)]
596pub enum DiscoveryOut {
597 Discovered(PeerId),
600
601 UnroutablePeer(PeerId),
608
609 ClosestPeersFound(PeerId, Vec<(PeerId, Vec<Multiaddr>)>, Duration),
613
614 ClosestPeersNotFound(PeerId, Duration),
616
617 ValueFound(PeerRecord, Duration),
621
622 PutRecordRequest(
624 RecordKey,
625 Vec<u8>,
626 Option<sc_network_types::PeerId>,
627 Option<std::time::Instant>,
628 ),
629
630 ValueNotFound(RecordKey, Duration),
634
635 ValuePut(RecordKey, Duration),
639
640 ValuePutFailed(RecordKey, Duration),
644
645 StartedProviding(RecordKey, Duration),
647
648 StartProvidingFailed(RecordKey, Duration),
650
651 ProvidersFound(RecordKey, HashSet<PeerId>, Duration),
653
654 NoMoreProviders(RecordKey, Duration),
656
657 ProvidersNotFound(RecordKey, Duration),
659
660 RandomKademliaStarted,
664}
665
666impl NetworkBehaviour for DiscoveryBehaviour {
667 type ConnectionHandler =
668 ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
669 type ToSwarm = DiscoveryOut;
670
671 fn handle_established_inbound_connection(
672 &mut self,
673 connection_id: ConnectionId,
674 peer: PeerId,
675 local_addr: &Multiaddr,
676 remote_addr: &Multiaddr,
677 ) -> Result<THandler<Self>, ConnectionDenied> {
678 self.kademlia.handle_established_inbound_connection(
679 connection_id,
680 peer,
681 local_addr,
682 remote_addr,
683 )
684 }
685
686 fn handle_established_outbound_connection(
687 &mut self,
688 connection_id: ConnectionId,
689 peer: PeerId,
690 addr: &Multiaddr,
691 role_override: Endpoint,
692 port_use: PortUse,
693 ) -> Result<THandler<Self>, ConnectionDenied> {
694 self.kademlia.handle_established_outbound_connection(
695 connection_id,
696 peer,
697 addr,
698 role_override,
699 port_use,
700 )
701 }
702
703 fn handle_pending_inbound_connection(
704 &mut self,
705 connection_id: ConnectionId,
706 local_addr: &Multiaddr,
707 remote_addr: &Multiaddr,
708 ) -> Result<(), ConnectionDenied> {
709 self.kademlia
710 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
711 }
712
713 fn handle_pending_outbound_connection(
714 &mut self,
715 connection_id: ConnectionId,
716 maybe_peer: Option<PeerId>,
717 addresses: &[Multiaddr],
718 effective_role: Endpoint,
719 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
720 let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
721
722 let mut list: LinkedHashSet<_> = self
727 .permanent_addresses
728 .iter()
729 .filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
730 .collect();
731
732 if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
733 ephemeral_addresses.iter().for_each(|address| {
734 list.insert_if_absent(address.clone());
735 });
736 }
737
738 {
739 let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
740 connection_id,
741 maybe_peer,
742 addresses,
743 effective_role,
744 )?;
745
746 list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
747 connection_id,
748 maybe_peer,
749 addresses,
750 effective_role,
751 )?);
752
753 if !self.allow_private_ip {
754 list_to_filter.retain(|addr| match addr.iter().next() {
755 Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
756 Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
757 _ => true,
758 });
759 }
760
761 list_to_filter.into_iter().for_each(|address| {
762 list.insert_if_absent(address);
763 });
764 }
765
766 trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);
767
768 Ok(list.into_iter().collect())
769 }
770
771 fn on_swarm_event(&mut self, event: FromSwarm) {
772 match event {
773 FromSwarm::ConnectionEstablished(e) => {
774 self.num_connections += 1;
775 self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
776 },
777 FromSwarm::ConnectionClosed(e) => {
778 self.num_connections -= 1;
779 self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
780 },
781 FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
782 if let Some(peer_id) = peer_id {
783 if let DialError::Transport(errors) = error {
784 if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
785 {
786 for (addr, _error) in errors {
787 entry.get_mut().retain(|a| a != addr);
788 }
789 if entry.get().is_empty() {
790 entry.remove();
791 }
792 }
793 }
794 }
795
796 self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
797 },
798 FromSwarm::ListenerClosed(e) => {
799 self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
800 },
801 FromSwarm::ListenFailure(e) => {
802 self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
803 },
804 FromSwarm::ListenerError(e) => {
805 self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
806 },
807 FromSwarm::ExternalAddrExpired(e) => {
808 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
812 },
813 FromSwarm::NewListener(e) => {
814 self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
815 },
816 FromSwarm::ExpiredListenAddr(e) => {
817 self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
818 },
819 FromSwarm::NewExternalAddrCandidate(e) => {
820 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
821 },
822 FromSwarm::AddressChange(e) => {
823 self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
824 },
825 FromSwarm::NewListenAddr(e) => {
826 self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
827 self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
828 },
829 FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
830 let mut address = addr.clone();
831
832 if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
833 if peer_id != self.local_peer_id {
834 warn!(
835 target: LOG_TARGET,
836 "๐ Discovered external address for a peer that is not us: {addr}",
837 );
838 return
840 }
841 } else {
842 address.push(Protocol::P2p(self.local_peer_id));
843 }
844
845 if Self::can_add_to_dht(&address) {
846 if self.known_external_addresses.insert(address.clone()) {
849 info!(
850 target: LOG_TARGET,
851 "๐ Discovered new external address for our node: {address}",
852 );
853 }
854 }
855
856 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
857 },
858 FromSwarm::NewExternalAddrOfPeer(e) => {
859 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
860 self.mdns.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
861 },
862 event => {
863 debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
864 self.kademlia.on_swarm_event(event);
865 self.mdns.on_swarm_event(event);
866 },
867 }
868 }
869
870 fn on_connection_handler_event(
871 &mut self,
872 peer_id: PeerId,
873 connection_id: ConnectionId,
874 event: THandlerOutEvent<Self>,
875 ) {
876 self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
877 }
878
879 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
880 if let Some(ev) = self.pending_events.pop_front() {
882 return Poll::Ready(ToSwarm::GenerateEvent(ev))
883 }
884
885 if let Some(kademlia) = self.kademlia.as_mut() {
887 if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
888 while next_kad_random_query.poll_unpin(cx).is_ready() {
889 let actually_started =
890 if self.num_connections < self.discovery_only_if_under_num {
891 let random_peer_id = PeerId::random();
892 debug!(
893 target: LOG_TARGET,
894 "Libp2p <= Starting random Kademlia request for {:?}",
895 random_peer_id,
896 );
897 kademlia.get_closest_peers(random_peer_id);
898 true
899 } else {
900 debug!(
901 target: LOG_TARGET,
902 "Kademlia paused due to high number of connections ({})",
903 self.num_connections
904 );
905 false
906 };
907
908 *next_kad_random_query = Delay::new(self.duration_to_next_kad);
911 self.duration_to_next_kad =
912 cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
913
914 if actually_started {
915 let ev = DiscoveryOut::RandomKademliaStarted;
916 return Poll::Ready(ToSwarm::GenerateEvent(ev))
917 }
918 }
919 }
920 }
921
922 while let Poll::Ready(ev) = self.kademlia.poll(cx) {
923 match ev {
924 ToSwarm::GenerateEvent(ev) => match ev {
925 KademliaEvent::RoutingUpdated { peer, .. } => {
926 let ev = DiscoveryOut::Discovered(peer);
927 return Poll::Ready(ToSwarm::GenerateEvent(ev))
928 },
929 KademliaEvent::UnroutablePeer { peer, .. } => {
930 let ev = DiscoveryOut::UnroutablePeer(peer);
931 return Poll::Ready(ToSwarm::GenerateEvent(ev))
932 },
933 KademliaEvent::RoutablePeer { .. } => {
934 },
937 KademliaEvent::PendingRoutablePeer { .. } => {
938 },
940 KademliaEvent::InboundRequest { request } => match request {
941 libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } =>
942 return Poll::Ready(ToSwarm::GenerateEvent(
943 DiscoveryOut::PutRecordRequest(
944 record.key,
945 record.value,
946 record.publisher.map(Into::into),
947 record.expires,
948 ),
949 )),
950 _ => {},
951 },
952 KademliaEvent::OutboundQueryProgressed {
953 result: QueryResult::GetClosestPeers(res),
954 stats,
955 ..
956 } => {
957 let (key, peers, timeout) = match res {
958 Ok(GetClosestPeersOk { key, peers }) => (key, peers, false),
959 Err(GetClosestPeersError::Timeout { key, peers }) => (key, peers, true),
960 };
961
962 let target = match PeerId::from_bytes(&key.clone()) {
963 Ok(peer_id) => peer_id,
964 Err(_) => {
965 warn!(
966 target: LOG_TARGET,
967 "Libp2p => FIND_NODE query finished for target that is not \
968 a peer ID: {:?}",
969 HexDisplay::from(&key),
970 );
971 continue
972 },
973 };
974
975 if timeout {
976 debug!(
977 target: LOG_TARGET,
978 "Libp2p => Query for target {target:?} timed out and yielded {} peers",
979 peers.len(),
980 );
981 } else {
982 debug!(
983 target: LOG_TARGET,
984 "Libp2p => Query for target {target:?} yielded {} peers",
985 peers.len(),
986 );
987 }
988
989 let ev = if peers.is_empty() {
990 DiscoveryOut::ClosestPeersNotFound(
991 target,
992 stats.duration().unwrap_or_default(),
993 )
994 } else {
995 DiscoveryOut::ClosestPeersFound(
996 target,
997 peers.into_iter().map(|p| (p.peer_id, p.addrs)).collect(),
998 stats.duration().unwrap_or_default(),
999 )
1000 };
1001
1002 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1003 },
1004 KademliaEvent::OutboundQueryProgressed {
1005 result: QueryResult::GetRecord(res),
1006 stats,
1007 id,
1008 ..
1009 } => {
1010 let ev = match res {
1011 Ok(GetRecordOk::FoundRecord(r)) => {
1012 debug!(
1013 target: LOG_TARGET,
1014 "Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
1015 r.record.key,
1016 r.record.value,
1017 id,
1018 stats,
1019 );
1020
1021 if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
1028 if let Some(kad) = self.kademlia.as_mut() {
1029 if let Some(mut query) = kad.query_mut(&id) {
1030 query.finish();
1031 }
1032 }
1033 }
1034
1035 self.records_to_publish.insert(id, r.record.clone());
1038
1039 DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
1040 },
1041 Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
1042 cache_candidates,
1043 }) => {
1044 debug!(
1045 target: LOG_TARGET,
1046 "Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
1047 id,
1048 stats,
1049 stats.duration().map(|val| val.as_millis())
1050 );
1051 if let Some(record) = self.records_to_publish.remove(&id) {
1053 if cache_candidates.is_empty() {
1054 continue
1055 }
1056
1057 if let Some(kad) = self.kademlia.as_mut() {
1060 kad.put_record_to(
1061 record,
1062 cache_candidates.into_iter().map(|v| v.1),
1063 Quorum::One,
1064 );
1065 }
1066 }
1067
1068 continue
1069 },
1070 Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
1071 trace!(
1072 target: LOG_TARGET,
1073 "Libp2p => Failed to get record: {:?}",
1074 e,
1075 );
1076 DiscoveryOut::ValueNotFound(
1077 e.into_key(),
1078 stats.duration().unwrap_or_default(),
1079 )
1080 },
1081 Err(e) => {
1082 debug!(
1083 target: LOG_TARGET,
1084 "Libp2p => Failed to get record: {:?}",
1085 e,
1086 );
1087 DiscoveryOut::ValueNotFound(
1088 e.into_key(),
1089 stats.duration().unwrap_or_default(),
1090 )
1091 },
1092 };
1093 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1094 },
1095 KademliaEvent::OutboundQueryProgressed {
1096 result: QueryResult::GetProviders(res),
1097 stats,
1098 id,
1099 ..
1100 } => {
1101 let ev = match res {
1102 Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1103 debug!(
1104 target: LOG_TARGET,
1105 "Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
1106 providers,
1107 key,
1108 id,
1109 stats,
1110 );
1111
1112 DiscoveryOut::ProvidersFound(
1113 key,
1114 providers,
1115 stats.duration().unwrap_or_default(),
1116 )
1117 },
1118 Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
1119 closest_peers: _,
1120 }) => {
1121 debug!(
1122 target: LOG_TARGET,
1123 "Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
1124 id,
1125 stats,
1126 stats.duration().map(|val| val.as_millis())
1127 );
1128
1129 if let Some(key) = self.provider_keys_requested.remove(&id) {
1130 DiscoveryOut::NoMoreProviders(
1131 key,
1132 stats.duration().unwrap_or_default(),
1133 )
1134 } else {
1135 error!(
1136 target: LOG_TARGET,
1137 "No key found for `GET_PROVIDERS` query {id:?}. This is a bug.",
1138 );
1139 continue
1140 }
1141 },
1142 Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
1143 debug!(
1144 target: LOG_TARGET,
1145 "Libp2p => Failed to get providers for {key:?} due to timeout.",
1146 );
1147
1148 self.provider_keys_requested.remove(&id);
1149
1150 DiscoveryOut::ProvidersNotFound(
1151 key,
1152 stats.duration().unwrap_or_default(),
1153 )
1154 },
1155 };
1156 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1157 },
1158 KademliaEvent::OutboundQueryProgressed {
1159 result: QueryResult::PutRecord(res),
1160 stats,
1161 ..
1162 } => {
1163 let ev = match res {
1164 Ok(ok) => {
1165 trace!(
1166 target: LOG_TARGET,
1167 "Libp2p => Put record for key: {:?}",
1168 ok.key,
1169 );
1170 DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default())
1171 },
1172 Err(e) => {
1173 debug!(
1174 target: LOG_TARGET,
1175 "Libp2p => Failed to put record for key {:?}: {:?}",
1176 e.key(),
1177 e,
1178 );
1179 DiscoveryOut::ValuePutFailed(
1180 e.into_key(),
1181 stats.duration().unwrap_or_default(),
1182 )
1183 },
1184 };
1185 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1186 },
1187 KademliaEvent::OutboundQueryProgressed {
1188 result: QueryResult::RepublishRecord(res),
1189 ..
1190 } => match res {
1191 Ok(ok) => debug!(
1192 target: LOG_TARGET,
1193 "Libp2p => Record republished: {:?}",
1194 ok.key,
1195 ),
1196 Err(e) => debug!(
1197 target: LOG_TARGET,
1198 "Libp2p => Republishing of record {:?} failed with: {:?}",
1199 e.key(), e,
1200 ),
1201 },
1202 KademliaEvent::OutboundQueryProgressed {
1203 result: QueryResult::StartProviding(res),
1204 stats,
1205 ..
1206 } => {
1207 let ev = match res {
1208 Ok(ok) => {
1209 trace!(
1210 target: LOG_TARGET,
1211 "Libp2p => Started providing key {:?}",
1212 ok.key,
1213 );
1214 DiscoveryOut::StartedProviding(
1215 ok.key,
1216 stats.duration().unwrap_or_default(),
1217 )
1218 },
1219 Err(e) => {
1220 debug!(
1221 target: LOG_TARGET,
1222 "Libp2p => Failed to start providing key {:?}: {:?}",
1223 e.key(),
1224 e,
1225 );
1226 DiscoveryOut::StartProvidingFailed(
1227 e.into_key(),
1228 stats.duration().unwrap_or_default(),
1229 )
1230 },
1231 };
1232 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1233 },
1234 KademliaEvent::OutboundQueryProgressed {
1235 result: QueryResult::Bootstrap(res),
1236 ..
1237 } => match res {
1238 Ok(ok) => debug!(
1239 target: LOG_TARGET,
1240 "Libp2p => DHT bootstrap progressed: {ok:?}",
1241 ),
1242 Err(e) => warn!(
1243 target: LOG_TARGET,
1244 "Libp2p => DHT bootstrap error: {e:?}",
1245 ),
1246 },
1247 KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
1249 warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
1250 },
1251 Event::ModeChanged { new_mode } => {
1252 debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
1253 },
1254 },
1255 ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
1256 event => {
1257 return Poll::Ready(event.map_out(|_| {
1258 unreachable!("`GenerateEvent` is handled in a branch above; qed")
1259 }));
1260 },
1261 }
1262 }
1263
1264 while let Poll::Ready(ev) = self.mdns.poll(cx) {
1266 match ev {
1267 ToSwarm::GenerateEvent(event) => match event {
1268 mdns::Event::Discovered(list) => {
1269 if self.num_connections >= self.discovery_only_if_under_num {
1270 continue
1271 }
1272
1273 self.pending_events.extend(
1274 list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
1275 );
1276 if let Some(ev) = self.pending_events.pop_front() {
1277 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1278 }
1279 },
1280 mdns::Event::Expired(_) => {},
1281 },
1282 ToSwarm::Dial { .. } => {
1283 unreachable!("mDNS never dials!");
1284 },
1285 ToSwarm::NotifyHandler { event, .. } => match event {},
1287 event => {
1288 return Poll::Ready(
1289 event
1290 .map_in(|_| {
1291 unreachable!("`NotifyHandler` is handled in a branch above; qed")
1292 })
1293 .map_out(|_| {
1294 unreachable!("`GenerateEvent` is handled in a branch above; qed")
1295 }),
1296 );
1297 },
1298 }
1299 }
1300
1301 Poll::Pending
1302 }
1303}
1304
1305fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1307 let name = format!("/{}/kad", id.as_ref());
1308 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1309}
1310
1311fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1313 genesis_hash: Hash,
1314 fork_id: Option<&str>,
1315) -> StreamProtocol {
1316 let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1317 let name = if let Some(fork_id) = fork_id {
1318 format!("/{genesis_hash_hex}/{fork_id}/kad")
1319 } else {
1320 format!("/{genesis_hash_hex}/kad")
1321 };
1322
1323 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1324}
1325
1326#[cfg(test)]
1327mod tests {
1328 use super::{kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig};
1329 use crate::config::ProtocolId;
1330 use libp2p::{identity::Keypair, Multiaddr};
1331 use sp_core::hash::H256;
1332
1333 #[cfg(ignore_flaky_test)] #[tokio::test]
1335 async fn discovery_working() {
1336 use super::DiscoveryOut;
1337 use futures::prelude::*;
1338 use libp2p::{
1339 core::{
1340 transport::{MemoryTransport, Transport},
1341 upgrade,
1342 },
1343 noise,
1344 swarm::{Swarm, SwarmEvent},
1345 yamux,
1346 };
1347 use std::{collections::HashSet, task::Poll, time::Duration};
1348 let mut first_swarm_peer_id_and_addr = None;
1349
1350 let genesis_hash = H256::from_low_u64_be(1);
1351 let fork_id = Some("test-fork-id");
1352 let protocol_id = ProtocolId::from("dot");
1353
1354 let mut swarms = (0..25)
1357 .map(|i| {
1358 let mut swarm = libp2p::SwarmBuilder::with_new_identity()
1359 .with_tokio()
1360 .with_other_transport(|keypair| {
1361 MemoryTransport::new()
1362 .upgrade(upgrade::Version::V1)
1363 .authenticate(noise::Config::new(&keypair).unwrap())
1364 .multiplex(yamux::Config::default())
1365 .boxed()
1366 })
1367 .unwrap()
1368 .with_behaviour(|keypair| {
1369 let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1370 config
1371 .with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
1372 .allow_private_ip(true)
1373 .allow_non_globals_in_dht(true)
1374 .discovery_limit(50)
1375 .with_kademlia(genesis_hash, fork_id, &protocol_id);
1376
1377 config.finish()
1378 })
1379 .unwrap()
1380 .with_swarm_config(|config| {
1381 config.with_idle_connection_timeout(Duration::from_secs(10))
1383 })
1384 .build();
1385
1386 let listen_addr: Multiaddr =
1387 format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1388
1389 if i == 0 {
1390 first_swarm_peer_id_and_addr =
1391 Some((*swarm.local_peer_id(), listen_addr.clone()))
1392 }
1393
1394 swarm.listen_on(listen_addr.clone()).unwrap();
1395 (swarm, listen_addr)
1396 })
1397 .collect::<Vec<_>>();
1398
1399 let mut to_discover = (0..swarms.len())
1401 .map(|n| {
1402 (0..swarms.len())
1403 .skip(1)
1405 .filter(|p| *p != n)
1406 .map(|p| *Swarm::local_peer_id(&swarms[p].0))
1407 .collect::<HashSet<_>>()
1408 })
1409 .collect::<Vec<_>>();
1410
1411 let fut = futures::future::poll_fn(move |cx| {
1412 'polling: loop {
1413 for swarm_n in 0..swarms.len() {
1414 match swarms[swarm_n].0.poll_next_unpin(cx) {
1415 Poll::Ready(Some(e)) => {
1416 match e {
1417 SwarmEvent::Behaviour(behavior) => {
1418 match behavior {
1419 DiscoveryOut::UnroutablePeer(other) |
1420 DiscoveryOut::Discovered(other) => {
1421 let addr = swarms
1424 .iter()
1425 .find_map(|(s, a)| {
1426 if s.behaviour().local_peer_id == other {
1427 Some(a.clone())
1428 } else {
1429 None
1430 }
1431 })
1432 .unwrap();
1433 let protocol_names = if swarm_n % 2 == 0 {
1436 vec![kademlia_protocol_name(genesis_hash, fork_id)]
1437 } else {
1438 vec![
1439 legacy_kademlia_protocol_name(&protocol_id),
1440 kademlia_protocol_name(genesis_hash, fork_id),
1441 ]
1442 };
1443 swarms[swarm_n]
1444 .0
1445 .behaviour_mut()
1446 .add_self_reported_address(
1447 &other,
1448 protocol_names.as_slice(),
1449 addr,
1450 );
1451
1452 to_discover[swarm_n].remove(&other);
1453 },
1454 DiscoveryOut::RandomKademliaStarted => {},
1455 DiscoveryOut::ClosestPeersFound(..) => {},
1456 DiscoveryOut::ClosestPeersNotFound(..) => {},
1459 e => {
1460 panic!("Unexpected event: {:?}", e)
1461 },
1462 }
1463 },
1464 _ => {},
1466 }
1467 continue 'polling
1468 },
1469 _ => {},
1470 }
1471 }
1472 break
1473 }
1474
1475 if to_discover.iter().all(|l| l.is_empty()) {
1476 Poll::Ready(())
1477 } else {
1478 Poll::Pending
1479 }
1480 });
1481
1482 fut.await
1483 }
1484
1485 #[test]
1486 fn discovery_ignores_peers_with_unknown_protocols() {
1487 let supported_genesis_hash = H256::from_low_u64_be(1);
1488 let unsupported_genesis_hash = H256::from_low_u64_be(2);
1489 let supported_protocol_id = ProtocolId::from("a");
1490 let unsupported_protocol_id = ProtocolId::from("b");
1491
1492 let mut discovery = {
1493 let keypair = Keypair::generate_ed25519();
1494 let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1495 config
1496 .allow_private_ip(true)
1497 .allow_non_globals_in_dht(true)
1498 .discovery_limit(50)
1499 .with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
1500 config.finish()
1501 };
1502
1503 let predictable_peer_id = |bytes: &[u8; 32]| {
1504 Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
1505 };
1506
1507 let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
1508 let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
1509 let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1510 let another_addr: Multiaddr = "/memory/2".parse().unwrap();
1511
1512 discovery.add_self_reported_address(
1514 &remote_peer_id,
1515 &[kademlia_protocol_name(unsupported_genesis_hash, None)],
1516 remote_addr.clone(),
1517 );
1518 discovery.add_self_reported_address(
1519 &another_peer_id,
1520 &[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
1521 another_addr.clone(),
1522 );
1523
1524 {
1525 let kademlia = discovery.kademlia.as_mut().unwrap();
1526 assert!(
1527 kademlia
1528 .kbucket(remote_peer_id)
1529 .expect("Remote peer id not to be equal to local peer id.")
1530 .is_empty(),
1531 "Expect peer with unsupported protocol not to be added."
1532 );
1533 assert!(
1534 kademlia
1535 .kbucket(another_peer_id)
1536 .expect("Remote peer id not to be equal to local peer id.")
1537 .is_empty(),
1538 "Expect peer with unsupported protocol not to be added."
1539 );
1540 }
1541
1542 discovery.add_self_reported_address(
1544 &remote_peer_id,
1545 &[kademlia_protocol_name(supported_genesis_hash, None)],
1546 remote_addr.clone(),
1547 );
1548 {
1549 let kademlia = discovery.kademlia.as_mut().unwrap();
1550 assert!(
1551 !kademlia
1552 .kbucket(remote_peer_id)
1553 .expect("Remote peer id not to be equal to local peer id.")
1554 .is_empty(),
1555 "Expect peer with supported protocol to be added."
1556 );
1557 }
1558
1559 let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1560 let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();
1561
1562 {
1564 let kademlia = discovery.kademlia.as_mut().unwrap();
1565 assert!(
1566 kademlia
1567 .kbucket(unsupported_peer_id)
1568 .expect("Remote peer id not to be equal to local peer id.")
1569 .is_empty(),
1570 "Expect unsupported peer not to be added."
1571 );
1572 }
1573 discovery.add_self_reported_address(
1576 &unsupported_peer_id,
1577 &[legacy_kademlia_protocol_name(&supported_protocol_id)],
1578 unsupported_peer_addr.clone(),
1579 );
1580 {
1581 let kademlia = discovery.kademlia.as_mut().unwrap();
1582 assert!(
1583 kademlia
1584 .kbucket(unsupported_peer_id)
1585 .expect("Remote peer id not to be equal to local peer id.")
1586 .is_empty(),
1587 "Expect unsupported peer not to be added."
1588 );
1589 }
1590
1591 discovery.add_self_reported_address(
1593 &another_peer_id,
1594 &[
1595 legacy_kademlia_protocol_name(&supported_protocol_id),
1596 kademlia_protocol_name(supported_genesis_hash, None),
1597 ],
1598 another_addr.clone(),
1599 );
1600
1601 {
1602 let kademlia = discovery.kademlia.as_mut().unwrap();
1603 assert_eq!(
1604 2,
1605 kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
1606 "Expect peers with supported protocol to be added."
1607 );
1608 assert!(
1609 !kademlia
1610 .kbucket(another_peer_id)
1611 .expect("Remote peer id not to be equal to local peer id.")
1612 .is_empty(),
1613 "Expect peer with supported protocol to be added."
1614 );
1615 }
1616 }
1617}