1pub(crate) use crate::worker::addr_cache::AddrCache;
20use crate::{
21 error::{Error, Result},
22 interval::ExpIncInterval,
23 ServicetoWorkerMsg, WorkerConfig,
24};
25
26use std::{
27 collections::{HashMap, HashSet},
28 marker::PhantomData,
29 path::PathBuf,
30 sync::Arc,
31 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
32};
33
34use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt};
35
36use codec::{Decode, Encode};
37use ip_network::IpNetwork;
38use linked_hash_set::LinkedHashSet;
39use sc_network_types::kad::{Key, PeerRecord, Record};
40
41use log::{debug, error, info, trace};
42use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
43use prost::Message;
44use rand::{seq::SliceRandom, thread_rng};
45
46use sc_network::{
47 config::DEFAULT_KADEMLIA_REPLICATION_FACTOR, event::DhtEvent, multiaddr, KademliaKey,
48 Multiaddr, NetworkDHTProvider, NetworkSigner, NetworkStateInfo,
49};
50use sc_network_types::{multihash::Code, PeerId};
51use schema::PeerSignature;
52use sp_api::{ApiError, ProvideRuntimeApi};
53use sp_authority_discovery::{
54 AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature,
55};
56use sp_blockchain::HeaderBackend;
57use sp_core::{
58 crypto::{key_types, ByteArray, Pair},
59 traits::SpawnNamed,
60};
61use sp_keystore::{Keystore, KeystorePtr};
62use sp_runtime::traits::Block as BlockT;
63
64mod addr_cache;
mod schema {
    // Unit tests for the schema types; only compiled for test builds.
    #[cfg(test)]
    mod tests;

    // Textually includes `authority_discovery_v3.rs` from the build's `OUT_DIR`.
    // NOTE(review): presumably generated by the build script from a protobuf
    // definition (its types are decoded via `prost::Message` in this file) — confirm.
    include!(concat!(env!("OUT_DIR"), "/authority_discovery_v3.rs"));
}
72#[cfg(test)]
73pub mod tests;
74
/// Log target used by every log statement in this module.
const LOG_TARGET: &str = "sub-authority-discovery";
/// File name of the persisted address cache; appended to the configured cache directory.
pub(crate) const ADDR_CACHE_FILE_NAME: &str = "authority_discovery_addr_cache.json";
/// Period of the timer in `Worker::run` that persists the address cache (10 minutes).
const ADDR_CACHE_PERSIST_INTERVAL: Duration = Duration::from_secs(60 * 10);

/// Maximum number of addresses kept per authority when a discovered record is cached.
const MAX_ADDRESSES_PER_AUTHORITY: usize = 16;

/// Maximum number of global listen addresses considered when publishing.
const MAX_GLOBAL_LISTEN_ADDRESSES: usize = 4;

/// Maximum number of addresses put into a published record.
const MAX_ADDRESSES_TO_PUBLISH: usize = 32;

/// Maximum number of DHT lookups that may be in flight at the same time.
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;
90
/// Role the authority discovery [`Worker`] operates in.
pub enum Role {
    /// Publish the node's own addresses, signed with keys from the given keystore, and
    /// discover the addresses of other authorities.
    PublishAndDiscover(KeystorePtr),
    /// Only discover the addresses of other authorities; publishing is skipped.
    Discover,
}
98
/// State of the authority discovery worker.
///
/// The worker is driven by [`Worker::run`], which publishes the local node's addresses
/// on the DHT, looks up the addresses of other authorities, and answers queries
/// arriving from the service side.
pub struct Worker<Client, Block: BlockT, DhtEventStream> {
    /// Fused receiver of requests sent by the service; handled in
    /// `process_message_from_service`.
    from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,

    /// Client used to query the best block hash and the authority set.
    client: Arc<Client>,

    /// Network handle used for DHT operations, address/peer-id queries and
    /// signing/verifying with the network identity.
    network: Arc<dyn NetworkProvider>,

    /// Stream of DHT events; `run` returns once this stream ends.
    dht_event_rx: DhtEventStream,

    /// Interval that triggers an unconditional publication of the local addresses.
    publish_interval: ExpIncInterval,

    /// Interval that triggers a publication only if the set of own authority keys
    /// changed since the last publication. Created with the keystore refresh interval
    /// as both start and maximum value.
    publish_if_changed_interval: ExpIncInterval,

    /// Authority keys that were used for the most recent publication.
    latest_published_keys: HashSet<AuthorityId>,
    /// Kademlia keys of the most recent publication; `ValuePut`/`ValuePutFailed`
    /// events for other keys are ignored.
    latest_published_kad_keys: HashSet<KademliaKey>,

    /// Whether external addresses with non-global IPs may be published.
    publish_non_global_ips: bool,

    /// Configured public addresses (with a trailing `/p2p/..` component removed);
    /// these come first in the list of published addresses.
    public_addresses: LinkedHashSet<Multiaddr>,

    /// When `true`, records without a peer signature are rejected.
    strict_record_validation: bool,

    /// Interval that triggers a refill of the lookup queue.
    query_interval: ExpIncInterval,

    /// Authorities still to be looked up; entries are popped from the end.
    pending_lookups: Vec<AuthorityId>,

    /// Authorities from the most recently fetched authority set, keyed by the hash
    /// of their id.
    known_authorities: HashMap<KademliaKey, AuthorityId>,

    /// Block hash at which `known_authorities` was last fetched.
    authorities_queried_at: Option<Block::Hash>,

    /// Lookups that were started and have not yet produced a DHT response.
    in_flight_lookups: HashMap<KademliaKey, AuthorityId>,

    /// Lookups that already yielded a value since the last queue refill, so further
    /// records for the same key are still accepted.
    known_lookups: HashMap<KademliaKey, AuthorityId>,

    /// Most recent record seen per Kademlia key, together with the peers known to
    /// hold it.
    last_known_records: HashMap<KademliaKey, RecordInfo>,

    /// Cache of discovered authority addresses.
    addr_cache: addr_cache::AddrCache,

    /// Prometheus metrics; `None` when no registry was given or registration failed.
    metrics: Option<Metrics>,

    /// Set once the "no public addresses configured" error has been logged, so that
    /// it is logged only once.
    warn_public_addresses: bool,

    /// Whether this worker publishes and discovers, or only discovers.
    role: Role,

    /// Marker tying the worker to the `Block` type parameter.
    phantom: PhantomData<Block>,

    /// Spawner used to run the blocking task that persists the address cache.
    spawner: Box<dyn SpawnNamed>,

    /// Path of the file the address cache is persisted to; `None` disables
    /// persistence.
    persisted_cache_file_path: Option<PathBuf>,
}
204
/// Bookkeeping for the latest record known for one Kademlia key.
#[derive(Debug, Clone)]
struct RecordInfo {
    /// Creation time decoded from the record; `0` when the record carries no (or an
    /// undecodable) timestamp. Locally built records use nanoseconds since the Unix
    /// epoch (see `build_creation_time`).
    creation_time: u128,
    /// Peers known to hold this record.
    peers_with_record: HashSet<PeerId>,
    /// The record itself, as received from the DHT.
    record: Record,
}
215
/// Chain access needed by the worker: the authority set at a block and the hash of
/// the best block.
#[async_trait::async_trait]
pub trait AuthorityDiscovery<Block: BlockT> {
    /// Returns the authorities at the block identified by `at`.
    async fn authorities(&self, at: Block::Hash)
        -> std::result::Result<Vec<AuthorityId>, ApiError>;

    /// Returns the hash of the best block.
    async fn best_hash(&self) -> std::result::Result<Block::Hash, Error>;
}
227
228#[async_trait::async_trait]
229impl<Block, T> AuthorityDiscovery<Block> for T
230where
231 T: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
232 T::Api: AuthorityDiscoveryApi<Block>,
233 Block: BlockT,
234{
235 async fn authorities(
236 &self,
237 at: Block::Hash,
238 ) -> std::result::Result<Vec<AuthorityId>, ApiError> {
239 self.runtime_api().authorities(at)
240 }
241
242 async fn best_hash(&self) -> std::result::Result<Block::Hash, Error> {
243 Ok(self.info().best_hash)
244 }
245}
246
247impl<Client, Block, DhtEventStream> Worker<Client, Block, DhtEventStream>
248where
249 Block: BlockT + Unpin + 'static,
250 Client: AuthorityDiscovery<Block> + 'static,
251 DhtEventStream: Stream<Item = DhtEvent> + Unpin,
252{
253 pub(crate) fn new(
255 from_service: mpsc::Receiver<ServicetoWorkerMsg>,
256 client: Arc<Client>,
257 network: Arc<dyn NetworkProvider>,
258 dht_event_rx: DhtEventStream,
259 role: Role,
260 prometheus_registry: Option<prometheus_endpoint::Registry>,
261 config: WorkerConfig,
262 spawner: impl SpawnNamed + 'static,
263 ) -> Self {
264 let publish_interval =
271 ExpIncInterval::new(Duration::from_secs(2), config.max_publish_interval);
272 let query_interval = ExpIncInterval::new(Duration::from_secs(2), config.max_query_interval);
273
274 let publish_if_changed_interval =
277 ExpIncInterval::new(config.keystore_refresh_interval, config.keystore_refresh_interval);
278
279 let maybe_persisted_cache_file_path =
280 config.persisted_cache_directory.as_ref().map(|dir| {
281 let mut path = dir.clone();
282 path.push(ADDR_CACHE_FILE_NAME);
283 path
284 });
285
286 let addr_cache: AddrCache = if let Some(persisted_cache_file_path) =
290 maybe_persisted_cache_file_path.as_ref()
291 {
292 let loaded =
293 AddrCache::try_from(persisted_cache_file_path.as_path()).unwrap_or_else(|e| {
294 info!(target: LOG_TARGET, "Failed to load AddrCache from file, using empty instead: {}", e);
295 AddrCache::new()
296 });
297 info!(target: LOG_TARGET, "Loaded persisted AddrCache with {} authority ids.", loaded.num_authority_ids());
298 loaded
299 } else {
300 info!(target: LOG_TARGET, "No persisted cache file path provided, authority discovery will not persist the address cache to disk.");
301 AddrCache::new()
302 };
303
304 let metrics = match prometheus_registry {
305 Some(registry) => match Metrics::register(®istry) {
306 Ok(metrics) => Some(metrics),
307 Err(e) => {
308 error!(target: LOG_TARGET, "Failed to register metrics: {}", e);
309 None
310 },
311 },
312 None => None,
313 };
314
315 let public_addresses = {
316 let local_peer_id = network.local_peer_id();
317
318 config
319 .public_addresses
320 .into_iter()
321 .map(|address| AddressType::PublicAddress(address).without_p2p(local_peer_id))
322 .collect()
323 };
324
325 Worker {
326 from_service: from_service.fuse(),
327 client,
328 network,
329 dht_event_rx,
330 publish_interval,
331 known_authorities: Default::default(),
332 authorities_queried_at: None,
333 publish_if_changed_interval,
334 latest_published_keys: HashSet::new(),
335 latest_published_kad_keys: HashSet::new(),
336 publish_non_global_ips: config.publish_non_global_ips,
337 public_addresses,
338 strict_record_validation: config.strict_record_validation,
339 query_interval,
340 pending_lookups: Vec::new(),
341 in_flight_lookups: HashMap::new(),
342 known_lookups: HashMap::new(),
343 addr_cache,
344 role,
345 metrics,
346 warn_public_addresses: false,
347 phantom: PhantomData,
348 last_known_records: HashMap::new(),
349 spawner: Box::new(spawner),
350 persisted_cache_file_path: maybe_persisted_cache_file_path,
351 }
352 }
353
354 pub fn persist_addr_cache_if_supported(&self) {
356 let Some(path) = self.persisted_cache_file_path.as_ref().cloned() else {
357 return;
358 };
359 let cloned_cache = self.addr_cache.clone();
360 self.spawner.spawn_blocking(
361 "persist-addr-cache",
362 Some("authority-discovery-worker"),
363 Box::pin(async move {
364 cloned_cache.serialize_and_persist(path);
365 }),
366 )
367 }
368
    /// Runs the worker's main loop until the DHT event stream ends.
    ///
    /// Every iteration first starts as many pending lookups as the in-flight limit
    /// allows, then handles whichever of these becomes ready:
    /// the cache persistence timer, a DHT event, a message from the service, one of the
    /// two publish intervals, or the query interval.
    pub async fn run(mut self) {
        // Timer that periodically triggers persisting the address cache.
        let mut persist_interval = tokio::time::interval(ADDR_CACHE_PERSIST_INTERVAL);

        loop {
            self.start_new_lookups();

            futures::select! {
                _ = persist_interval.tick().fuse() => {
                    self.persist_addr_cache_if_supported();
                },
                // Process incoming events.
                event = self.dht_event_rx.next().fuse() => {
                    if let Some(event) = event {
                        self.handle_dht_event(event).await;
                    } else {
                        // The event stream is finished: persist the cache one last time
                        // and stop the worker.
                        self.persist_addr_cache_if_supported();
                        return;
                    }
                },
                // Handle messages from the service.
                msg = self.from_service.select_next_some() => {
                    self.process_message_from_service(msg);
                },
                // Publish own addresses. `only_if_changed` is `false` when the regular
                // publish interval fired and `true` when the if-changed interval fired.
                only_if_changed = future::select(
                    self.publish_interval.next().map(|_| false),
                    self.publish_if_changed_interval.next().map(|_| true)
                ).map(|e| e.factor_first().0).fuse() => {
                    if let Err(e) = self.publish_ext_addresses(only_if_changed).await {
                        error!(
                            target: LOG_TARGET,
                            "Failed to publish external addresses: {}", e,
                        );
                    }
                },
                // Request addresses of authorities by refilling the lookup queue.
                _ = self.query_interval.next().fuse() => {
                    if let Err(e) = self.refill_pending_lookups_queue().await {
                        error!(
                            target: LOG_TARGET,
                            "Failed to request addresses of authorities: {}", e,
                        );
                    }
                },
            }
        }
    }
419
420 fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
421 match msg {
422 ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
423 let _ = sender.send(
424 self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
425 );
426 },
427 ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
428 let _ = sender
429 .send(self.addr_cache.get_authority_ids_by_peer_id(&peer_id).map(Clone::clone));
430 },
431 }
432 }
433
    /// Builds the list of addresses to advertise for the local node.
    ///
    /// Candidates are taken, in this order, from the configured public addresses, the
    /// global listen addresses (at most `MAX_GLOBAL_LISTEN_ADDRESSES`) and the external
    /// addresses reported by the network. Addresses without a TCP/UDP/memory port are
    /// dropped, duplicates are removed keeping the first occurrence, and the result is
    /// capped at `MAX_ADDRESSES_TO_PUBLISH`. Every returned address ends with
    /// `/p2p/<local peer id>`.
    ///
    /// Takes `&mut self` because it may set `warn_public_addresses` after logging the
    /// "no public addresses" error.
    fn addresses_to_publish(&mut self) -> impl Iterator<Item = Multiaddr> {
        let local_peer_id = self.network.local_peer_id();
        let publish_non_global_ips = self.publish_non_global_ips;

        // An address counts as global when every IPv4/IPv6 component is a global IP;
        // all other protocol components are ignored by this check.
        let address_is_global = |address: &Multiaddr| {
            address.iter().all(|protocol| match protocol {
                multiaddr::Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
                multiaddr::Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
                _ => true,
            })
        };

        // An address is only usable when it contains a TCP, UDP or memory port.
        let address_has_port = |address: &Multiaddr| {
            address.iter().any(|protocol| {
                matches!(
                    protocol,
                    multiaddr::Protocol::Tcp(_) |
                        multiaddr::Protocol::Udp(_) |
                        multiaddr::Protocol::Memory(_)
                )
            })
        };

        // Listen addresses must be global regardless of `publish_non_global_ips`.
        let mut global_listen_addresses = self
            .network
            .listen_addresses()
            .into_iter()
            .filter_map(|address| {
                (address_is_global(&address) && address_has_port(&address))
                    .then(|| AddressType::GlobalListenAddress(address).without_p2p(local_peer_id))
            })
            .take(MAX_GLOBAL_LISTEN_ADDRESSES)
            .peekable();

        // External addresses may be non-global only when `publish_non_global_ips` is set.
        let mut external_addresses = self
            .network
            .external_addresses()
            .into_iter()
            .filter_map(|address| {
                (address_has_port(&address) &&
                    (publish_non_global_ips || address_is_global(&address)))
                .then(|| AddressType::ExternalAddress(address).without_p2p(local_peer_id))
            })
            .peekable();

        // Peeking does not consume the iterators; it only tells whether they are empty.
        let has_global_listen_addresses = global_listen_addresses.peek().is_some();
        trace!(
            target: LOG_TARGET,
            "Node has public addresses: {}, global listen addresses: {}, external addresses: {}",
            !self.public_addresses.is_empty(),
            has_global_listen_addresses,
            external_addresses.peek().is_some(),
        );

        // Used to drop duplicates while preserving the priority order of the chain.
        let mut seen_addresses = HashSet::new();

        let addresses = self
            .public_addresses
            .clone()
            .into_iter()
            .filter(address_has_port)
            .chain(global_listen_addresses)
            .chain(external_addresses)
            // `insert` returns `false` for an address that was already seen.
            .filter(|address| seen_addresses.insert(address.clone()))
            .take(MAX_ADDRESSES_TO_PUBLISH)
            .collect::<Vec<_>>();

        if !addresses.is_empty() {
            debug!(
                target: LOG_TARGET,
                "Publishing authority DHT record peer_id='{local_peer_id}' with addresses='{addresses:?}'",
            );

            // Log the missing-public-address error at most once per worker.
            if !self.warn_public_addresses &&
                self.public_addresses.is_empty() &&
                !has_global_listen_addresses
            {
                self.warn_public_addresses = true;

                error!(
                    target: LOG_TARGET,
                    "No public addresses configured and no global listen addresses found. \
                    Authority DHT record may contain unreachable addresses. \
                    Consider setting `--public-addr` to the public IP address of this node. \
                    This will become a hard requirement in future versions for authorities."
                );
            }
        }

        // The `/p2p/..` component was stripped from every candidate above; append the
        // local peer id to each address that is returned.
        addresses
            .into_iter()
            .map(move |a| a.with(multiaddr::Protocol::P2p(*local_peer_id.as_ref())))
    }
542
543 async fn publish_ext_addresses(&mut self, only_if_changed: bool) -> Result<()> {
548 let key_store = match &self.role {
549 Role::PublishAndDiscover(key_store) => key_store,
550 Role::Discover => return Ok(()),
551 }
552 .clone();
553
554 let addresses = serialize_addresses(self.addresses_to_publish());
555 if addresses.is_empty() {
556 trace!(
557 target: LOG_TARGET,
558 "No addresses to publish. Skipping publication."
559 );
560
561 self.publish_interval.set_to_start();
562 return Ok(());
563 }
564
565 let keys =
566 Worker::<Client, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
567 key_store.clone(),
568 self.client.as_ref(),
569 )
570 .await?
571 .into_iter()
572 .collect::<HashSet<_>>();
573
574 if only_if_changed {
575 if keys == self.latest_published_keys {
578 return Ok(());
579 }
580
581 self.publish_interval.set_to_start();
584 self.query_interval.set_to_start();
585 }
586
587 if let Some(metrics) = &self.metrics {
588 metrics.publish.inc();
589 metrics
590 .amount_addresses_last_published
591 .set(addresses.len().try_into().unwrap_or(std::u64::MAX));
592 }
593
594 let serialized_record = serialize_authority_record(addresses, Some(build_creation_time()))?;
595 let peer_signature = sign_record_with_peer_id(&serialized_record, &self.network)?;
596
597 let keys_vec = keys.iter().cloned().collect::<Vec<_>>();
598
599 let kv_pairs = sign_record_with_authority_ids(
600 serialized_record,
601 Some(peer_signature),
602 key_store.as_ref(),
603 keys_vec,
604 )?;
605
606 self.latest_published_kad_keys = kv_pairs.iter().map(|(k, _)| k.clone()).collect();
607
608 for (key, value) in kv_pairs.into_iter() {
609 self.network.put_value(key, value);
610 }
611
612 self.latest_published_keys = keys;
613
614 Ok(())
615 }
616
617 async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
618 let best_hash = self.client.best_hash().await?;
619
620 let local_keys = match &self.role {
621 Role::PublishAndDiscover(key_store) => key_store
622 .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
623 .into_iter()
624 .collect::<HashSet<_>>(),
625 Role::Discover => HashSet::new(),
626 };
627
628 let mut authorities = self
629 .client
630 .authorities(best_hash)
631 .await
632 .map_err(|e| Error::CallingRuntime(e.into()))?
633 .into_iter()
634 .filter(|id| !local_keys.contains(id.as_ref()))
635 .collect::<Vec<_>>();
636
637 self.known_authorities = authorities
638 .clone()
639 .into_iter()
640 .map(|authority| (hash_authority_id(authority.as_ref()), authority))
641 .collect::<HashMap<_, _>>();
642 self.authorities_queried_at = Some(best_hash);
643
644 self.addr_cache.retain_ids(&authorities);
645 let now = Instant::now();
646 self.last_known_records.retain(|k, value| {
647 self.known_authorities.contains_key(k) && !value.record.is_expired(now)
648 });
649
650 authorities.shuffle(&mut thread_rng());
651 self.pending_lookups = authorities;
652 self.in_flight_lookups.clear();
655 self.known_lookups.clear();
656
657 if let Some(metrics) = &self.metrics {
658 metrics
659 .requests_pending
660 .set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
661 }
662
663 Ok(())
664 }
665
666 fn start_new_lookups(&mut self) {
667 while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS {
668 let authority_id = match self.pending_lookups.pop() {
669 Some(authority) => authority,
670 None => return,
671 };
672 let hash = hash_authority_id(authority_id.as_ref());
673 self.network.get_value(&hash);
674 self.in_flight_lookups.insert(hash, authority_id);
675
676 if let Some(metrics) = &self.metrics {
677 metrics.requests.inc();
678 metrics
679 .requests_pending
680 .set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
681 }
682 }
683 }
684
    /// Handles a single event coming from the DHT.
    ///
    /// Failures while handling an event are logged (and counted for `ValueFound`) but
    /// never propagated. Event kinds not listed below are ignored.
    async fn handle_dht_event(&mut self, event: DhtEvent) {
        match event {
            DhtEvent::ValueFound(v) => {
                if let Some(metrics) = &self.metrics {
                    metrics.dht_event_received.with_label_values(&["value_found"]).inc();
                }

                debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", v.record.key);

                if let Err(e) = self.handle_dht_value_found_event(v) {
                    if let Some(metrics) = &self.metrics {
                        metrics.handle_value_found_event_failure.inc();
                    }
                    debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e);
                }
            },
            DhtEvent::ValueNotFound(hash) => {
                if let Some(metrics) = &self.metrics {
                    metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
                }

                // The lookup is finished either way; free its in-flight slot.
                if self.in_flight_lookups.remove(&hash).is_some() {
                    debug!(target: LOG_TARGET, "Value for hash '{:?}' not found on Dht.", hash)
                } else {
                    debug!(
                        target: LOG_TARGET,
                        "Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
                    )
                }
            },
            DhtEvent::ValuePut(hash) => {
                // Ignore puts for keys that are not part of the latest publication;
                // these are not counted in the metrics either.
                if !self.latest_published_kad_keys.contains(&hash) {
                    return;
                }

                // Publishing succeeded: move the publish interval to its maximum.
                self.publish_interval.set_to_max();

                if let Some(metrics) = &self.metrics {
                    metrics.dht_event_received.with_label_values(&["value_put"]).inc();
                }

                debug!(target: LOG_TARGET, "Successfully put hash '{:?}' on Dht.", hash)
            },
            DhtEvent::ValuePutFailed(hash) => {
                // Failed puts for keys that were not published by this worker are ignored.
                if !self.latest_published_kad_keys.contains(&hash) {
                    return;
                }

                if let Some(metrics) = &self.metrics {
                    metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
                }

                debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash)
            },
            DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires) => {
                if let Err(e) = self
                    .handle_put_record_requested(record_key, record_value, publisher, expires)
                    .await
                {
                    debug!(target: LOG_TARGET, "Failed to handle put record request: {}", e)
                }

                // Counted regardless of whether handling the request succeeded.
                if let Some(metrics) = &self.metrics {
                    metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
                }
            },
            _ => {},
        }
    }
759
760 async fn handle_put_record_requested(
761 &mut self,
762 record_key: Key,
763 record_value: Vec<u8>,
764 publisher: Option<PeerId>,
765 expires: Option<std::time::Instant>,
766 ) -> Result<()> {
767 let publisher = publisher.ok_or(Error::MissingPublisher)?;
768
769 let best_hash = self.client.best_hash().await?;
772 if !self.known_authorities.contains_key(&record_key) &&
773 self.authorities_queried_at
774 .map(|authorities_queried_at| authorities_queried_at != best_hash)
775 .unwrap_or(true)
776 {
777 let authorities = self
778 .client
779 .authorities(best_hash)
780 .await
781 .map_err(|e| Error::CallingRuntime(e.into()))?
782 .into_iter()
783 .collect::<Vec<_>>();
784
785 self.known_authorities = authorities
786 .into_iter()
787 .map(|authority| (hash_authority_id(authority.as_ref()), authority))
788 .collect::<HashMap<_, _>>();
789
790 self.authorities_queried_at = Some(best_hash);
791 }
792
793 let authority_id =
794 self.known_authorities.get(&record_key).ok_or(Error::UnknownAuthority)?;
795 let signed_record =
796 Self::check_record_signed_with_authority_id(record_value.as_slice(), authority_id)?;
797 self.check_record_signed_with_network_key(
798 &signed_record.record,
799 signed_record.peer_signature,
800 publisher,
801 authority_id,
802 )?;
803
804 let records_creation_time: u128 =
805 schema::AuthorityRecord::decode(signed_record.record.as_slice())
806 .map_err(Error::DecodingProto)?
807 .creation_time
808 .map(|creation_time| {
809 u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
810 })
811 .unwrap_or_default(); let current_record_info = self.last_known_records.get(&record_key);
814 if let Some(current_record_info) = current_record_info {
817 if records_creation_time < current_record_info.creation_time {
818 debug!(
819 target: LOG_TARGET,
820 "Skip storing because record creation time {:?} is older than the current known record {:?}",
821 records_creation_time,
822 current_record_info.creation_time
823 );
824 return Ok(());
825 }
826 }
827
828 self.network.store_record(record_key, record_value, Some(publisher), expires);
829 Ok(())
830 }
831
832 fn check_record_signed_with_authority_id(
833 record: &[u8],
834 authority_id: &AuthorityId,
835 ) -> Result<schema::SignedAuthorityRecord> {
836 let signed_record: schema::SignedAuthorityRecord =
837 schema::SignedAuthorityRecord::decode(record).map_err(Error::DecodingProto)?;
838
839 let auth_signature = AuthoritySignature::decode(&mut &signed_record.auth_signature[..])
840 .map_err(Error::EncodingDecodingScale)?;
841
842 if !AuthorityPair::verify(&auth_signature, &signed_record.record, &authority_id) {
843 return Err(Error::VerifyingDhtPayload);
844 }
845
846 Ok(signed_record)
847 }
848
849 fn check_record_signed_with_network_key(
850 &self,
851 record: &Vec<u8>,
852 peer_signature: Option<PeerSignature>,
853 remote_peer_id: PeerId,
854 authority_id: &AuthorityId,
855 ) -> Result<()> {
856 if let Some(peer_signature) = peer_signature {
857 match self.network.verify(
858 remote_peer_id.into(),
859 &peer_signature.public_key,
860 &peer_signature.signature,
861 record,
862 ) {
863 Ok(true) => {},
864 Ok(false) => return Err(Error::VerifyingDhtPayload),
865 Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
866 }
867 } else if self.strict_record_validation {
868 return Err(Error::MissingPeerIdSignature);
869 } else {
870 debug!(
871 target: LOG_TARGET,
872 "Received unsigned authority discovery record from {}", authority_id
873 );
874 }
875 Ok(())
876 }
877
    /// Validates a record found on the DHT and, if it is acceptable, updates the
    /// address cache for the corresponding authority.
    ///
    /// # Errors
    ///
    /// Fails when the record was not asked for, cannot be decoded, is not properly
    /// signed, contains unparsable addresses, or when its addresses do not all point to
    /// one single remote peer id.
    fn handle_dht_value_found_event(&mut self, peer_record: PeerRecord) -> Result<()> {
        // Check if the event origins from an authority in our authority set.
        let remote_key = peer_record.record.key.clone();

        // The first response moves the lookup from "in flight" to "known"; later
        // responses for the same key are still accepted through `known_lookups`.
        let authority_id: AuthorityId =
            if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) {
                self.known_lookups.insert(remote_key.clone(), authority_id.clone());
                authority_id
            } else if let Some(authority_id) = self.known_lookups.get(&remote_key) {
                authority_id.clone()
            } else {
                return Err(Error::ReceivingUnexpectedRecord);
            };

        let local_peer_id = self.network.local_peer_id();

        let schema::SignedAuthorityRecord { record, peer_signature, .. } =
            Self::check_record_signed_with_authority_id(
                peer_record.record.value.as_slice(),
                &authority_id,
            )?;

        let authority_record =
            schema::AuthorityRecord::decode(record.as_slice()).map_err(Error::DecodingProto)?;

        // A missing or undecodable timestamp is treated as creation time `0`.
        let records_creation_time: u128 = authority_record
            .creation_time
            .as_ref()
            .map(|creation_time| {
                u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
            })
            .unwrap_or_default();

        // A single unparsable address makes the whole record invalid.
        let addresses: Vec<Multiaddr> = authority_record
            .addresses
            .into_iter()
            .map(|a| a.try_into())
            .collect::<std::result::Result<_, _>>()
            .map_err(Error::ParsingMultiaddress)?;

        // Extracts the peer id from an address whose last component is `/p2p/..`.
        let get_peer_id = |a: &Multiaddr| match a.iter().last() {
            Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
            _ => None,
        };

        // Ignore addresses without a peer id and addresses pointing to the local node.
        let addresses: Vec<Multiaddr> = addresses
            .into_iter()
            .filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some())
            .collect();

        let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a)))
            // The addresses refer to more than one peer id.
            .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)?
            .flatten()
            // No address with a peer id is left.
            .ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?;

        // All remaining addresses belong to the same peer id; check that the record is
        // signed by that peer.
        self.check_record_signed_with_network_key(
            &record,
            peer_signature,
            remote_peer_id,
            &authority_id,
        )?;

        let remote_addresses: Vec<Multiaddr> =
            addresses.into_iter().take(MAX_ADDRESSES_PER_AUTHORITY).collect();

        let answering_peer_id = peer_record.peer.map(|peer| peer.into());

        let addr_cache_needs_update = self.handle_new_record(
            &authority_id,
            remote_key.clone(),
            RecordInfo {
                creation_time: records_creation_time,
                peers_with_record: answering_peer_id.into_iter().collect(),
                record: peer_record.record,
            },
        );

        if !remote_addresses.is_empty() && addr_cache_needs_update {
            self.addr_cache.insert(authority_id, remote_addresses);
            if let Some(metrics) = &self.metrics {
                metrics
                    .known_authorities_count
                    .set(self.addr_cache.num_authority_ids().try_into().unwrap_or(std::u64::MAX));
            }
        }
        Ok(())
    }
969
970 fn handle_new_record(
973 &mut self,
974 authority_id: &AuthorityId,
975 kademlia_key: KademliaKey,
976 new_record: RecordInfo,
977 ) -> bool {
978 let current_record_info = self
979 .last_known_records
980 .entry(kademlia_key.clone())
981 .or_insert_with(|| new_record.clone());
982
983 if new_record.creation_time > current_record_info.creation_time {
984 let peers_that_need_updating = current_record_info.peers_with_record.clone();
985 self.network.put_record_to(
986 new_record.record.clone(),
987 peers_that_need_updating.clone(),
988 current_record_info.peers_with_record.is_empty(),
991 );
992 debug!(
993 target: LOG_TARGET,
994 "Found a newer record for {:?} new record creation time {:?} old record creation time {:?}",
995 authority_id, new_record.creation_time, current_record_info.creation_time
996 );
997 self.last_known_records.insert(kademlia_key, new_record);
998 return true;
999 }
1000
1001 if new_record.creation_time == current_record_info.creation_time {
1002 debug!(
1005 target: LOG_TARGET,
1006 "Found same record for {:?} record creation time {:?}",
1007 authority_id, new_record.creation_time
1008 );
1009 if current_record_info.peers_with_record.len() + new_record.peers_with_record.len() <=
1010 DEFAULT_KADEMLIA_REPLICATION_FACTOR
1011 {
1012 current_record_info.peers_with_record.extend(new_record.peers_with_record);
1013 }
1014 return true;
1015 }
1016
1017 debug!(
1018 target: LOG_TARGET,
1019 "Found old record for {:?} received record creation time {:?} current record creation time {:?}",
1020 authority_id, new_record.creation_time, current_record_info.creation_time,
1021 );
1022 self.network.put_record_to(
1023 current_record_info.record.clone().into(),
1024 new_record.peers_with_record.clone(),
1025 new_record.peers_with_record.is_empty(),
1028 );
1029 return false;
1030 }
1031
1032 async fn get_own_public_keys_within_authority_set(
1038 key_store: KeystorePtr,
1039 client: &Client,
1040 ) -> Result<HashSet<AuthorityId>> {
1041 let local_pub_keys = key_store
1042 .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
1043 .into_iter()
1044 .collect::<HashSet<_>>();
1045
1046 let best_hash = client.best_hash().await?;
1047 let authorities = client
1048 .authorities(best_hash)
1049 .await
1050 .map_err(|e| Error::CallingRuntime(e.into()))?
1051 .into_iter()
1052 .map(Into::into)
1053 .collect::<HashSet<_>>();
1054
1055 let intersection =
1056 local_pub_keys.intersection(&authorities).cloned().map(Into::into).collect();
1057
1058 Ok(intersection)
1059 }
1060}
1061
/// Origin of an address that is a candidate for publication; the origin is only used
/// for logging in [`AddressType::without_p2p`].
#[derive(Debug, Clone, PartialEq, Eq)]
enum AddressType {
    /// Address taken from the configured public addresses.
    PublicAddress(Multiaddr),
    /// Listen address reported by the network that passed the "global" check.
    GlobalListenAddress(Multiaddr),
    /// External address reported by the network.
    ExternalAddress(Multiaddr),
}
1072
impl AddressType {
    /// Returns the wrapped address with a trailing `/p2p/..` component removed.
    ///
    /// If the trailing peer id differs from `local_peer_id`, an error is logged; the
    /// component is removed in either case. Addresses not ending in `/p2p/..` are
    /// returned unchanged.
    fn without_p2p(self, local_peer_id: PeerId) -> Multiaddr {
        // `source` is only used to name the origin of the address in the log message.
        let (mut address, source) = match self {
            AddressType::PublicAddress(address) => (address, "public address"),
            AddressType::GlobalListenAddress(address) => (address, "global listen address"),
            AddressType::ExternalAddress(address) => (address, "external address"),
        };

        if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
            if peer_id != *local_peer_id.as_ref() {
                error!(
                    target: LOG_TARGET,
                    "Network returned '{source}' '{address}' with peer id \
                     not matching the local peer id '{local_peer_id}'.",
                );
            }
            // Drop the `/p2p/..` component.
            address.pop();
        }
        address
    }
}
1099
/// Network capabilities required by the worker: DHT access, network state
/// information and signing with the network identity.
///
/// The trait has no methods of its own; it only bundles its supertraits so they can
/// be used as one trait object.
pub trait NetworkProvider:
    NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
{
}

/// Every type providing all the required capabilities is a [`NetworkProvider`].
impl<T> NetworkProvider for T where
    T: NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
{
}
1112
1113fn hash_authority_id(id: &[u8]) -> KademliaKey {
1114 KademliaKey::new(&Code::Sha2_256.digest(id).digest())
1115}
1116
/// Checks that all items of `values` are equal to each other.
///
/// Returns `Ok(None)` for an empty input, `Ok(Some(item))` with the first item when
/// every item compares equal to it, and `Err(())` as soon as a differing item is met.
fn single<T>(values: impl IntoIterator<Item = T>) -> std::result::Result<Option<T>, ()>
where
    T: PartialEq<T>,
{
    let mut remaining = values.into_iter();

    let Some(first) = remaining.next() else {
        return Ok(None);
    };

    // Stops at the first item that differs from the first one.
    if remaining.any(|item| first != item) {
        Err(())
    } else {
        Ok(Some(first))
    }
}
1131
1132fn serialize_addresses(addresses: impl Iterator<Item = Multiaddr>) -> Vec<Vec<u8>> {
1133 addresses.map(|a| a.to_vec()).collect()
1134}
1135
1136fn build_creation_time() -> schema::TimestampInfo {
1137 let creation_time = SystemTime::now()
1138 .duration_since(UNIX_EPOCH)
1139 .map(|time| time.as_nanos())
1140 .unwrap_or_default();
1141 schema::TimestampInfo { timestamp: creation_time.encode() }
1142}
1143
1144fn serialize_authority_record(
1145 addresses: Vec<Vec<u8>>,
1146 creation_time: Option<schema::TimestampInfo>,
1147) -> Result<Vec<u8>> {
1148 let mut serialized_record = vec![];
1149
1150 schema::AuthorityRecord { addresses, creation_time }
1151 .encode(&mut serialized_record)
1152 .map_err(Error::EncodingProto)?;
1153 Ok(serialized_record)
1154}
1155
1156fn sign_record_with_peer_id(
1157 serialized_record: &[u8],
1158 network: &impl NetworkSigner,
1159) -> Result<schema::PeerSignature> {
1160 let signature = network
1161 .sign_with_local_identity(serialized_record.to_vec())
1162 .map_err(|e| Error::CannotSign(format!("{} (network packet)", e)))?;
1163 let public_key = signature.public_key.encode_protobuf();
1164 let signature = signature.bytes;
1165 Ok(schema::PeerSignature { signature, public_key })
1166}
1167
1168fn sign_record_with_authority_ids(
1169 serialized_record: Vec<u8>,
1170 peer_signature: Option<schema::PeerSignature>,
1171 key_store: &dyn Keystore,
1172 keys: Vec<AuthorityId>,
1173) -> Result<Vec<(KademliaKey, Vec<u8>)>> {
1174 let mut result = Vec::with_capacity(keys.len());
1175
1176 for key in keys.iter() {
1177 let auth_signature = key_store
1178 .sr25519_sign(key_types::AUTHORITY_DISCOVERY, key.as_ref(), &serialized_record)
1179 .map_err(|e| Error::CannotSign(format!("{}. Key: {:?}", e, key)))?
1180 .ok_or_else(|| {
1181 Error::CannotSign(format!("Could not find key in keystore. Key: {:?}", key))
1182 })?;
1183
1184 let auth_signature = auth_signature.encode();
1186 let signed_record = schema::SignedAuthorityRecord {
1187 record: serialized_record.clone(),
1188 auth_signature,
1189 peer_signature: peer_signature.clone(),
1190 }
1191 .encode_to_vec();
1192
1193 result.push((hash_authority_id(key.as_slice()), signed_record));
1194 }
1195
1196 Ok(result)
1197}
1198
/// Prometheus metrics for a [`Worker`].
#[derive(Clone)]
pub(crate) struct Metrics {
    /// Number of times addresses were published.
    publish: Counter<U64>,
    /// Number of addresses contained in the most recent publication.
    amount_addresses_last_published: Gauge<U64>,
    /// Number of lookups started for a single authority.
    requests: Counter<U64>,
    /// Number of authorities still waiting in the lookup queue.
    requests_pending: Gauge<U64>,
    /// Number of DHT events received, labelled by event name.
    dht_event_received: CounterVec<U64>,
    /// Number of `ValueFound` events whose handling failed.
    handle_value_found_event_failure: Counter<U64>,
    /// Number of authority ids currently in the address cache.
    known_authorities_count: Gauge<U64>,
}
1210
1211impl Metrics {
1212 pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
1213 Ok(Self {
1214 publish: register(
1215 Counter::new(
1216 "substrate_authority_discovery_times_published_total",
1217 "Number of times authority discovery has published external addresses.",
1218 )?,
1219 registry,
1220 )?,
1221 amount_addresses_last_published: register(
1222 Gauge::new(
1223 "substrate_authority_discovery_amount_external_addresses_last_published",
1224 "Number of external addresses published when authority discovery last \
1225 published addresses.",
1226 )?,
1227 registry,
1228 )?,
1229 requests: register(
1230 Counter::new(
1231 "substrate_authority_discovery_authority_addresses_requested_total",
1232 "Number of times authority discovery has requested external addresses of a \
1233 single authority.",
1234 )?,
1235 registry,
1236 )?,
1237 requests_pending: register(
1238 Gauge::new(
1239 "substrate_authority_discovery_authority_address_requests_pending",
1240 "Number of pending authority address requests.",
1241 )?,
1242 registry,
1243 )?,
1244 dht_event_received: register(
1245 CounterVec::new(
1246 Opts::new(
1247 "substrate_authority_discovery_dht_event_received",
1248 "Number of dht events received by authority discovery.",
1249 ),
1250 &["name"],
1251 )?,
1252 registry,
1253 )?,
1254 handle_value_found_event_failure: register(
1255 Counter::new(
1256 "substrate_authority_discovery_handle_value_found_event_failure",
1257 "Number of times handling a dht value found event failed.",
1258 )?,
1259 registry,
1260 )?,
1261 known_authorities_count: register(
1262 Gauge::new(
1263 "substrate_authority_discovery_known_authorities_count",
1264 "Number of authorities known by authority discovery.",
1265 )?,
1266 registry,
1267 )?,
1268 })
1269 }
1270}
1271
// Helpers only available to tests.
#[cfg(test)]
impl<Block: BlockT, Client, DhtEventStream> Worker<Client, Block, DhtEventStream> {
    /// Inserts `addresses` for `authority` directly into the address cache.
    pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
        let cache = &mut self.addr_cache;
        cache.insert(authority, addresses);
    }

    /// Returns whether the address cache holds addresses for `authority`.
    pub(crate) fn contains_authority(&self, authority: &AuthorityId) -> bool {
        let cached_addresses = self.addr_cache.get_addresses_by_authority_id(authority);
        cached_addresses.is_some()
    }
}