1pub(crate) use crate::worker::addr_cache::AddrCache;
20use crate::{
21 error::{Error, Result},
22 interval::ExpIncInterval,
23 ServicetoWorkerMsg, WorkerConfig,
24};
25
26use std::{
27 collections::{HashMap, HashSet},
28 marker::PhantomData,
29 path::PathBuf,
30 sync::Arc,
31 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
32};
33
34use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt};
35
36use codec::{Decode, Encode};
37use ip_network::IpNetwork;
38use linked_hash_set::LinkedHashSet;
39use sc_network_types::kad::{Key, PeerRecord, Record};
40
41use log::{debug, error, info, trace};
42use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
43use prost::Message;
44use rand::{seq::SliceRandom, thread_rng};
45
46use sc_network::{
47 config::DEFAULT_KADEMLIA_REPLICATION_FACTOR, event::DhtEvent, multiaddr, KademliaKey,
48 Multiaddr, NetworkDHTProvider, NetworkSigner, NetworkStateInfo,
49};
50use sc_network_types::{multihash::Code, PeerId};
51use schema::PeerSignature;
52use sp_api::{ApiError, ProvideRuntimeApi};
53use sp_authority_discovery::{
54 AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature,
55};
56use sp_blockchain::HeaderBackend;
57use sp_core::{
58 crypto::{key_types, ByteArray, Pair},
59 traits::SpawnNamed,
60};
61use sp_keystore::{Keystore, KeystorePtr};
62use sp_runtime::traits::Block as BlockT;
63
64mod addr_cache;
65mod schema {
67 #[cfg(test)]
68 mod tests;
69
70 include!(concat!(env!("OUT_DIR"), "/authority_discovery_v3.rs"));
71}
72#[cfg(test)]
73pub mod tests;
74
75const LOG_TARGET: &str = "sub-authority-discovery";
76pub(crate) const ADDR_CACHE_FILE_NAME: &str = "authority_discovery_addr_cache.json";
77const ADDR_CACHE_PERSIST_INTERVAL: Duration = Duration::from_secs(60 * 10); const MAX_ADDRESSES_PER_AUTHORITY: usize = 16;
81
82const MAX_GLOBAL_LISTEN_ADDRESSES: usize = 4;
84
85const MAX_ADDRESSES_TO_PUBLISH: usize = 32;
87
88const MAX_IN_FLIGHT_LOOKUPS: usize = 8;
90
91pub enum Role {
93 PublishAndDiscover(KeystorePtr),
95 Discover,
97}
98
99pub struct Worker<Client, Block: BlockT, DhtEventStream> {
126 from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,
128
129 client: Arc<Client>,
130
131 network: Arc<dyn NetworkProvider>,
132
133 dht_event_rx: DhtEventStream,
135
136 publish_interval: ExpIncInterval,
138
139 publish_if_changed_interval: ExpIncInterval,
142
143 latest_published_keys: HashSet<AuthorityId>,
146 latest_published_kad_keys: HashSet<KademliaKey>,
149
150 publish_non_global_ips: bool,
152
153 public_addresses: LinkedHashSet<Multiaddr>,
156
157 strict_record_validation: bool,
159
160 query_interval: ExpIncInterval,
162
163 pending_lookups: Vec<AuthorityId>,
165
166 known_authorities: HashMap<KademliaKey, AuthorityId>,
168
169 authorities_queried_at: Option<Block::Hash>,
171
172 in_flight_lookups: HashMap<KademliaKey, AuthorityId>,
174
175 known_lookups: HashMap<KademliaKey, AuthorityId>,
179
180 last_known_records: HashMap<KademliaKey, RecordInfo>,
184
185 addr_cache: addr_cache::AddrCache,
186
187 metrics: Option<Metrics>,
188
189 warn_public_addresses: bool,
191
192 role: Role,
193
194 phantom: PhantomData<Block>,
195
196 spawner: Box<dyn SpawnNamed>,
198
199 persisted_cache_file_path: Option<PathBuf>,
203}
204
205#[derive(Debug, Clone)]
206struct RecordInfo {
207 creation_time: u128,
209 peers_with_record: HashSet<PeerId>,
212 record: Record,
214}
215
216#[async_trait::async_trait]
219pub trait AuthorityDiscovery<Block: BlockT> {
220 async fn authorities(&self, at: Block::Hash)
222 -> std::result::Result<Vec<AuthorityId>, ApiError>;
223
224 async fn best_hash(&self) -> std::result::Result<Block::Hash, Error>;
226}
227
228#[async_trait::async_trait]
229impl<Block, T> AuthorityDiscovery<Block> for T
230where
231 T: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
232 T::Api: AuthorityDiscoveryApi<Block>,
233 Block: BlockT,
234{
235 async fn authorities(
236 &self,
237 at: Block::Hash,
238 ) -> std::result::Result<Vec<AuthorityId>, ApiError> {
239 self.runtime_api().authorities(at)
240 }
241
242 async fn best_hash(&self) -> std::result::Result<Block::Hash, Error> {
243 Ok(self.info().best_hash)
244 }
245}
246
247impl<Client, Block, DhtEventStream> Worker<Client, Block, DhtEventStream>
248where
249 Block: BlockT + Unpin + 'static,
250 Client: AuthorityDiscovery<Block> + 'static,
251 DhtEventStream: Stream<Item = DhtEvent> + Unpin,
252{
253 pub(crate) fn new(
255 from_service: mpsc::Receiver<ServicetoWorkerMsg>,
256 client: Arc<Client>,
257 network: Arc<dyn NetworkProvider>,
258 dht_event_rx: DhtEventStream,
259 role: Role,
260 prometheus_registry: Option<prometheus_endpoint::Registry>,
261 config: WorkerConfig,
262 spawner: impl SpawnNamed + 'static,
263 ) -> Self {
264 let publish_interval =
271 ExpIncInterval::new(Duration::from_secs(2), config.max_publish_interval);
272 let query_interval = ExpIncInterval::new(Duration::from_secs(2), config.max_query_interval);
273
274 let publish_if_changed_interval =
277 ExpIncInterval::new(config.keystore_refresh_interval, config.keystore_refresh_interval);
278
279 let maybe_persisted_cache_file_path =
280 config.persisted_cache_directory.as_ref().map(|dir| {
281 let mut path = dir.clone();
282 path.push(ADDR_CACHE_FILE_NAME);
283 path
284 });
285
286 let addr_cache: AddrCache = if let Some(persisted_cache_file_path) =
290 maybe_persisted_cache_file_path.as_ref()
291 {
292 let loaded =
293 AddrCache::try_from(persisted_cache_file_path.as_path()).unwrap_or_else(|e| {
294 info!(target: LOG_TARGET, "Failed to load AddrCache from file, using empty instead: {}", e);
295 AddrCache::new()
296 });
297 info!(target: LOG_TARGET, "Loaded persisted AddrCache with {} authority ids.", loaded.num_authority_ids());
298 loaded
299 } else {
300 info!(target: LOG_TARGET, "No persisted cache file path provided, authority discovery will not persist the address cache to disk.");
301 AddrCache::new()
302 };
303
304 let metrics = match prometheus_registry {
305 Some(registry) => match Metrics::register(®istry) {
306 Ok(metrics) => Some(metrics),
307 Err(e) => {
308 error!(target: LOG_TARGET, "Failed to register metrics: {}", e);
309 None
310 },
311 },
312 None => None,
313 };
314
315 let public_addresses = {
316 let local_peer_id = network.local_peer_id();
317
318 config
319 .public_addresses
320 .into_iter()
321 .map(|address| AddressType::PublicAddress(address).without_p2p(local_peer_id))
322 .collect()
323 };
324
325 Worker {
326 from_service: from_service.fuse(),
327 client,
328 network,
329 dht_event_rx,
330 publish_interval,
331 known_authorities: Default::default(),
332 authorities_queried_at: None,
333 publish_if_changed_interval,
334 latest_published_keys: HashSet::new(),
335 latest_published_kad_keys: HashSet::new(),
336 publish_non_global_ips: config.publish_non_global_ips,
337 public_addresses,
338 strict_record_validation: config.strict_record_validation,
339 query_interval,
340 pending_lookups: Vec::new(),
341 in_flight_lookups: HashMap::new(),
342 known_lookups: HashMap::new(),
343 addr_cache,
344 role,
345 metrics,
346 warn_public_addresses: false,
347 phantom: PhantomData,
348 last_known_records: HashMap::new(),
349 spawner: Box::new(spawner),
350 persisted_cache_file_path: maybe_persisted_cache_file_path,
351 }
352 }
353
354 pub fn persist_addr_cache_if_supported(&self) {
356 let Some(path) = self.persisted_cache_file_path.as_ref().cloned() else {
357 return;
358 };
359 let cloned_cache = self.addr_cache.clone();
360 self.spawner.spawn_blocking(
361 "persist-addr-cache",
362 Some("authority-discovery-worker"),
363 Box::pin(async move {
364 cloned_cache.serialize_and_persist(path);
365 }),
366 )
367 }
368
369 pub async fn run(mut self) {
371 let mut persist_interval = tokio::time::interval(ADDR_CACHE_PERSIST_INTERVAL);
372
373 loop {
374 self.start_new_lookups();
375
376 futures::select! {
377 _ = persist_interval.tick().fuse() => {
378 self.persist_addr_cache_if_supported();
379 },
380 event = self.dht_event_rx.next().fuse() => {
382 if let Some(event) = event {
383 self.handle_dht_event(event).await;
384 } else {
385 self.persist_addr_cache_if_supported();
386 return;
389 }
390 },
391 msg = self.from_service.select_next_some() => {
393 self.process_message_from_service(msg);
394 },
395 only_if_changed = future::select(
397 self.publish_interval.next().map(|_| false),
398 self.publish_if_changed_interval.next().map(|_| true)
399 ).map(|e| e.factor_first().0).fuse() => {
400 if let Err(e) = self.publish_ext_addresses(only_if_changed).await {
401 error!(
402 target: LOG_TARGET,
403 "Failed to publish external addresses: {}", e,
404 );
405 }
406 },
407 _ = self.query_interval.next().fuse() => {
409 if let Err(e) = self.refill_pending_lookups_queue().await {
410 error!(
411 target: LOG_TARGET,
412 "Failed to request addresses of authorities: {}", e,
413 );
414 }
415 },
416 }
417 }
418 }
419
420 fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
421 match msg {
422 ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
423 let _ = sender.send(
424 self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
425 );
426 },
427 ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
428 let _ = sender
429 .send(self.addr_cache.get_authority_ids_by_peer_id(&peer_id).map(Clone::clone));
430 },
431 }
432 }
433
434 fn addresses_to_publish(&mut self) -> impl Iterator<Item = Multiaddr> {
435 let local_peer_id = self.network.local_peer_id();
436 let publish_non_global_ips = self.publish_non_global_ips;
437
438 let address_is_global = |address: &Multiaddr| {
440 address.iter().all(|protocol| match protocol {
441 multiaddr::Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
444 multiaddr::Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
445 _ => true,
446 })
447 };
448
449 let mut global_listen_addresses = self
457 .network
458 .listen_addresses()
459 .into_iter()
460 .filter_map(|address| {
461 address_is_global(&address)
462 .then(|| AddressType::GlobalListenAddress(address).without_p2p(local_peer_id))
463 })
464 .take(MAX_GLOBAL_LISTEN_ADDRESSES)
465 .peekable();
466
467 let mut external_addresses = self
469 .network
470 .external_addresses()
471 .into_iter()
472 .filter_map(|address| {
473 (publish_non_global_ips || address_is_global(&address))
474 .then(|| AddressType::ExternalAddress(address).without_p2p(local_peer_id))
475 })
476 .peekable();
477
478 let has_global_listen_addresses = global_listen_addresses.peek().is_some();
479 trace!(
480 target: LOG_TARGET,
481 "Node has public addresses: {}, global listen addresses: {}, external addresses: {}",
482 !self.public_addresses.is_empty(),
483 has_global_listen_addresses,
484 external_addresses.peek().is_some(),
485 );
486
487 let mut seen_addresses = HashSet::new();
488
489 let addresses = self
490 .public_addresses
491 .clone()
492 .into_iter()
493 .chain(global_listen_addresses)
494 .chain(external_addresses)
495 .filter(|address| seen_addresses.insert(address.clone()))
497 .take(MAX_ADDRESSES_TO_PUBLISH)
498 .collect::<Vec<_>>();
499
500 if !addresses.is_empty() {
501 debug!(
502 target: LOG_TARGET,
503 "Publishing authority DHT record peer_id='{local_peer_id}' with addresses='{addresses:?}'",
504 );
505
506 if !self.warn_public_addresses &&
507 self.public_addresses.is_empty() &&
508 !has_global_listen_addresses
509 {
510 self.warn_public_addresses = true;
511
512 error!(
513 target: LOG_TARGET,
514 "No public addresses configured and no global listen addresses found. \
515 Authority DHT record may contain unreachable addresses. \
516 Consider setting `--public-addr` to the public IP address of this node. \
517 This will become a hard requirement in future versions for authorities."
518 );
519 }
520 }
521
522 addresses
524 .into_iter()
525 .map(move |a| a.with(multiaddr::Protocol::P2p(*local_peer_id.as_ref())))
526 }
527
528 async fn publish_ext_addresses(&mut self, only_if_changed: bool) -> Result<()> {
533 let key_store = match &self.role {
534 Role::PublishAndDiscover(key_store) => key_store,
535 Role::Discover => return Ok(()),
536 }
537 .clone();
538
539 let addresses = serialize_addresses(self.addresses_to_publish());
540 if addresses.is_empty() {
541 trace!(
542 target: LOG_TARGET,
543 "No addresses to publish. Skipping publication."
544 );
545
546 self.publish_interval.set_to_start();
547 return Ok(())
548 }
549
550 let keys =
551 Worker::<Client, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
552 key_store.clone(),
553 self.client.as_ref(),
554 )
555 .await?
556 .into_iter()
557 .collect::<HashSet<_>>();
558
559 if only_if_changed {
560 if keys == self.latest_published_keys {
563 return Ok(())
564 }
565
566 self.publish_interval.set_to_start();
569 self.query_interval.set_to_start();
570 }
571
572 if let Some(metrics) = &self.metrics {
573 metrics.publish.inc();
574 metrics
575 .amount_addresses_last_published
576 .set(addresses.len().try_into().unwrap_or(std::u64::MAX));
577 }
578
579 let serialized_record = serialize_authority_record(addresses, Some(build_creation_time()))?;
580 let peer_signature = sign_record_with_peer_id(&serialized_record, &self.network)?;
581
582 let keys_vec = keys.iter().cloned().collect::<Vec<_>>();
583
584 let kv_pairs = sign_record_with_authority_ids(
585 serialized_record,
586 Some(peer_signature),
587 key_store.as_ref(),
588 keys_vec,
589 )?;
590
591 self.latest_published_kad_keys = kv_pairs.iter().map(|(k, _)| k.clone()).collect();
592
593 for (key, value) in kv_pairs.into_iter() {
594 self.network.put_value(key, value);
595 }
596
597 self.latest_published_keys = keys;
598
599 Ok(())
600 }
601
602 async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
603 let best_hash = self.client.best_hash().await?;
604
605 let local_keys = match &self.role {
606 Role::PublishAndDiscover(key_store) => key_store
607 .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
608 .into_iter()
609 .collect::<HashSet<_>>(),
610 Role::Discover => HashSet::new(),
611 };
612
613 let mut authorities = self
614 .client
615 .authorities(best_hash)
616 .await
617 .map_err(|e| Error::CallingRuntime(e.into()))?
618 .into_iter()
619 .filter(|id| !local_keys.contains(id.as_ref()))
620 .collect::<Vec<_>>();
621
622 self.known_authorities = authorities
623 .clone()
624 .into_iter()
625 .map(|authority| (hash_authority_id(authority.as_ref()), authority))
626 .collect::<HashMap<_, _>>();
627 self.authorities_queried_at = Some(best_hash);
628
629 self.addr_cache.retain_ids(&authorities);
630 let now = Instant::now();
631 self.last_known_records.retain(|k, value| {
632 self.known_authorities.contains_key(k) && !value.record.is_expired(now)
633 });
634
635 authorities.shuffle(&mut thread_rng());
636 self.pending_lookups = authorities;
637 self.in_flight_lookups.clear();
640 self.known_lookups.clear();
641
642 if let Some(metrics) = &self.metrics {
643 metrics
644 .requests_pending
645 .set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
646 }
647
648 Ok(())
649 }
650
651 fn start_new_lookups(&mut self) {
652 while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS {
653 let authority_id = match self.pending_lookups.pop() {
654 Some(authority) => authority,
655 None => return,
656 };
657 let hash = hash_authority_id(authority_id.as_ref());
658 self.network.get_value(&hash);
659 self.in_flight_lookups.insert(hash, authority_id);
660
661 if let Some(metrics) = &self.metrics {
662 metrics.requests.inc();
663 metrics
664 .requests_pending
665 .set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
666 }
667 }
668 }
669
670 async fn handle_dht_event(&mut self, event: DhtEvent) {
672 match event {
673 DhtEvent::ValueFound(v) => {
674 if let Some(metrics) = &self.metrics {
675 metrics.dht_event_received.with_label_values(&["value_found"]).inc();
676 }
677
678 debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", v.record.key);
679
680 if let Err(e) = self.handle_dht_value_found_event(v) {
681 if let Some(metrics) = &self.metrics {
682 metrics.handle_value_found_event_failure.inc();
683 }
684 debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e);
685 }
686 },
687 DhtEvent::ValueNotFound(hash) => {
688 if let Some(metrics) = &self.metrics {
689 metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
690 }
691
692 if self.in_flight_lookups.remove(&hash).is_some() {
693 debug!(target: LOG_TARGET, "Value for hash '{:?}' not found on Dht.", hash)
694 } else {
695 debug!(
696 target: LOG_TARGET,
697 "Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
698 )
699 }
700 },
701 DhtEvent::ValuePut(hash) => {
702 if !self.latest_published_kad_keys.contains(&hash) {
703 return;
704 }
705
706 self.publish_interval.set_to_max();
710
711 if let Some(metrics) = &self.metrics {
712 metrics.dht_event_received.with_label_values(&["value_put"]).inc();
713 }
714
715 debug!(target: LOG_TARGET, "Successfully put hash '{:?}' on Dht.", hash)
716 },
717 DhtEvent::ValuePutFailed(hash) => {
718 if !self.latest_published_kad_keys.contains(&hash) {
719 return;
721 }
722
723 if let Some(metrics) = &self.metrics {
724 metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
725 }
726
727 debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash)
728 },
729 DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires) => {
730 if let Err(e) = self
731 .handle_put_record_requested(record_key, record_value, publisher, expires)
732 .await
733 {
734 debug!(target: LOG_TARGET, "Failed to handle put record request: {}", e)
735 }
736
737 if let Some(metrics) = &self.metrics {
738 metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
739 }
740 },
741 _ => {},
742 }
743 }
744
745 async fn handle_put_record_requested(
746 &mut self,
747 record_key: Key,
748 record_value: Vec<u8>,
749 publisher: Option<PeerId>,
750 expires: Option<std::time::Instant>,
751 ) -> Result<()> {
752 let publisher = publisher.ok_or(Error::MissingPublisher)?;
753
754 let best_hash = self.client.best_hash().await?;
757 if !self.known_authorities.contains_key(&record_key) &&
758 self.authorities_queried_at
759 .map(|authorities_queried_at| authorities_queried_at != best_hash)
760 .unwrap_or(true)
761 {
762 let authorities = self
763 .client
764 .authorities(best_hash)
765 .await
766 .map_err(|e| Error::CallingRuntime(e.into()))?
767 .into_iter()
768 .collect::<Vec<_>>();
769
770 self.known_authorities = authorities
771 .into_iter()
772 .map(|authority| (hash_authority_id(authority.as_ref()), authority))
773 .collect::<HashMap<_, _>>();
774
775 self.authorities_queried_at = Some(best_hash);
776 }
777
778 let authority_id =
779 self.known_authorities.get(&record_key).ok_or(Error::UnknownAuthority)?;
780 let signed_record =
781 Self::check_record_signed_with_authority_id(record_value.as_slice(), authority_id)?;
782 self.check_record_signed_with_network_key(
783 &signed_record.record,
784 signed_record.peer_signature,
785 publisher,
786 authority_id,
787 )?;
788
789 let records_creation_time: u128 =
790 schema::AuthorityRecord::decode(signed_record.record.as_slice())
791 .map_err(Error::DecodingProto)?
792 .creation_time
793 .map(|creation_time| {
794 u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
795 })
796 .unwrap_or_default(); let current_record_info = self.last_known_records.get(&record_key);
799 if let Some(current_record_info) = current_record_info {
802 if records_creation_time < current_record_info.creation_time {
803 debug!(
804 target: LOG_TARGET,
805 "Skip storing because record creation time {:?} is older than the current known record {:?}",
806 records_creation_time,
807 current_record_info.creation_time
808 );
809 return Ok(());
810 }
811 }
812
813 self.network.store_record(record_key, record_value, Some(publisher), expires);
814 Ok(())
815 }
816
817 fn check_record_signed_with_authority_id(
818 record: &[u8],
819 authority_id: &AuthorityId,
820 ) -> Result<schema::SignedAuthorityRecord> {
821 let signed_record: schema::SignedAuthorityRecord =
822 schema::SignedAuthorityRecord::decode(record).map_err(Error::DecodingProto)?;
823
824 let auth_signature = AuthoritySignature::decode(&mut &signed_record.auth_signature[..])
825 .map_err(Error::EncodingDecodingScale)?;
826
827 if !AuthorityPair::verify(&auth_signature, &signed_record.record, &authority_id) {
828 return Err(Error::VerifyingDhtPayload)
829 }
830
831 Ok(signed_record)
832 }
833
834 fn check_record_signed_with_network_key(
835 &self,
836 record: &Vec<u8>,
837 peer_signature: Option<PeerSignature>,
838 remote_peer_id: PeerId,
839 authority_id: &AuthorityId,
840 ) -> Result<()> {
841 if let Some(peer_signature) = peer_signature {
842 match self.network.verify(
843 remote_peer_id.into(),
844 &peer_signature.public_key,
845 &peer_signature.signature,
846 record,
847 ) {
848 Ok(true) => {},
849 Ok(false) => return Err(Error::VerifyingDhtPayload),
850 Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
851 }
852 } else if self.strict_record_validation {
853 return Err(Error::MissingPeerIdSignature)
854 } else {
855 debug!(
856 target: LOG_TARGET,
857 "Received unsigned authority discovery record from {}", authority_id
858 );
859 }
860 Ok(())
861 }
862
863 fn handle_dht_value_found_event(&mut self, peer_record: PeerRecord) -> Result<()> {
864 let remote_key = peer_record.record.key.clone();
866
867 let authority_id: AuthorityId =
868 if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) {
869 self.known_lookups.insert(remote_key.clone(), authority_id.clone());
870 authority_id
871 } else if let Some(authority_id) = self.known_lookups.get(&remote_key) {
872 authority_id.clone()
873 } else {
874 return Err(Error::ReceivingUnexpectedRecord);
875 };
876
877 let local_peer_id = self.network.local_peer_id();
878
879 let schema::SignedAuthorityRecord { record, peer_signature, .. } =
880 Self::check_record_signed_with_authority_id(
881 peer_record.record.value.as_slice(),
882 &authority_id,
883 )?;
884
885 let authority_record =
886 schema::AuthorityRecord::decode(record.as_slice()).map_err(Error::DecodingProto)?;
887
888 let records_creation_time: u128 = authority_record
889 .creation_time
890 .as_ref()
891 .map(|creation_time| {
892 u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
893 })
894 .unwrap_or_default(); let addresses: Vec<Multiaddr> = authority_record
897 .addresses
898 .into_iter()
899 .map(|a| a.try_into())
900 .collect::<std::result::Result<_, _>>()
901 .map_err(Error::ParsingMultiaddress)?;
902
903 let get_peer_id = |a: &Multiaddr| match a.iter().last() {
904 Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
905 _ => None,
906 };
907
908 let addresses: Vec<Multiaddr> = addresses
910 .into_iter()
911 .filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some())
912 .collect();
913
914 let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a)))
915 .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? .flatten()
917 .ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; self.check_record_signed_with_network_key(
923 &record,
924 peer_signature,
925 remote_peer_id,
926 &authority_id,
927 )?;
928
929 let remote_addresses: Vec<Multiaddr> =
930 addresses.into_iter().take(MAX_ADDRESSES_PER_AUTHORITY).collect();
931
932 let answering_peer_id = peer_record.peer.map(|peer| peer.into());
933
934 let addr_cache_needs_update = self.handle_new_record(
935 &authority_id,
936 remote_key.clone(),
937 RecordInfo {
938 creation_time: records_creation_time,
939 peers_with_record: answering_peer_id.into_iter().collect(),
940 record: peer_record.record,
941 },
942 );
943
944 if !remote_addresses.is_empty() && addr_cache_needs_update {
945 self.addr_cache.insert(authority_id, remote_addresses);
946 if let Some(metrics) = &self.metrics {
947 metrics
948 .known_authorities_count
949 .set(self.addr_cache.num_authority_ids().try_into().unwrap_or(std::u64::MAX));
950 }
951 }
952 Ok(())
953 }
954
955 fn handle_new_record(
958 &mut self,
959 authority_id: &AuthorityId,
960 kademlia_key: KademliaKey,
961 new_record: RecordInfo,
962 ) -> bool {
963 let current_record_info = self
964 .last_known_records
965 .entry(kademlia_key.clone())
966 .or_insert_with(|| new_record.clone());
967
968 if new_record.creation_time > current_record_info.creation_time {
969 let peers_that_need_updating = current_record_info.peers_with_record.clone();
970 self.network.put_record_to(
971 new_record.record.clone(),
972 peers_that_need_updating.clone(),
973 current_record_info.peers_with_record.is_empty(),
976 );
977 debug!(
978 target: LOG_TARGET,
979 "Found a newer record for {:?} new record creation time {:?} old record creation time {:?}",
980 authority_id, new_record.creation_time, current_record_info.creation_time
981 );
982 self.last_known_records.insert(kademlia_key, new_record);
983 return true
984 }
985
986 if new_record.creation_time == current_record_info.creation_time {
987 debug!(
990 target: LOG_TARGET,
991 "Found same record for {:?} record creation time {:?}",
992 authority_id, new_record.creation_time
993 );
994 if current_record_info.peers_with_record.len() + new_record.peers_with_record.len() <=
995 DEFAULT_KADEMLIA_REPLICATION_FACTOR
996 {
997 current_record_info.peers_with_record.extend(new_record.peers_with_record);
998 }
999 return true
1000 }
1001
1002 debug!(
1003 target: LOG_TARGET,
1004 "Found old record for {:?} received record creation time {:?} current record creation time {:?}",
1005 authority_id, new_record.creation_time, current_record_info.creation_time,
1006 );
1007 self.network.put_record_to(
1008 current_record_info.record.clone().into(),
1009 new_record.peers_with_record.clone(),
1010 new_record.peers_with_record.is_empty(),
1013 );
1014 return false
1015 }
1016
1017 async fn get_own_public_keys_within_authority_set(
1023 key_store: KeystorePtr,
1024 client: &Client,
1025 ) -> Result<HashSet<AuthorityId>> {
1026 let local_pub_keys = key_store
1027 .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
1028 .into_iter()
1029 .collect::<HashSet<_>>();
1030
1031 let best_hash = client.best_hash().await?;
1032 let authorities = client
1033 .authorities(best_hash)
1034 .await
1035 .map_err(|e| Error::CallingRuntime(e.into()))?
1036 .into_iter()
1037 .map(Into::into)
1038 .collect::<HashSet<_>>();
1039
1040 let intersection =
1041 local_pub_keys.intersection(&authorities).cloned().map(Into::into).collect();
1042
1043 Ok(intersection)
1044 }
1045}
1046
1047#[derive(Debug, Clone, PartialEq, Eq)]
1049enum AddressType {
1050 PublicAddress(Multiaddr),
1052 GlobalListenAddress(Multiaddr),
1054 ExternalAddress(Multiaddr),
1056}
1057
1058impl AddressType {
1059 fn without_p2p(self, local_peer_id: PeerId) -> Multiaddr {
1064 let (mut address, source) = match self {
1066 AddressType::PublicAddress(address) => (address, "public address"),
1067 AddressType::GlobalListenAddress(address) => (address, "global listen address"),
1068 AddressType::ExternalAddress(address) => (address, "external address"),
1069 };
1070
1071 if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
1072 if peer_id != *local_peer_id.as_ref() {
1073 error!(
1074 target: LOG_TARGET,
1075 "Network returned '{source}' '{address}' with peer id \
1076 not matching the local peer id '{local_peer_id}'.",
1077 );
1078 }
1079 address.pop();
1080 }
1081 address
1082 }
1083}
1084
1085pub trait NetworkProvider:
1089 NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
1090{
1091}
1092
1093impl<T> NetworkProvider for T where
1094 T: NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
1095{
1096}
1097
1098fn hash_authority_id(id: &[u8]) -> KademliaKey {
1099 KademliaKey::new(&Code::Sha2_256.digest(id).digest())
1100}
1101
1102fn single<T>(values: impl IntoIterator<Item = T>) -> std::result::Result<Option<T>, ()>
1107where
1108 T: PartialEq<T>,
1109{
1110 values.into_iter().try_fold(None, |acc, item| match acc {
1111 None => Ok(Some(item)),
1112 Some(ref prev) if *prev != item => Err(()),
1113 Some(x) => Ok(Some(x)),
1114 })
1115}
1116
1117fn serialize_addresses(addresses: impl Iterator<Item = Multiaddr>) -> Vec<Vec<u8>> {
1118 addresses.map(|a| a.to_vec()).collect()
1119}
1120
1121fn build_creation_time() -> schema::TimestampInfo {
1122 let creation_time = SystemTime::now()
1123 .duration_since(UNIX_EPOCH)
1124 .map(|time| time.as_nanos())
1125 .unwrap_or_default();
1126 schema::TimestampInfo { timestamp: creation_time.encode() }
1127}
1128
1129fn serialize_authority_record(
1130 addresses: Vec<Vec<u8>>,
1131 creation_time: Option<schema::TimestampInfo>,
1132) -> Result<Vec<u8>> {
1133 let mut serialized_record = vec![];
1134
1135 schema::AuthorityRecord { addresses, creation_time }
1136 .encode(&mut serialized_record)
1137 .map_err(Error::EncodingProto)?;
1138 Ok(serialized_record)
1139}
1140
1141fn sign_record_with_peer_id(
1142 serialized_record: &[u8],
1143 network: &impl NetworkSigner,
1144) -> Result<schema::PeerSignature> {
1145 let signature = network
1146 .sign_with_local_identity(serialized_record.to_vec())
1147 .map_err(|e| Error::CannotSign(format!("{} (network packet)", e)))?;
1148 let public_key = signature.public_key.encode_protobuf();
1149 let signature = signature.bytes;
1150 Ok(schema::PeerSignature { signature, public_key })
1151}
1152
1153fn sign_record_with_authority_ids(
1154 serialized_record: Vec<u8>,
1155 peer_signature: Option<schema::PeerSignature>,
1156 key_store: &dyn Keystore,
1157 keys: Vec<AuthorityId>,
1158) -> Result<Vec<(KademliaKey, Vec<u8>)>> {
1159 let mut result = Vec::with_capacity(keys.len());
1160
1161 for key in keys.iter() {
1162 let auth_signature = key_store
1163 .sr25519_sign(key_types::AUTHORITY_DISCOVERY, key.as_ref(), &serialized_record)
1164 .map_err(|e| Error::CannotSign(format!("{}. Key: {:?}", e, key)))?
1165 .ok_or_else(|| {
1166 Error::CannotSign(format!("Could not find key in keystore. Key: {:?}", key))
1167 })?;
1168
1169 let auth_signature = auth_signature.encode();
1171 let signed_record = schema::SignedAuthorityRecord {
1172 record: serialized_record.clone(),
1173 auth_signature,
1174 peer_signature: peer_signature.clone(),
1175 }
1176 .encode_to_vec();
1177
1178 result.push((hash_authority_id(key.as_slice()), signed_record));
1179 }
1180
1181 Ok(result)
1182}
1183
1184#[derive(Clone)]
1186pub(crate) struct Metrics {
1187 publish: Counter<U64>,
1188 amount_addresses_last_published: Gauge<U64>,
1189 requests: Counter<U64>,
1190 requests_pending: Gauge<U64>,
1191 dht_event_received: CounterVec<U64>,
1192 handle_value_found_event_failure: Counter<U64>,
1193 known_authorities_count: Gauge<U64>,
1194}
1195
1196impl Metrics {
1197 pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
1198 Ok(Self {
1199 publish: register(
1200 Counter::new(
1201 "substrate_authority_discovery_times_published_total",
1202 "Number of times authority discovery has published external addresses.",
1203 )?,
1204 registry,
1205 )?,
1206 amount_addresses_last_published: register(
1207 Gauge::new(
1208 "substrate_authority_discovery_amount_external_addresses_last_published",
1209 "Number of external addresses published when authority discovery last \
1210 published addresses.",
1211 )?,
1212 registry,
1213 )?,
1214 requests: register(
1215 Counter::new(
1216 "substrate_authority_discovery_authority_addresses_requested_total",
1217 "Number of times authority discovery has requested external addresses of a \
1218 single authority.",
1219 )?,
1220 registry,
1221 )?,
1222 requests_pending: register(
1223 Gauge::new(
1224 "substrate_authority_discovery_authority_address_requests_pending",
1225 "Number of pending authority address requests.",
1226 )?,
1227 registry,
1228 )?,
1229 dht_event_received: register(
1230 CounterVec::new(
1231 Opts::new(
1232 "substrate_authority_discovery_dht_event_received",
1233 "Number of dht events received by authority discovery.",
1234 ),
1235 &["name"],
1236 )?,
1237 registry,
1238 )?,
1239 handle_value_found_event_failure: register(
1240 Counter::new(
1241 "substrate_authority_discovery_handle_value_found_event_failure",
1242 "Number of times handling a dht value found event failed.",
1243 )?,
1244 registry,
1245 )?,
1246 known_authorities_count: register(
1247 Gauge::new(
1248 "substrate_authority_discovery_known_authorities_count",
1249 "Number of authorities known by authority discovery.",
1250 )?,
1251 registry,
1252 )?,
1253 })
1254 }
1255}
1256
1257#[cfg(test)]
1259impl<Block: BlockT, Client, DhtEventStream> Worker<Client, Block, DhtEventStream> {
1260 pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
1261 self.addr_cache.insert(authority, addresses)
1262 }
1263
1264 pub(crate) fn contains_authority(&self, authority: &AuthorityId) -> bool {
1265 self.addr_cache.get_addresses_by_authority_id(authority).is_some()
1266 }
1267}