1use crate::{config::ProtocolId, utils::LruHashSet};
50
51use array_bytes::bytes2hex;
52use futures::prelude::*;
53use futures_timer::Delay;
54use ip_network::IpNetwork;
55use libp2p::{
56 core::{Endpoint, Multiaddr},
57 kad::{
58 self,
59 record::store::{MemoryStore, RecordStore},
60 Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
61 GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
62 RecordKey,
63 },
64 mdns::{self, tokio::Behaviour as TokioMdns},
65 multiaddr::Protocol,
66 swarm::{
67 behaviour::{
68 toggle::{Toggle, ToggleConnectionHandler},
69 DialFailure, ExternalAddrConfirmed, FromSwarm,
70 },
71 ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, PollParameters,
72 StreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
73 },
74 PeerId,
75};
76use linked_hash_set::LinkedHashSet;
77use log::{debug, info, trace, warn};
78use sp_core::hexdisplay::HexDisplay;
79use std::{
80 cmp,
81 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
82 num::NonZeroUsize,
83 task::{Context, Poll},
84 time::{Duration, Instant},
85};
86
87const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;
91
92pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;
95
96const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
98
99pub struct DiscoveryConfig {
105 local_peer_id: PeerId,
106 permanent_addresses: Vec<(PeerId, Multiaddr)>,
107 dht_random_walk: bool,
108 allow_private_ip: bool,
109 allow_non_globals_in_dht: bool,
110 discovery_only_if_under_num: u64,
111 enable_mdns: bool,
112 kademlia_disjoint_query_paths: bool,
113 kademlia_protocol: Option<StreamProtocol>,
114 kademlia_legacy_protocol: Option<StreamProtocol>,
115 kademlia_replication_factor: NonZeroUsize,
116}
117
118impl DiscoveryConfig {
119 pub fn new(local_peer_id: PeerId) -> Self {
121 Self {
122 local_peer_id,
123 permanent_addresses: Vec::new(),
124 dht_random_walk: true,
125 allow_private_ip: true,
126 allow_non_globals_in_dht: false,
127 discovery_only_if_under_num: std::u64::MAX,
128 enable_mdns: false,
129 kademlia_disjoint_query_paths: false,
130 kademlia_protocol: None,
131 kademlia_legacy_protocol: None,
132 kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
133 .expect("value is a constant; constant is non-zero; qed."),
134 }
135 }
136
137 pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
139 self.discovery_only_if_under_num = limit;
140 self
141 }
142
143 pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
145 where
146 I: IntoIterator<Item = (PeerId, Multiaddr)>,
147 {
148 self.permanent_addresses.extend(permanent_addresses);
149 self
150 }
151
152 pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
155 self.dht_random_walk = value;
156 self
157 }
158
159 pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
161 self.allow_private_ip = value;
162 self
163 }
164
165 pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
167 self.allow_non_globals_in_dht = value;
168 self
169 }
170
171 pub fn with_mdns(&mut self, value: bool) -> &mut Self {
173 self.enable_mdns = value;
174 self
175 }
176
177 pub fn with_kademlia<Hash: AsRef<[u8]>>(
182 &mut self,
183 genesis_hash: Hash,
184 fork_id: Option<&str>,
185 protocol_id: &ProtocolId,
186 ) -> &mut Self {
187 self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
188 self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
189 self
190 }
191
192 pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
195 self.kademlia_disjoint_query_paths = value;
196 self
197 }
198
199 pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
201 self.kademlia_replication_factor = value;
202 self
203 }
204
205 pub fn finish(self) -> DiscoveryBehaviour {
207 let Self {
208 local_peer_id,
209 permanent_addresses,
210 dht_random_walk,
211 allow_private_ip,
212 allow_non_globals_in_dht,
213 discovery_only_if_under_num,
214 enable_mdns,
215 kademlia_disjoint_query_paths,
216 kademlia_protocol,
217 kademlia_legacy_protocol,
218 kademlia_replication_factor,
219 } = self;
220
221 let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
222 let mut config = KademliaConfig::default();
223
224 config.set_replication_factor(kademlia_replication_factor);
225 let kademlia_protocols = if let Some(legacy_protocol) = kademlia_legacy_protocol {
229 vec![kademlia_protocol.clone(), legacy_protocol]
230 } else {
231 vec![kademlia_protocol.clone()]
232 };
233 config.set_protocol_names(kademlia_protocols.into_iter().map(Into::into).collect());
234
235 config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
236
237 config.set_kbucket_inserts(BucketInserts::Manual);
241 config.disjoint_query_paths(kademlia_disjoint_query_paths);
242 let store = MemoryStore::new(local_peer_id);
243 let mut kad = Kademlia::with_config(local_peer_id, store, config);
244 kad.set_mode(Some(kad::Mode::Server));
245
246 for (peer_id, addr) in &permanent_addresses {
247 kad.add_address(peer_id, addr.clone());
248 }
249
250 Some(kad)
251 } else {
252 None
253 };
254
255 DiscoveryBehaviour {
256 permanent_addresses,
257 ephemeral_addresses: HashMap::new(),
258 kademlia: Toggle::from(kademlia),
259 next_kad_random_query: if dht_random_walk {
260 Some(Delay::new(Duration::new(0, 0)))
261 } else {
262 None
263 },
264 duration_to_next_kad: Duration::from_secs(1),
265 pending_events: VecDeque::new(),
266 local_peer_id,
267 num_connections: 0,
268 allow_private_ip,
269 discovery_only_if_under_num,
270 mdns: if enable_mdns {
271 match TokioMdns::new(mdns::Config::default(), local_peer_id) {
272 Ok(mdns) => Toggle::from(Some(mdns)),
273 Err(err) => {
274 warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
275 Toggle::from(None)
276 },
277 }
278 } else {
279 Toggle::from(None)
280 },
281 allow_non_globals_in_dht,
282 known_external_addresses: LruHashSet::new(
283 NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
284 .expect("value is a constant; constant is non-zero; qed."),
285 ),
286 records_to_publish: Default::default(),
287 kademlia_protocol,
288 }
289 }
290}
291
292pub struct DiscoveryBehaviour {
294 permanent_addresses: Vec<(PeerId, Multiaddr)>,
297 ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
300 kademlia: Toggle<Kademlia<MemoryStore>>,
303 mdns: Toggle<TokioMdns>,
305 next_kad_random_query: Option<Delay>,
308 duration_to_next_kad: Duration,
310 pending_events: VecDeque<DiscoveryOut>,
312 local_peer_id: PeerId,
314 num_connections: u64,
316 allow_private_ip: bool,
319 discovery_only_if_under_num: u64,
321 allow_non_globals_in_dht: bool,
323 known_external_addresses: LruHashSet<Multiaddr>,
325 records_to_publish: HashMap<QueryId, Record>,
331 kademlia_protocol: Option<StreamProtocol>,
336}
337
338impl DiscoveryBehaviour {
339 pub fn known_peers(&mut self) -> HashSet<PeerId> {
341 let mut peers = HashSet::new();
342 if let Some(k) = self.kademlia.as_mut() {
343 for b in k.kbuckets() {
344 for e in b.iter() {
345 if !peers.contains(e.node.key.preimage()) {
346 peers.insert(*e.node.key.preimage());
347 }
348 }
349 }
350 }
351 peers
352 }
353
354 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
360 let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
361 if addrs_list.contains(&addr) {
362 return
363 }
364
365 if let Some(k) = self.kademlia.as_mut() {
366 k.add_address(&peer_id, addr.clone());
367 }
368
369 self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
370 addrs_list.push(addr);
371 }
372
373 pub fn add_self_reported_address(
379 &mut self,
380 peer_id: &PeerId,
381 supported_protocols: &[StreamProtocol],
382 addr: Multiaddr,
383 ) {
384 if let Some(kademlia) = self.kademlia.as_mut() {
385 if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
386 trace!(
387 target: "sub-libp2p",
388 "Ignoring self-reported non-global address {} from {}.", addr, peer_id
389 );
390 return
391 }
392
393 if !supported_protocols.iter().any(|p| {
399 p == self
400 .kademlia_protocol
401 .as_ref()
402 .expect("kademlia protocol was checked above to be enabled; qed")
403 }) {
404 trace!(
405 target: "sub-libp2p",
406 "Ignoring self-reported address {} from {} as remote node is not part of the \
407 Kademlia DHT supported by the local node.", addr, peer_id,
408 );
409 return
410 }
411
412 trace!(
413 target: "sub-libp2p",
414 "Adding self-reported address {} from {} to Kademlia DHT.",
415 addr, peer_id
416 );
417 kademlia.add_address(peer_id, addr.clone());
418 }
419 }
420
421 pub fn get_value(&mut self, key: RecordKey) {
425 if let Some(k) = self.kademlia.as_mut() {
426 k.get_record(key.clone());
427 }
428 }
429
430 pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
435 if let Some(k) = self.kademlia.as_mut() {
436 if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
437 warn!(target: "sub-libp2p", "Libp2p => Failed to put record: {:?}", e);
438 self.pending_events
439 .push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
440 }
441 }
442 }
443
444 pub fn put_record_to(
448 &mut self,
449 record: Record,
450 peers: HashSet<sc_network_types::PeerId>,
451 update_local_storage: bool,
452 ) {
453 if let Some(kad) = self.kademlia.as_mut() {
454 if update_local_storage {
455 if let Err(_e) = kad.store_mut().put(record.clone()) {
456 warn!(target: "sub-libp2p", "Failed to update local starage");
457 }
458 }
459
460 if !peers.is_empty() {
461 kad.put_record_to(
462 record,
463 peers.into_iter().map(|peer_id| peer_id.into()),
464 Quorum::All,
465 );
466 }
467 }
468 }
469 pub fn store_record(
471 &mut self,
472 record_key: RecordKey,
473 record_value: Vec<u8>,
474 publisher: Option<PeerId>,
475 expires: Option<Instant>,
476 ) {
477 if let Some(k) = self.kademlia.as_mut() {
478 if let Err(err) = k.store_mut().put(Record {
479 key: record_key,
480 value: record_value,
481 publisher: publisher.map(|publisher| publisher.into()),
482 expires,
483 }) {
484 debug!(
485 target: "sub-libp2p",
486 "Failed to store record with key: {:?}",
487 err
488 );
489 }
490 }
491 }
492
493 pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
498 self.kademlia.as_mut().map(|kad| {
499 kad.kbuckets()
500 .map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
501 .collect()
502 })
503 }
504
505 pub fn num_kademlia_records(&mut self) -> Option<usize> {
507 self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
509 }
510
511 pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
513 self.kademlia
516 .as_mut()
517 .map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
518 }
519
520 pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
527 let ip = match addr.iter().next() {
528 Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
529 Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
530 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
531 return true,
532 _ => return false,
533 };
534 ip.is_global()
535 }
536}
537
538#[derive(Debug)]
540pub enum DiscoveryOut {
541 Discovered(PeerId),
547
548 UnroutablePeer(PeerId),
555
556 ValueFound(PeerRecord, Duration),
560
561 PutRecordRequest(
563 RecordKey,
564 Vec<u8>,
565 Option<sc_network_types::PeerId>,
566 Option<std::time::Instant>,
567 ),
568
569 ValueNotFound(RecordKey, Duration),
573
574 ValuePut(RecordKey, Duration),
578
579 ValuePutFailed(RecordKey, Duration),
583
584 RandomKademliaStarted,
588}
589
590impl NetworkBehaviour for DiscoveryBehaviour {
591 type ConnectionHandler =
592 ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
593 type ToSwarm = DiscoveryOut;
594
595 fn handle_established_inbound_connection(
596 &mut self,
597 connection_id: ConnectionId,
598 peer: PeerId,
599 local_addr: &Multiaddr,
600 remote_addr: &Multiaddr,
601 ) -> Result<THandler<Self>, ConnectionDenied> {
602 self.kademlia.handle_established_inbound_connection(
603 connection_id,
604 peer,
605 local_addr,
606 remote_addr,
607 )
608 }
609
610 fn handle_established_outbound_connection(
611 &mut self,
612 connection_id: ConnectionId,
613 peer: PeerId,
614 addr: &Multiaddr,
615 role_override: Endpoint,
616 ) -> Result<THandler<Self>, ConnectionDenied> {
617 self.kademlia.handle_established_outbound_connection(
618 connection_id,
619 peer,
620 addr,
621 role_override,
622 )
623 }
624
625 fn handle_pending_inbound_connection(
626 &mut self,
627 connection_id: ConnectionId,
628 local_addr: &Multiaddr,
629 remote_addr: &Multiaddr,
630 ) -> Result<(), ConnectionDenied> {
631 self.kademlia
632 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
633 }
634
635 fn handle_pending_outbound_connection(
636 &mut self,
637 connection_id: ConnectionId,
638 maybe_peer: Option<PeerId>,
639 addresses: &[Multiaddr],
640 effective_role: Endpoint,
641 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
642 let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
643
644 let mut list: LinkedHashSet<_> = self
649 .permanent_addresses
650 .iter()
651 .filter_map(|(p, a)| (*p == peer_id).then_some(a.clone()))
652 .collect();
653
654 if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
655 ephemeral_addresses.iter().for_each(|address| {
656 list.insert_if_absent(address.clone());
657 });
658 }
659
660 {
661 let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
662 connection_id,
663 maybe_peer,
664 addresses,
665 effective_role,
666 )?;
667
668 list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
669 connection_id,
670 maybe_peer,
671 addresses,
672 effective_role,
673 )?);
674
675 if !self.allow_private_ip {
676 list_to_filter.retain(|addr| match addr.iter().next() {
677 Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
678 Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
679 _ => true,
680 });
681 }
682
683 list_to_filter.into_iter().for_each(|address| {
684 list.insert_if_absent(address);
685 });
686 }
687
688 trace!(target: "sub-libp2p", "Addresses of {:?}: {:?}", peer_id, list);
689
690 Ok(list.into_iter().collect())
691 }
692
693 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
694 match event {
695 FromSwarm::ConnectionEstablished(e) => {
696 self.num_connections += 1;
697 self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
698 },
699 FromSwarm::ConnectionClosed(e) => {
700 self.num_connections -= 1;
701 self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
702 },
703 FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
704 if let Some(peer_id) = peer_id {
705 if let DialError::Transport(errors) = error {
706 if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
707 {
708 for (addr, _error) in errors {
709 entry.get_mut().retain(|a| a != addr);
710 }
711 if entry.get().is_empty() {
712 entry.remove();
713 }
714 }
715 }
716 }
717
718 self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
719 },
720 FromSwarm::ListenerClosed(e) => {
721 self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
722 },
723 FromSwarm::ListenFailure(e) => {
724 self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
725 },
726 FromSwarm::ListenerError(e) => {
727 self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
728 },
729 FromSwarm::ExternalAddrExpired(e) => {
730 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
734 },
735 FromSwarm::NewListener(e) => {
736 self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
737 },
738 FromSwarm::ExpiredListenAddr(e) => {
739 self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
740 },
741 FromSwarm::NewExternalAddrCandidate(e) => {
742 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
743 },
744 FromSwarm::AddressChange(e) => {
745 self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
746 },
747 FromSwarm::NewListenAddr(e) => {
748 self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
749 self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
750 },
751 FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
752 let new_addr = addr.clone().with(Protocol::P2p(self.local_peer_id));
753
754 if Self::can_add_to_dht(addr) {
755 if self.known_external_addresses.insert(new_addr.clone()) {
758 info!(
759 target: "sub-libp2p",
760 "🔍 Discovered new external address for our node: {}",
761 new_addr,
762 );
763 }
764 }
765
766 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
767 },
768 }
769 }
770
771 fn on_connection_handler_event(
772 &mut self,
773 peer_id: PeerId,
774 connection_id: ConnectionId,
775 event: THandlerOutEvent<Self>,
776 ) {
777 self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
778 }
779
780 fn poll(
781 &mut self,
782 cx: &mut Context,
783 params: &mut impl PollParameters,
784 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
785 if let Some(ev) = self.pending_events.pop_front() {
787 return Poll::Ready(ToSwarm::GenerateEvent(ev))
788 }
789
790 if let Some(kademlia) = self.kademlia.as_mut() {
792 if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
793 while next_kad_random_query.poll_unpin(cx).is_ready() {
794 let actually_started =
795 if self.num_connections < self.discovery_only_if_under_num {
796 let random_peer_id = PeerId::random();
797 debug!(
798 target: "sub-libp2p",
799 "Libp2p <= Starting random Kademlia request for {:?}",
800 random_peer_id,
801 );
802 kademlia.get_closest_peers(random_peer_id);
803 true
804 } else {
805 debug!(
806 target: "sub-libp2p",
807 "Kademlia paused due to high number of connections ({})",
808 self.num_connections
809 );
810 false
811 };
812
813 *next_kad_random_query = Delay::new(self.duration_to_next_kad);
816 self.duration_to_next_kad =
817 cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
818
819 if actually_started {
820 let ev = DiscoveryOut::RandomKademliaStarted;
821 return Poll::Ready(ToSwarm::GenerateEvent(ev))
822 }
823 }
824 }
825 }
826
827 while let Poll::Ready(ev) = self.kademlia.poll(cx, params) {
828 match ev {
829 ToSwarm::GenerateEvent(ev) => match ev {
830 KademliaEvent::RoutingUpdated { peer, .. } => {
831 let ev = DiscoveryOut::Discovered(peer);
832 return Poll::Ready(ToSwarm::GenerateEvent(ev))
833 },
834 KademliaEvent::UnroutablePeer { peer, .. } => {
835 let ev = DiscoveryOut::UnroutablePeer(peer);
836 return Poll::Ready(ToSwarm::GenerateEvent(ev))
837 },
838 KademliaEvent::RoutablePeer { peer, .. } => {
839 let ev = DiscoveryOut::Discovered(peer);
840 return Poll::Ready(ToSwarm::GenerateEvent(ev))
841 },
842 KademliaEvent::PendingRoutablePeer { .. } => {
843 },
845 KademliaEvent::InboundRequest { request } => match request {
846 libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } =>
847 return Poll::Ready(ToSwarm::GenerateEvent(
848 DiscoveryOut::PutRecordRequest(
849 record.key,
850 record.value,
851 record.publisher.map(Into::into),
852 record.expires,
853 ),
854 )),
855 _ => {},
856 },
857 KademliaEvent::OutboundQueryProgressed {
858 result: QueryResult::GetClosestPeers(res),
859 ..
860 } => match res {
861 Err(GetClosestPeersError::Timeout { key, peers }) => {
862 debug!(
863 target: "sub-libp2p",
864 "Libp2p => Query for {:?} timed out with {} results",
865 HexDisplay::from(&key), peers.len(),
866 );
867 },
868 Ok(ok) => {
869 trace!(
870 target: "sub-libp2p",
871 "Libp2p => Query for {:?} yielded {:?} results",
872 HexDisplay::from(&ok.key), ok.peers.len(),
873 );
874 if ok.peers.is_empty() && self.num_connections != 0 {
875 debug!(
876 target: "sub-libp2p",
877 "Libp2p => Random Kademlia query has yielded empty results",
878 );
879 }
880 },
881 },
882 KademliaEvent::OutboundQueryProgressed {
883 result: QueryResult::GetRecord(res),
884 stats,
885 id,
886 ..
887 } => {
888 let ev = match res {
889 Ok(GetRecordOk::FoundRecord(r)) => {
890 debug!(
891 target: "sub-libp2p",
892 "Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
893 r.record.key,
894 r.record.value,
895 id,
896 stats,
897 );
898
899 if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
906 if let Some(kad) = self.kademlia.as_mut() {
907 if let Some(mut query) = kad.query_mut(&id) {
908 query.finish();
909 }
910 }
911 }
912
913 self.records_to_publish.insert(id, r.record.clone());
916
917 DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
918 },
919 Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
920 cache_candidates,
921 }) => {
922 debug!(
923 target: "sub-libp2p",
924 "Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
925 id,
926 stats,
927 stats.duration().map(|val| val.as_millis())
928 );
929 if let Some(record) = self.records_to_publish.remove(&id) {
931 if cache_candidates.is_empty() {
932 continue
933 }
934
935 if let Some(kad) = self.kademlia.as_mut() {
938 kad.put_record_to(
939 record,
940 cache_candidates.into_iter().map(|v| v.1),
941 Quorum::One,
942 );
943 }
944 }
945
946 continue
947 },
948 Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
949 trace!(
950 target: "sub-libp2p",
951 "Libp2p => Failed to get record: {:?}",
952 e,
953 );
954 DiscoveryOut::ValueNotFound(
955 e.into_key(),
956 stats.duration().unwrap_or_default(),
957 )
958 },
959 Err(e) => {
960 debug!(
961 target: "sub-libp2p",
962 "Libp2p => Failed to get record: {:?}",
963 e,
964 );
965 DiscoveryOut::ValueNotFound(
966 e.into_key(),
967 stats.duration().unwrap_or_default(),
968 )
969 },
970 };
971 return Poll::Ready(ToSwarm::GenerateEvent(ev))
972 },
973 KademliaEvent::OutboundQueryProgressed {
974 result: QueryResult::PutRecord(res),
975 stats,
976 ..
977 } => {
978 let ev = match res {
979 Ok(ok) =>
980 DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default()),
981 Err(e) => {
982 debug!(
983 target: "sub-libp2p",
984 "Libp2p => Failed to put record: {:?}",
985 e,
986 );
987 DiscoveryOut::ValuePutFailed(
988 e.into_key(),
989 stats.duration().unwrap_or_default(),
990 )
991 },
992 };
993 return Poll::Ready(ToSwarm::GenerateEvent(ev))
994 },
995 KademliaEvent::OutboundQueryProgressed {
996 result: QueryResult::RepublishRecord(res),
997 ..
998 } => match res {
999 Ok(ok) => debug!(
1000 target: "sub-libp2p",
1001 "Libp2p => Record republished: {:?}",
1002 ok.key,
1003 ),
1004 Err(e) => debug!(
1005 target: "sub-libp2p",
1006 "Libp2p => Republishing of record {:?} failed with: {:?}",
1007 e.key(), e,
1008 ),
1009 },
1010 KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
1012 warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
1013 },
1014 },
1015 ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
1016 ToSwarm::NotifyHandler { peer_id, handler, event } =>
1017 return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }),
1018 ToSwarm::CloseConnection { peer_id, connection } =>
1019 return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
1020 ToSwarm::NewExternalAddrCandidate(observed) =>
1021 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
1022 ToSwarm::ExternalAddrConfirmed(addr) =>
1023 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
1024 ToSwarm::ExternalAddrExpired(addr) =>
1025 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
1026 ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
1027 ToSwarm::RemoveListener { id } =>
1028 return Poll::Ready(ToSwarm::RemoveListener { id }),
1029 }
1030 }
1031
1032 while let Poll::Ready(ev) = self.mdns.poll(cx, params) {
1034 match ev {
1035 ToSwarm::GenerateEvent(event) => match event {
1036 mdns::Event::Discovered(list) => {
1037 if self.num_connections >= self.discovery_only_if_under_num {
1038 continue
1039 }
1040
1041 self.pending_events.extend(
1042 list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
1043 );
1044 if let Some(ev) = self.pending_events.pop_front() {
1045 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1046 }
1047 },
1048 mdns::Event::Expired(_) => {},
1049 },
1050 ToSwarm::Dial { .. } => {
1051 unreachable!("mDNS never dials!");
1052 },
1053 ToSwarm::NotifyHandler { event, .. } => match event {},
1055 ToSwarm::CloseConnection { peer_id, connection } =>
1056 return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
1057 ToSwarm::NewExternalAddrCandidate(observed) =>
1058 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
1059 ToSwarm::ExternalAddrConfirmed(addr) =>
1060 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
1061 ToSwarm::ExternalAddrExpired(addr) =>
1062 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
1063 ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
1064 ToSwarm::RemoveListener { id } =>
1065 return Poll::Ready(ToSwarm::RemoveListener { id }),
1066 }
1067 }
1068
1069 Poll::Pending
1070 }
1071}
1072
1073fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1075 let name = format!("/{}/kad", id.as_ref());
1076 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1077}
1078
1079fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1081 genesis_hash: Hash,
1082 fork_id: Option<&str>,
1083) -> StreamProtocol {
1084 let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1085 let name = if let Some(fork_id) = fork_id {
1086 format!("/{genesis_hash_hex}/{fork_id}/kad")
1087 } else {
1088 format!("/{genesis_hash_hex}/kad")
1089 };
1090
1091 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1092}
1093
1094#[cfg(test)]
1095mod tests {
1096 use super::{
1097 kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig, DiscoveryOut,
1098 };
1099 use crate::config::ProtocolId;
1100 use futures::prelude::*;
1101 use libp2p::{
1102 core::{
1103 transport::{MemoryTransport, Transport},
1104 upgrade,
1105 },
1106 identity::Keypair,
1107 noise,
1108 swarm::{Executor, Swarm, SwarmEvent},
1109 yamux, Multiaddr,
1110 };
1111 use sp_core::hash::H256;
1112 use std::{collections::HashSet, pin::Pin, task::Poll};
1113
1114 struct TokioExecutor(tokio::runtime::Runtime);
1115 impl Executor for TokioExecutor {
1116 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1117 let _ = self.0.spawn(f);
1118 }
1119 }
1120
1121 #[test]
1122 fn discovery_working() {
1123 let mut first_swarm_peer_id_and_addr = None;
1124
1125 let genesis_hash = H256::from_low_u64_be(1);
1126 let fork_id = Some("test-fork-id");
1127 let protocol_id = ProtocolId::from("dot");
1128
1129 let mut swarms = (0..25)
1132 .map(|i| {
1133 let keypair = Keypair::generate_ed25519();
1134
1135 let transport = MemoryTransport::new()
1136 .upgrade(upgrade::Version::V1)
1137 .authenticate(noise::Config::new(&keypair).unwrap())
1138 .multiplex(yamux::Config::default())
1139 .boxed();
1140
1141 let behaviour = {
1142 let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1143 config
1144 .with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
1145 .allow_private_ip(true)
1146 .allow_non_globals_in_dht(true)
1147 .discovery_limit(50)
1148 .with_kademlia(genesis_hash, fork_id, &protocol_id);
1149
1150 config.finish()
1151 };
1152
1153 let runtime = tokio::runtime::Runtime::new().unwrap();
1154 #[allow(deprecated)]
1155 let mut swarm = libp2p::swarm::SwarmBuilder::with_executor(
1156 transport,
1157 behaviour,
1158 keypair.public().to_peer_id(),
1159 TokioExecutor(runtime),
1160 )
1161 .build();
1162
1163 let listen_addr: Multiaddr =
1164 format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1165
1166 if i == 0 {
1167 first_swarm_peer_id_and_addr =
1168 Some((keypair.public().to_peer_id(), listen_addr.clone()))
1169 }
1170
1171 swarm.listen_on(listen_addr.clone()).unwrap();
1172 (swarm, listen_addr)
1173 })
1174 .collect::<Vec<_>>();
1175
1176 let mut to_discover = (0..swarms.len())
1178 .map(|n| {
1179 (0..swarms.len())
1180 .skip(1)
1182 .filter(|p| *p != n)
1183 .map(|p| *Swarm::local_peer_id(&swarms[p].0))
1184 .collect::<HashSet<_>>()
1185 })
1186 .collect::<Vec<_>>();
1187
1188 let fut = futures::future::poll_fn(move |cx| {
1189 'polling: loop {
1190 for swarm_n in 0..swarms.len() {
1191 match swarms[swarm_n].0.poll_next_unpin(cx) {
1192 Poll::Ready(Some(e)) => {
1193 match e {
1194 SwarmEvent::Behaviour(behavior) => {
1195 match behavior {
1196 DiscoveryOut::UnroutablePeer(other) |
1197 DiscoveryOut::Discovered(other) => {
1198 let addr = swarms
1201 .iter()
1202 .find_map(|(s, a)| {
1203 if s.behaviour().local_peer_id == other {
1204 Some(a.clone())
1205 } else {
1206 None
1207 }
1208 })
1209 .unwrap();
1210 let protocol_names = if swarm_n % 2 == 0 {
1213 vec![kademlia_protocol_name(genesis_hash, fork_id)]
1214 } else {
1215 vec![
1216 legacy_kademlia_protocol_name(&protocol_id),
1217 kademlia_protocol_name(genesis_hash, fork_id),
1218 ]
1219 };
1220 swarms[swarm_n]
1221 .0
1222 .behaviour_mut()
1223 .add_self_reported_address(
1224 &other,
1225 protocol_names.as_slice(),
1226 addr,
1227 );
1228
1229 to_discover[swarm_n].remove(&other);
1230 },
1231 DiscoveryOut::RandomKademliaStarted => {},
1232 e => {
1233 panic!("Unexpected event: {:?}", e)
1234 },
1235 }
1236 },
1237 _ => {},
1239 }
1240 continue 'polling
1241 },
1242 _ => {},
1243 }
1244 }
1245 break
1246 }
1247
1248 if to_discover.iter().all(|l| l.is_empty()) {
1249 Poll::Ready(())
1250 } else {
1251 Poll::Pending
1252 }
1253 });
1254
1255 futures::executor::block_on(fut);
1256 }
1257
1258 #[test]
1259 fn discovery_ignores_peers_with_unknown_protocols() {
1260 let supported_genesis_hash = H256::from_low_u64_be(1);
1261 let unsupported_genesis_hash = H256::from_low_u64_be(2);
1262 let supported_protocol_id = ProtocolId::from("a");
1263 let unsupported_protocol_id = ProtocolId::from("b");
1264
1265 let mut discovery = {
1266 let keypair = Keypair::generate_ed25519();
1267 let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1268 config
1269 .allow_private_ip(true)
1270 .allow_non_globals_in_dht(true)
1271 .discovery_limit(50)
1272 .with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
1273 config.finish()
1274 };
1275
1276 let predictable_peer_id = |bytes: &[u8; 32]| {
1277 Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
1278 };
1279
1280 let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
1281 let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
1282 let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1283 let another_addr: Multiaddr = "/memory/2".parse().unwrap();
1284
1285 discovery.add_self_reported_address(
1287 &remote_peer_id,
1288 &[kademlia_protocol_name(unsupported_genesis_hash, None)],
1289 remote_addr.clone(),
1290 );
1291 discovery.add_self_reported_address(
1292 &another_peer_id,
1293 &[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
1294 another_addr.clone(),
1295 );
1296
1297 {
1298 let kademlia = discovery.kademlia.as_mut().unwrap();
1299 assert!(
1300 kademlia
1301 .kbucket(remote_peer_id)
1302 .expect("Remote peer id not to be equal to local peer id.")
1303 .is_empty(),
1304 "Expect peer with unsupported protocol not to be added."
1305 );
1306 assert!(
1307 kademlia
1308 .kbucket(another_peer_id)
1309 .expect("Remote peer id not to be equal to local peer id.")
1310 .is_empty(),
1311 "Expect peer with unsupported protocol not to be added."
1312 );
1313 }
1314
1315 discovery.add_self_reported_address(
1317 &remote_peer_id,
1318 &[kademlia_protocol_name(supported_genesis_hash, None)],
1319 remote_addr.clone(),
1320 );
1321 {
1322 let kademlia = discovery.kademlia.as_mut().unwrap();
1323 assert!(
1324 !kademlia
1325 .kbucket(remote_peer_id)
1326 .expect("Remote peer id not to be equal to local peer id.")
1327 .is_empty(),
1328 "Expect peer with supported protocol to be added."
1329 );
1330 }
1331
1332 let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1333 let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();
1334
1335 {
1337 let kademlia = discovery.kademlia.as_mut().unwrap();
1338 assert!(
1339 kademlia
1340 .kbucket(unsupported_peer_id)
1341 .expect("Remote peer id not to be equal to local peer id.")
1342 .is_empty(),
1343 "Expect unsupported peer not to be added."
1344 );
1345 }
1346 discovery.add_self_reported_address(
1349 &unsupported_peer_id,
1350 &[legacy_kademlia_protocol_name(&supported_protocol_id)],
1351 unsupported_peer_addr.clone(),
1352 );
1353 {
1354 let kademlia = discovery.kademlia.as_mut().unwrap();
1355 assert!(
1356 kademlia
1357 .kbucket(unsupported_peer_id)
1358 .expect("Remote peer id not to be equal to local peer id.")
1359 .is_empty(),
1360 "Expect unsupported peer not to be added."
1361 );
1362 }
1363
1364 discovery.add_self_reported_address(
1366 &another_peer_id,
1367 &[
1368 legacy_kademlia_protocol_name(&supported_protocol_id),
1369 kademlia_protocol_name(supported_genesis_hash, None),
1370 ],
1371 another_addr.clone(),
1372 );
1373
1374 {
1375 let kademlia = discovery.kademlia.as_mut().unwrap();
1376 assert_eq!(
1377 2,
1378 kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
1379 "Expect peers with supported protocol to be added."
1380 );
1381 assert!(
1382 !kademlia
1383 .kbucket(another_peer_id)
1384 .expect("Remote peer id not to be equal to local peer id.")
1385 .is_empty(),
1386 "Expect peer with supported protocol to be added."
1387 );
1388 }
1389 }
1390}