1use crate::{
22 config::{
23 NetworkConfiguration, ProtocolId, KADEMLIA_MAX_PROVIDER_KEYS, KADEMLIA_PROVIDER_RECORD_TTL,
24 KADEMLIA_PROVIDER_REPUBLISH_INTERVAL,
25 },
26 peer_store::PeerStoreProvider,
27};
28
29use array_bytes::bytes2hex;
30use futures::{FutureExt, Stream};
31use futures_timer::Delay;
32use ip_network::IpNetwork;
33use litep2p::{
34 protocol::{
35 libp2p::{
36 identify::{Config as IdentifyConfig, IdentifyEvent},
37 kademlia::{
38 Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
39 IncomingRecordValidationMode, KademliaEvent, KademliaHandle, PeerRecord, QueryId,
40 Quorum, Record, RecordKey,
41 },
42 ping::{Config as PingConfig, PingEvent},
43 },
44 mdns::{Config as MdnsConfig, MdnsEvent},
45 },
46 types::multiaddr::{Multiaddr, Protocol},
47 PeerId, ProtocolName,
48};
49use parking_lot::RwLock;
50use sc_network_types::kad::Key as KademliaKey;
51use schnellru::{ByLength, LruMap};
52
53use std::{
54 cmp,
55 collections::{HashMap, HashSet, VecDeque},
56 iter,
57 num::NonZeroUsize,
58 pin::Pin,
59 sync::Arc,
60 task::{Context, Poll},
61 time::{Duration, Instant},
62};
63
64const LOG_TARGET: &str = "sub-libp2p::discovery";
66
67const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);
69
70const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);
72
73const GET_RECORD_REDUNDANCY_FACTOR: usize = 4;
75
76const MAX_EXTERNAL_ADDRESSES: u32 = 32;
78
79const MIN_ADDRESS_CONFIRMATIONS: usize = 3;
82
83#[derive(Debug)]
85pub enum DiscoveryEvent {
86 Ping {
88 peer: PeerId,
90
91 rtt: Duration,
93 },
94
95 Identified {
97 peer: PeerId,
99
100 listen_addresses: Vec<Multiaddr>,
102
103 supported_protocols: HashSet<ProtocolName>,
105 },
106
107 Discovered {
111 addresses: Vec<Multiaddr>,
113 },
114
115 RoutingTableUpdate {
117 peers: HashSet<PeerId>,
119 },
120
121 ExternalAddressDiscovered {
123 address: Multiaddr,
125 },
126
127 ExternalAddressExpired {
132 address: Multiaddr,
134 },
135
136 FindNodeSuccess {
138 query_id: QueryId,
140
141 target: PeerId,
143
144 peers: Vec<(PeerId, Vec<Multiaddr>)>,
146 },
147
148 GetRecordSuccess {
150 query_id: QueryId,
152 },
153
154 GetRecordPartialResult {
156 query_id: QueryId,
158
159 record: PeerRecord,
161 },
162
163 PutRecordSuccess {
165 query_id: QueryId,
167 },
168
169 GetProvidersSuccess {
171 query_id: QueryId,
173 providers: Vec<ContentProvider>,
175 },
176
177 QueryFailed {
179 query_id: QueryId,
181 },
182
183 IncomingRecord {
185 record: Record,
187 },
188
189 RandomKademliaStarted,
191}
192
193pub struct Discovery {
195 local_peer_id: litep2p::PeerId,
197
198 ping_event_stream: Box<dyn Stream<Item = PingEvent> + Send + Unpin>,
200
201 identify_event_stream: Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
203
204 mdns_event_stream: Option<Box<dyn Stream<Item = MdnsEvent> + Send + Unpin>>,
206
207 kademlia_handle: KademliaHandle,
209
210 _peerstore_handle: Arc<dyn PeerStoreProvider>,
212
213 next_kad_query: Option<Delay>,
217
218 random_walk_query_id: Option<QueryId>,
220
221 pending_events: VecDeque<DiscoveryEvent>,
223
224 allow_non_global_addresses: bool,
226
227 local_protocols: HashSet<ProtocolName>,
229
230 public_addresses: HashSet<Multiaddr>,
232
233 listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
235
236 address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,
238
239 duration_to_next_find_query: Duration,
241}
242
243fn legacy_kademlia_protocol_name(id: &ProtocolId) -> ProtocolName {
245 ProtocolName::from(format!("/{}/kad", id.as_ref()))
246}
247
248fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
250 genesis_hash: Hash,
251 fork_id: Option<&str>,
252) -> ProtocolName {
253 let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
254 let protocol = if let Some(fork_id) = fork_id {
255 format!("/{}/{}/kad", genesis_hash_hex, fork_id)
256 } else {
257 format!("/{}/kad", genesis_hash_hex)
258 };
259
260 ProtocolName::from(protocol)
261}
262
263impl Discovery {
264 pub fn new<Hash: AsRef<[u8]> + Clone>(
269 local_peer_id: litep2p::PeerId,
270 config: &NetworkConfiguration,
271 genesis_hash: Hash,
272 fork_id: Option<&str>,
273 protocol_id: &ProtocolId,
274 known_peers: HashMap<PeerId, Vec<Multiaddr>>,
275 listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
276 _peerstore_handle: Arc<dyn PeerStoreProvider>,
277 ) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
278 let (ping_config, ping_event_stream) = PingConfig::default();
279 let user_agent = format!("{} ({}) (litep2p)", config.client_version, config.node_name);
280
281 let (identify_config, identify_event_stream) =
282 IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));
283
284 let (mdns_config, mdns_event_stream) = match config.transport {
285 crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
286 true => {
287 let (mdns_config, mdns_event_stream) = MdnsConfig::new(MDNS_QUERY_INTERVAL);
288 (Some(mdns_config), Some(mdns_event_stream))
289 },
290 false => (None, None),
291 },
292 _ => panic!("memory transport not supported"),
293 };
294
295 let (kademlia_config, kademlia_handle) = {
296 let protocol_names = vec![
297 kademlia_protocol_name(genesis_hash.clone(), fork_id),
298 legacy_kademlia_protocol_name(protocol_id),
299 ];
300
301 KademliaConfigBuilder::new()
302 .with_known_peers(known_peers)
303 .with_protocol_names(protocol_names)
304 .with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual)
305 .with_provider_record_ttl(KADEMLIA_PROVIDER_RECORD_TTL)
306 .with_provider_refresh_interval(KADEMLIA_PROVIDER_REPUBLISH_INTERVAL)
307 .with_max_provider_keys(KADEMLIA_MAX_PROVIDER_KEYS)
308 .build()
309 };
310
311 (
312 Self {
313 local_peer_id,
314 ping_event_stream,
315 identify_event_stream,
316 mdns_event_stream,
317 kademlia_handle,
318 _peerstore_handle,
319 listen_addresses,
320 random_walk_query_id: None,
321 pending_events: VecDeque::new(),
322 duration_to_next_find_query: Duration::from_secs(1),
323 address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
324 allow_non_global_addresses: config.allow_non_globals_in_dht,
325 public_addresses: config.public_addresses.iter().cloned().map(Into::into).collect(),
326 next_kad_query: Some(Delay::new(KADEMLIA_QUERY_INTERVAL)),
327 local_protocols: HashSet::from_iter([kademlia_protocol_name(
328 genesis_hash,
329 fork_id,
330 )]),
331 },
332 ping_config,
333 identify_config,
334 kademlia_config,
335 mdns_config,
336 )
337 }
338
339 #[allow(unused)]
341 pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
342 self.kademlia_handle.add_known_peer(peer, addresses).await;
343 }
344
345 pub async fn add_self_reported_address(
348 &mut self,
349 peer: PeerId,
350 supported_protocols: HashSet<ProtocolName>,
351 addresses: Vec<Multiaddr>,
352 ) {
353 if self.local_protocols.is_disjoint(&supported_protocols) {
354 log::trace!(
355 target: LOG_TARGET,
356 "Ignoring self-reported address of peer {peer} as remote node is not part of the \
357 Kademlia DHT supported by the local node.",
358 );
359 return
360 }
361
362 let addresses = addresses
363 .into_iter()
364 .filter_map(|address| {
365 if !self.allow_non_global_addresses && !Discovery::can_add_to_dht(&address) {
366 log::trace!(
367 target: LOG_TARGET,
368 "ignoring self-reported non-global address {address} from {peer}."
369 );
370
371 return None
372 }
373
374 Some(address)
375 })
376 .collect();
377
378 log::trace!(
379 target: LOG_TARGET,
380 "add self-reported addresses for {peer:?}: {addresses:?}",
381 );
382
383 self.kademlia_handle.add_known_peer(peer, addresses).await;
384 }
385
386 pub async fn find_node(&mut self, target: PeerId) -> QueryId {
388 self.kademlia_handle.find_node(target).await
389 }
390
391 pub async fn get_value(&mut self, key: KademliaKey) -> QueryId {
393 self.kademlia_handle
394 .get_record(
395 RecordKey::new(&key.to_vec()),
396 Quorum::N(NonZeroUsize::new(GET_RECORD_REDUNDANCY_FACTOR).unwrap()),
397 )
398 .await
399 }
400
401 pub async fn put_value(&mut self, key: KademliaKey, value: Vec<u8>) -> QueryId {
403 self.kademlia_handle
404 .put_record(Record::new(RecordKey::new(&key.to_vec()), value))
405 .await
406 }
407
408 pub async fn put_value_to_peers(
410 &mut self,
411 record: Record,
412 peers: Vec<sc_network_types::PeerId>,
413 update_local_storage: bool,
414 ) -> QueryId {
415 self.kademlia_handle
416 .put_record_to_peers(
417 record,
418 peers.into_iter().map(|peer| peer.into()).collect(),
419 update_local_storage,
420 )
421 .await
422 }
423
424 pub async fn store_record(
426 &mut self,
427 key: KademliaKey,
428 value: Vec<u8>,
429 publisher: Option<sc_network_types::PeerId>,
430 expires: Option<Instant>,
431 ) {
432 log::debug!(
433 target: LOG_TARGET,
434 "Storing DHT record with key {key:?}, originally published by {publisher:?}, \
435 expires {expires:?}.",
436 );
437
438 self.kademlia_handle
439 .store_record(Record {
440 key: RecordKey::new(&key.to_vec()),
441 value,
442 publisher: publisher.map(Into::into),
443 expires,
444 })
445 .await;
446 }
447
448 pub async fn start_providing(&mut self, key: KademliaKey) {
450 self.kademlia_handle.start_providing(key.into()).await;
451 }
452
453 pub async fn stop_providing(&mut self, key: KademliaKey) {
455 self.kademlia_handle.stop_providing(key.into()).await;
456 }
457
458 pub async fn get_providers(&mut self, key: KademliaKey) -> QueryId {
460 self.kademlia_handle.get_providers(key.into()).await
461 }
462
463 fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool {
465 let mut known = known.iter();
466 let mut observed = observed.iter();
467
468 loop {
469 match (known.next(), observed.next()) {
470 (None, None) => return true,
471 (None, Some(Protocol::P2p(_))) => return true,
472 (Some(Protocol::P2p(_)), None) => return true,
473 (known, observed) if known != observed => return false,
474 _ => {},
475 }
476 }
477 }
478
479 fn can_add_to_dht(address: &Multiaddr) -> bool {
481 let ip = match address.iter().next() {
482 Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
483 Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
484 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
485 return true,
486 _ => return false,
487 };
488
489 ip.is_global()
490 }
491
492 fn is_new_external_address(
496 &mut self,
497 address: &Multiaddr,
498 peer: PeerId,
499 ) -> (bool, Option<Multiaddr>) {
500 log::trace!(target: LOG_TARGET, "verify new external address: {address}");
501
502 if !self.allow_non_global_addresses && !Discovery::can_add_to_dht(&address) {
503 log::trace!(
504 target: LOG_TARGET,
505 "ignoring externally reported non-global address {address} from {peer}."
506 );
507
508 return (false, None);
509 }
510
511 if self
513 .listen_addresses
514 .read()
515 .iter()
516 .chain(self.public_addresses.iter())
517 .any(|known_address| Discovery::is_known_address(&known_address, &address))
518 {
519 return (true, None)
520 }
521
522 match self.address_confirmations.get(address) {
523 Some(confirmations) => {
524 confirmations.insert(peer);
525
526 if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
527 return (true, None)
528 }
529 },
530 None => {
531 let oldest = (self.address_confirmations.len() >=
532 self.address_confirmations.limiter().max_length() as usize)
533 .then(|| {
534 self.address_confirmations.pop_oldest().map(|(address, peers)| {
535 if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
536 return Some(address)
537 } else {
538 None
539 }
540 })
541 })
542 .flatten()
543 .flatten();
544
545 self.address_confirmations.insert(address.clone(), iter::once(peer).collect());
546
547 return (false, oldest)
548 },
549 }
550
551 (false, None)
552 }
553}
554
555impl Stream for Discovery {
556 type Item = DiscoveryEvent;
557
558 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
559 let this = Pin::into_inner(self);
560
561 if let Some(event) = this.pending_events.pop_front() {
562 return Poll::Ready(Some(event))
563 }
564
565 if let Some(mut delay) = this.next_kad_query.take() {
566 match delay.poll_unpin(cx) {
567 Poll::Pending => {
568 this.next_kad_query = Some(delay);
569 },
570 Poll::Ready(()) => {
571 let peer = PeerId::random();
572
573 log::trace!(target: LOG_TARGET, "start next kademlia query for {peer:?}");
574
575 match this.kademlia_handle.try_find_node(peer) {
576 Ok(query_id) => {
577 this.random_walk_query_id = Some(query_id);
578 return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
579 },
580 Err(()) => {
581 this.duration_to_next_find_query = cmp::min(
582 this.duration_to_next_find_query * 2,
583 Duration::from_secs(60),
584 );
585 this.next_kad_query =
586 Some(Delay::new(this.duration_to_next_find_query));
587 },
588 }
589 },
590 }
591 }
592
593 match Pin::new(&mut this.kademlia_handle).poll_next(cx) {
594 Poll::Pending => {},
595 Poll::Ready(None) => return Poll::Ready(None),
596 Poll::Ready(Some(KademliaEvent::FindNodeSuccess { query_id, peers, .. }))
597 if Some(query_id) == this.random_walk_query_id =>
598 {
599 log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers", peers.len());
603
604 this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL));
605
606 return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
607 peers: peers.into_iter().map(|(peer, _)| peer).collect(),
608 }))
609 },
610 Poll::Ready(Some(KademliaEvent::FindNodeSuccess { query_id, target, peers })) => {
611 log::trace!(target: LOG_TARGET, "find node query yielded {} peers", peers.len());
612
613 return Poll::Ready(Some(DiscoveryEvent::FindNodeSuccess {
614 query_id,
615 target,
616 peers,
617 }))
618 },
619 Poll::Ready(Some(KademliaEvent::RoutingTableUpdate { peers })) => {
620 log::trace!(target: LOG_TARGET, "routing table update, discovered {} peers", peers.len());
621
622 return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
623 peers: peers.into_iter().collect(),
624 }))
625 },
626 Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id })) => {
627 log::trace!(
628 target: LOG_TARGET,
629 "`GET_RECORD` succeeded for {query_id:?}",
630 );
631
632 return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id }));
633 },
634 Poll::Ready(Some(KademliaEvent::GetRecordPartialResult { query_id, record })) => {
635 log::trace!(
636 target: LOG_TARGET,
637 "`GET_RECORD` intermediary succeeded for {query_id:?}: {record:?}",
638 );
639
640 return Poll::Ready(Some(DiscoveryEvent::GetRecordPartialResult {
641 query_id,
642 record,
643 }));
644 },
645 Poll::Ready(Some(KademliaEvent::PutRecordSuccess { query_id, key: _ })) =>
646 return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
647 Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => {
648 match this.random_walk_query_id == Some(query_id) {
649 true => {
650 this.random_walk_query_id = None;
651 this.duration_to_next_find_query =
652 cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60));
653 this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));
654 },
655 false => return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })),
656 }
657 },
658 Poll::Ready(Some(KademliaEvent::IncomingRecord { record })) => {
659 log::trace!(
660 target: LOG_TARGET,
661 "incoming `PUT_RECORD` request with key {:?} from publisher {:?}",
662 record.key,
663 record.publisher,
664 );
665
666 return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
667 },
668 Poll::Ready(Some(KademliaEvent::GetProvidersSuccess {
669 provided_key,
670 providers,
671 query_id,
672 })) => {
673 log::trace!(
674 target: LOG_TARGET,
675 "`GET_PROVIDERS` for {query_id:?} with {provided_key:?} yielded {providers:?}",
676 );
677
678 return Poll::Ready(Some(DiscoveryEvent::GetProvidersSuccess {
679 query_id,
680 providers,
681 }))
682 },
683 Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {},
685 }
686
687 match Pin::new(&mut this.identify_event_stream).poll_next(cx) {
688 Poll::Pending => {},
689 Poll::Ready(None) => return Poll::Ready(None),
690 Poll::Ready(Some(IdentifyEvent::PeerIdentified {
691 peer,
692 listen_addresses,
693 supported_protocols,
694 observed_address,
695 ..
696 })) => {
697 let observed_address =
698 if let Some(Protocol::P2p(peer_id)) = observed_address.iter().last() {
699 if peer_id != *this.local_peer_id.as_ref() {
700 log::warn!(
701 target: LOG_TARGET,
702 "Discovered external address for a peer that is not us: {observed_address}",
703 );
704 None
705 } else {
706 Some(observed_address)
707 }
708 } else {
709 Some(observed_address.with(Protocol::P2p(this.local_peer_id.into())))
710 };
711
712 if let Some(observed_address) = observed_address {
715 let (is_new, expired_address) =
716 this.is_new_external_address(&observed_address, peer);
717
718 if let Some(expired_address) = expired_address {
719 log::trace!(
720 target: LOG_TARGET,
721 "Removing expired external address expired={expired_address} is_new={is_new} observed={observed_address}",
722 );
723
724 this.pending_events.push_back(DiscoveryEvent::ExternalAddressExpired {
725 address: expired_address,
726 });
727 }
728
729 if is_new {
730 this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
731 address: observed_address.clone(),
732 });
733 }
734 }
735
736 return Poll::Ready(Some(DiscoveryEvent::Identified {
737 peer,
738 listen_addresses,
739 supported_protocols,
740 }));
741 },
742 }
743
744 match Pin::new(&mut this.ping_event_stream).poll_next(cx) {
745 Poll::Pending => {},
746 Poll::Ready(None) => return Poll::Ready(None),
747 Poll::Ready(Some(PingEvent::Ping { peer, ping })) =>
748 return Poll::Ready(Some(DiscoveryEvent::Ping { peer, rtt: ping })),
749 }
750
751 if let Some(ref mut mdns_event_stream) = &mut this.mdns_event_stream {
752 match Pin::new(mdns_event_stream).poll_next(cx) {
753 Poll::Pending => {},
754 Poll::Ready(None) => return Poll::Ready(None),
755 Poll::Ready(Some(MdnsEvent::Discovered(addresses))) =>
756 return Poll::Ready(Some(DiscoveryEvent::Discovered { addresses })),
757 }
758 }
759
760 Poll::Pending
761 }
762}
763
764#[cfg(test)]
765mod tests {
766 use super::*;
767
768 use std::sync::atomic::AtomicU32;
769
770 use crate::{
771 config::ProtocolId,
772 peer_store::{PeerStore, PeerStoreProvider},
773 };
774 use futures::{stream::FuturesUnordered, StreamExt};
775 use sp_core::H256;
776 use sp_tracing::tracing_subscriber;
777
778 use litep2p::{
779 config::ConfigBuilder as Litep2pConfigBuilder, transport::tcp::config::Config as TcpConfig,
780 Litep2p,
781 };
782
783 #[tokio::test]
784 async fn litep2p_discovery_works() {
785 let _ = tracing_subscriber::fmt()
786 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
787 .try_init();
788
789 let mut known_peers = HashMap::new();
790 let genesis_hash = H256::from_low_u64_be(1);
791 let fork_id = Some("test-fork-id");
792 let protocol_id = ProtocolId::from("dot");
793
794 let backends = (0..10)
796 .map(|i| {
797 let keypair = litep2p::crypto::ed25519::Keypair::generate();
798 let peer_id: PeerId = keypair.public().to_peer_id().into();
799
800 let listen_addresses = Arc::new(RwLock::new(HashSet::new()));
801
802 let peer_store = PeerStore::new(vec![], None);
803 let peer_store_handle: Arc<dyn PeerStoreProvider> = Arc::new(peer_store.handle());
804
805 let (discovery, ping_config, identify_config, kademlia_config, _mdns) =
806 Discovery::new(
807 peer_id,
808 &NetworkConfiguration::new_local(),
809 genesis_hash,
810 fork_id,
811 &protocol_id,
812 known_peers.clone(),
813 listen_addresses.clone(),
814 peer_store_handle,
815 );
816
817 let config = Litep2pConfigBuilder::new()
818 .with_keypair(keypair)
819 .with_tcp(TcpConfig {
820 listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
821 ..Default::default()
822 })
823 .with_libp2p_ping(ping_config)
824 .with_libp2p_identify(identify_config)
825 .with_libp2p_kademlia(kademlia_config)
826 .build();
827
828 let mut litep2p = Litep2p::new(config).unwrap();
829
830 let addresses = litep2p.listen_addresses().cloned().collect::<Vec<_>>();
831 addresses.iter().for_each(|address| {
833 listen_addresses.write().insert(address.clone());
834 });
835
836 if i == 0 {
838 log::info!(target: LOG_TARGET, "First peer is {peer_id:?} with addresses {addresses:?}");
839 known_peers.insert(peer_id, addresses.clone());
840 } else {
841 let (peer, addresses) = known_peers.iter().next().unwrap();
842
843 let result = litep2p.add_known_address(*peer, addresses.into_iter().cloned());
844
845 log::info!(target: LOG_TARGET, "{peer_id:?}: Adding known peer {peer:?} with addresses {addresses:?} result={result:?}");
846
847 }
848
849 (peer_id, litep2p, discovery)
850 })
851 .collect::<Vec<_>>();
852
853 let total_peers = backends.len() as u32;
854 let remaining_peers =
855 backends.iter().map(|(peer_id, _, _)| *peer_id).collect::<HashSet<_>>();
856
857 let first_peer = *known_peers.iter().next().unwrap().0;
858
859 let mut futures = FuturesUnordered::new();
861 let num_finished = Arc::new(AtomicU32::new(0));
862
863 for (peer_id, mut litep2p, mut discovery) in backends {
864 let mut remaining_peers = remaining_peers.clone();
866 remaining_peers.remove(&peer_id);
867
868 let num_finished = num_finished.clone();
869
870 let future = async move {
871 log::info!(target: LOG_TARGET, "{peer_id:?} starting loop");
872
873 if peer_id != first_peer {
874 log::info!(target: LOG_TARGET, "{peer_id:?} dialing {first_peer:?}");
875 litep2p.dial(&first_peer).await.unwrap();
876 }
877
878 loop {
879 if num_finished.load(std::sync::atomic::Ordering::Relaxed) == total_peers {
881 log::info!(target: LOG_TARGET, "{peer_id:?} all peers discovered");
882 break
883 }
884
885 tokio::select! {
886 event = litep2p.next_event() => {
888 log::info!(target: LOG_TARGET, "{peer_id:?} Litep2p event: {event:?}");
889 },
890
891 event = discovery.next() => {
893 match event.unwrap() {
894 DiscoveryEvent::Identified { peer, .. } => {
897 log::info!(target: LOG_TARGET, "{peer_id:?} Peer {peer} identified");
898
899 remaining_peers.remove(&peer);
900
901 if remaining_peers.is_empty() {
902 log::info!(target: LOG_TARGET, "{peer_id:?} All peers discovered");
903
904 num_finished.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
905 }
906 },
907
908 event => {
909 log::info!(target: LOG_TARGET, "{peer_id:?} Discovery event: {event:?}");
910 }
911 }
912 }
913 }
914 }
915 };
916
917 futures.push(future);
918 }
919
920 tokio::time::timeout(Duration::from_secs(60), futures.next())
922 .await
923 .expect("All peers should finish within 60 seconds");
924 }
925}