1use crate::{
20 error::{Error, Result},
21 interval::ExpIncInterval,
22 ServicetoWorkerMsg, WorkerConfig,
23};
24
25use std::{
26 collections::{HashMap, HashSet},
27 marker::PhantomData,
28 sync::Arc,
29 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
30};
31
32use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt};
33
34use addr_cache::AddrCache;
35use codec::{Decode, Encode};
36use ip_network::IpNetwork;
37use libp2p::kad::{PeerRecord, Record};
38use linked_hash_set::LinkedHashSet;
39
40use log::{debug, error, trace};
41use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
42use prost::Message;
43use rand::{seq::SliceRandom, thread_rng};
44
45use sc_network::{
46 config::DEFAULT_KADEMLIA_REPLICATION_FACTOR, event::DhtEvent, multiaddr, KademliaKey,
47 Multiaddr, NetworkDHTProvider, NetworkSigner, NetworkStateInfo,
48};
49use sc_network_types::{multihash::Code, PeerId};
50use schema::PeerSignature;
51use sp_api::{ApiError, ProvideRuntimeApi};
52use sp_authority_discovery::{
53 AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature,
54};
55use sp_blockchain::HeaderBackend;
56use sp_core::crypto::{key_types, ByteArray, Pair};
57use sp_keystore::{Keystore, KeystorePtr};
58use sp_runtime::traits::Block as BlockT;
59
60mod addr_cache;
61mod schema {
63 #[cfg(test)]
64 mod tests;
65
66 include!(concat!(env!("OUT_DIR"), "/authority_discovery_v3.rs"));
67}
68#[cfg(test)]
69pub mod tests;
70
71const LOG_TARGET: &str = "sub-authority-discovery";
72
73const MAX_ADDRESSES_PER_AUTHORITY: usize = 10;
75
76const MAX_IN_FLIGHT_LOOKUPS: usize = 8;
78
79pub enum Role {
81 PublishAndDiscover(KeystorePtr),
83 Discover,
85}
86
87pub struct Worker<Client, Block: BlockT, DhtEventStream> {
114 from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,
116
117 client: Arc<Client>,
118
119 network: Arc<dyn NetworkProvider>,
120
121 dht_event_rx: DhtEventStream,
123
124 publish_interval: ExpIncInterval,
126
127 publish_if_changed_interval: ExpIncInterval,
130
131 latest_published_keys: HashSet<AuthorityId>,
134 latest_published_kad_keys: HashSet<KademliaKey>,
137
138 publish_non_global_ips: bool,
140
141 public_addresses: LinkedHashSet<Multiaddr>,
144
145 strict_record_validation: bool,
147
148 query_interval: ExpIncInterval,
150
151 pending_lookups: Vec<AuthorityId>,
153
154 known_authorities: HashMap<KademliaKey, AuthorityId>,
156
157 authorities_queried_at: Option<Block::Hash>,
159
160 in_flight_lookups: HashMap<KademliaKey, AuthorityId>,
162
163 known_lookups: HashMap<KademliaKey, AuthorityId>,
167
168 last_known_records: HashMap<KademliaKey, RecordInfo>,
172
173 addr_cache: addr_cache::AddrCache,
174
175 metrics: Option<Metrics>,
176
177 role: Role,
178
179 phantom: PhantomData<Block>,
180}
181
182#[derive(Debug, Clone)]
183struct RecordInfo {
184 creation_time: u128,
186 peers_with_record: HashSet<PeerId>,
189 record: Record,
191}
192
193#[async_trait::async_trait]
196pub trait AuthorityDiscovery<Block: BlockT> {
197 async fn authorities(&self, at: Block::Hash)
199 -> std::result::Result<Vec<AuthorityId>, ApiError>;
200
201 async fn best_hash(&self) -> std::result::Result<Block::Hash, Error>;
203}
204
205#[async_trait::async_trait]
206impl<Block, T> AuthorityDiscovery<Block> for T
207where
208 T: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
209 T::Api: AuthorityDiscoveryApi<Block>,
210 Block: BlockT,
211{
212 async fn authorities(
213 &self,
214 at: Block::Hash,
215 ) -> std::result::Result<Vec<AuthorityId>, ApiError> {
216 self.runtime_api().authorities(at)
217 }
218
219 async fn best_hash(&self) -> std::result::Result<Block::Hash, Error> {
220 Ok(self.info().best_hash)
221 }
222}
223
224impl<Client, Block, DhtEventStream> Worker<Client, Block, DhtEventStream>
225where
226 Block: BlockT + Unpin + 'static,
227 Client: AuthorityDiscovery<Block> + 'static,
228 DhtEventStream: Stream<Item = DhtEvent> + Unpin,
229{
230 pub(crate) fn new(
232 from_service: mpsc::Receiver<ServicetoWorkerMsg>,
233 client: Arc<Client>,
234 network: Arc<dyn NetworkProvider>,
235 dht_event_rx: DhtEventStream,
236 role: Role,
237 prometheus_registry: Option<prometheus_endpoint::Registry>,
238 config: WorkerConfig,
239 ) -> Self {
240 let publish_interval =
247 ExpIncInterval::new(Duration::from_secs(2), config.max_publish_interval);
248 let query_interval = ExpIncInterval::new(Duration::from_secs(2), config.max_query_interval);
249
250 let publish_if_changed_interval =
253 ExpIncInterval::new(config.keystore_refresh_interval, config.keystore_refresh_interval);
254
255 let addr_cache = AddrCache::new();
256
257 let metrics = match prometheus_registry {
258 Some(registry) => match Metrics::register(®istry) {
259 Ok(metrics) => Some(metrics),
260 Err(e) => {
261 error!(target: LOG_TARGET, "Failed to register metrics: {}", e);
262 None
263 },
264 },
265 None => None,
266 };
267
268 let public_addresses = {
269 let local_peer_id = network.local_peer_id();
270
271 config
272 .public_addresses
273 .into_iter()
274 .map(|mut address| {
275 if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
276 if peer_id != *local_peer_id.as_ref() {
277 error!(
278 target: LOG_TARGET,
279 "Discarding invalid local peer ID in public address {address}.",
280 );
281 }
282 address.pop();
285 }
286 address
287 })
288 .collect()
289 };
290
291 Worker {
292 from_service: from_service.fuse(),
293 client,
294 network,
295 dht_event_rx,
296 publish_interval,
297 known_authorities: Default::default(),
298 authorities_queried_at: None,
299 publish_if_changed_interval,
300 latest_published_keys: HashSet::new(),
301 latest_published_kad_keys: HashSet::new(),
302 publish_non_global_ips: config.publish_non_global_ips,
303 public_addresses,
304 strict_record_validation: config.strict_record_validation,
305 query_interval,
306 pending_lookups: Vec::new(),
307 in_flight_lookups: HashMap::new(),
308 known_lookups: HashMap::new(),
309 addr_cache,
310 role,
311 metrics,
312 phantom: PhantomData,
313 last_known_records: HashMap::new(),
314 }
315 }
316
317 pub async fn run(mut self) {
319 loop {
320 self.start_new_lookups();
321
322 futures::select! {
323 event = self.dht_event_rx.next().fuse() => {
325 if let Some(event) = event {
326 self.handle_dht_event(event).await;
327 } else {
328 return;
331 }
332 },
333 msg = self.from_service.select_next_some() => {
335 self.process_message_from_service(msg);
336 },
337 only_if_changed = future::select(
339 self.publish_interval.next().map(|_| false),
340 self.publish_if_changed_interval.next().map(|_| true)
341 ).map(|e| e.factor_first().0).fuse() => {
342 if let Err(e) = self.publish_ext_addresses(only_if_changed).await {
343 error!(
344 target: LOG_TARGET,
345 "Failed to publish external addresses: {}", e,
346 );
347 }
348 },
349 _ = self.query_interval.next().fuse() => {
351 if let Err(e) = self.refill_pending_lookups_queue().await {
352 error!(
353 target: LOG_TARGET,
354 "Failed to request addresses of authorities: {}", e,
355 );
356 }
357 },
358 }
359 }
360 }
361
362 fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
363 match msg {
364 ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
365 let _ = sender.send(
366 self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
367 );
368 },
369 ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
370 let _ = sender
371 .send(self.addr_cache.get_authority_ids_by_peer_id(&peer_id).map(Clone::clone));
372 },
373 }
374 }
375
376 fn addresses_to_publish(&self) -> impl Iterator<Item = Multiaddr> {
377 let local_peer_id = self.network.local_peer_id();
378 let publish_non_global_ips = self.publish_non_global_ips;
379 let addresses = self
380 .public_addresses
381 .clone()
382 .into_iter()
383 .chain(self.network.external_addresses().into_iter().filter_map(|mut address| {
384 if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
386 if peer_id != *local_peer_id.as_ref() {
387 error!(
388 target: LOG_TARGET,
389 "Network returned external address '{address}' with peer id \
390 not matching the local peer id '{local_peer_id}'.",
391 );
392 debug_assert!(false);
393 }
394 address.pop();
395 }
396
397 if self.public_addresses.contains(&address) {
398 None
400 } else {
401 Some(address)
402 }
403 }))
404 .filter(move |address| {
405 if publish_non_global_ips {
406 return true
407 }
408
409 address.iter().all(|protocol| match protocol {
410 multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
413 multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
414 _ => true,
415 })
416 })
417 .collect::<Vec<_>>();
418
419 if !addresses.is_empty() {
420 debug!(
421 target: LOG_TARGET,
422 "Publishing authority DHT record peer_id='{local_peer_id}' with addresses='{addresses:?}'",
423 );
424 }
425
426 addresses
428 .into_iter()
429 .map(move |a| a.with(multiaddr::Protocol::P2p(*local_peer_id.as_ref())))
430 }
431
432 async fn publish_ext_addresses(&mut self, only_if_changed: bool) -> Result<()> {
437 let key_store = match &self.role {
438 Role::PublishAndDiscover(key_store) => key_store,
439 Role::Discover => return Ok(()),
440 };
441
442 let addresses = serialize_addresses(self.addresses_to_publish());
443 if addresses.is_empty() {
444 trace!(
445 target: LOG_TARGET,
446 "No addresses to publish. Skipping publication."
447 );
448
449 self.publish_interval.set_to_start();
450 return Ok(())
451 }
452
453 let keys =
454 Worker::<Client, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
455 key_store.clone(),
456 self.client.as_ref(),
457 )
458 .await?
459 .into_iter()
460 .collect::<HashSet<_>>();
461
462 if only_if_changed {
463 if keys == self.latest_published_keys {
466 return Ok(())
467 }
468
469 self.publish_interval.set_to_start();
472 self.query_interval.set_to_start();
473 }
474
475 if let Some(metrics) = &self.metrics {
476 metrics.publish.inc();
477 metrics
478 .amount_addresses_last_published
479 .set(addresses.len().try_into().unwrap_or(std::u64::MAX));
480 }
481
482 let serialized_record = serialize_authority_record(addresses, Some(build_creation_time()))?;
483 let peer_signature = sign_record_with_peer_id(&serialized_record, &self.network)?;
484
485 let keys_vec = keys.iter().cloned().collect::<Vec<_>>();
486
487 let kv_pairs = sign_record_with_authority_ids(
488 serialized_record,
489 Some(peer_signature),
490 key_store.as_ref(),
491 keys_vec,
492 )?;
493
494 self.latest_published_kad_keys = kv_pairs.iter().map(|(k, _)| k.clone()).collect();
495
496 for (key, value) in kv_pairs.into_iter() {
497 self.network.put_value(key, value);
498 }
499
500 self.latest_published_keys = keys;
501
502 Ok(())
503 }
504
505 async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
506 let best_hash = self.client.best_hash().await?;
507
508 let local_keys = match &self.role {
509 Role::PublishAndDiscover(key_store) => key_store
510 .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
511 .into_iter()
512 .collect::<HashSet<_>>(),
513 Role::Discover => HashSet::new(),
514 };
515
516 let mut authorities = self
517 .client
518 .authorities(best_hash)
519 .await
520 .map_err(|e| Error::CallingRuntime(e.into()))?
521 .into_iter()
522 .filter(|id| !local_keys.contains(id.as_ref()))
523 .collect::<Vec<_>>();
524
525 self.known_authorities = authorities
526 .clone()
527 .into_iter()
528 .map(|authority| (hash_authority_id(authority.as_ref()), authority))
529 .collect::<HashMap<_, _>>();
530 self.authorities_queried_at = Some(best_hash);
531
532 self.addr_cache.retain_ids(&authorities);
533 let now = Instant::now();
534 self.last_known_records.retain(|k, value| {
535 self.known_authorities.contains_key(k) && !value.record.is_expired(now)
536 });
537
538 authorities.shuffle(&mut thread_rng());
539 self.pending_lookups = authorities;
540 self.in_flight_lookups.clear();
543 self.known_lookups.clear();
544
545 if let Some(metrics) = &self.metrics {
546 metrics
547 .requests_pending
548 .set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
549 }
550
551 Ok(())
552 }
553
554 fn start_new_lookups(&mut self) {
555 while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS {
556 let authority_id = match self.pending_lookups.pop() {
557 Some(authority) => authority,
558 None => return,
559 };
560 let hash = hash_authority_id(authority_id.as_ref());
561 self.network.get_value(&hash);
562 self.in_flight_lookups.insert(hash, authority_id);
563
564 if let Some(metrics) = &self.metrics {
565 metrics.requests.inc();
566 metrics
567 .requests_pending
568 .set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
569 }
570 }
571 }
572
573 async fn handle_dht_event(&mut self, event: DhtEvent) {
575 match event {
576 DhtEvent::ValueFound(v) => {
577 if let Some(metrics) = &self.metrics {
578 metrics.dht_event_received.with_label_values(&["value_found"]).inc();
579 }
580
581 debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", v.record.key);
582
583 if let Err(e) = self.handle_dht_value_found_event(v) {
584 if let Some(metrics) = &self.metrics {
585 metrics.handle_value_found_event_failure.inc();
586 }
587 debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e);
588 }
589 },
590 DhtEvent::ValueNotFound(hash) => {
591 if let Some(metrics) = &self.metrics {
592 metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
593 }
594
595 if self.in_flight_lookups.remove(&hash).is_some() {
596 debug!(target: LOG_TARGET, "Value for hash '{:?}' not found on Dht.", hash)
597 } else {
598 debug!(
599 target: LOG_TARGET,
600 "Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
601 )
602 }
603 },
604 DhtEvent::ValuePut(hash) => {
605 if !self.latest_published_kad_keys.contains(&hash) {
606 return;
607 }
608
609 self.publish_interval.set_to_max();
613
614 if let Some(metrics) = &self.metrics {
615 metrics.dht_event_received.with_label_values(&["value_put"]).inc();
616 }
617
618 debug!(target: LOG_TARGET, "Successfully put hash '{:?}' on Dht.", hash)
619 },
620 DhtEvent::ValuePutFailed(hash) => {
621 if !self.latest_published_kad_keys.contains(&hash) {
622 return;
624 }
625
626 if let Some(metrics) = &self.metrics {
627 metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
628 }
629
630 debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash)
631 },
632 DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires) => {
633 if let Err(e) = self
634 .handle_put_record_requested(record_key, record_value, publisher, expires)
635 .await
636 {
637 debug!(target: LOG_TARGET, "Failed to handle put record request: {}", e)
638 }
639
640 if let Some(metrics) = &self.metrics {
641 metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
642 }
643 },
644 }
645 }
646
647 async fn handle_put_record_requested(
648 &mut self,
649 record_key: KademliaKey,
650 record_value: Vec<u8>,
651 publisher: Option<PeerId>,
652 expires: Option<std::time::Instant>,
653 ) -> Result<()> {
654 let publisher = publisher.ok_or(Error::MissingPublisher)?;
655
656 let best_hash = self.client.best_hash().await?;
659 if !self.known_authorities.contains_key(&record_key) &&
660 self.authorities_queried_at
661 .map(|authorities_queried_at| authorities_queried_at != best_hash)
662 .unwrap_or(true)
663 {
664 let authorities = self
665 .client
666 .authorities(best_hash)
667 .await
668 .map_err(|e| Error::CallingRuntime(e.into()))?
669 .into_iter()
670 .collect::<Vec<_>>();
671
672 self.known_authorities = authorities
673 .into_iter()
674 .map(|authority| (hash_authority_id(authority.as_ref()), authority))
675 .collect::<HashMap<_, _>>();
676
677 self.authorities_queried_at = Some(best_hash);
678 }
679
680 let authority_id =
681 self.known_authorities.get(&record_key).ok_or(Error::UnknownAuthority)?;
682 let signed_record =
683 Self::check_record_signed_with_authority_id(record_value.as_slice(), authority_id)?;
684 self.check_record_signed_with_network_key(
685 &signed_record.record,
686 signed_record.peer_signature,
687 publisher,
688 authority_id,
689 )?;
690
691 let records_creation_time: u128 =
692 schema::AuthorityRecord::decode(signed_record.record.as_slice())
693 .map_err(Error::DecodingProto)?
694 .creation_time
695 .map(|creation_time| {
696 u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
697 })
698 .unwrap_or_default(); let current_record_info = self.last_known_records.get(&record_key);
701 if let Some(current_record_info) = current_record_info {
704 if records_creation_time < current_record_info.creation_time {
705 debug!(
706 target: LOG_TARGET,
707 "Skip storing because record creation time {:?} is older than the current known record {:?}",
708 records_creation_time,
709 current_record_info.creation_time
710 );
711 return Ok(());
712 }
713 }
714
715 self.network.store_record(record_key, record_value, Some(publisher), expires);
716 Ok(())
717 }
718
719 fn check_record_signed_with_authority_id(
720 record: &[u8],
721 authority_id: &AuthorityId,
722 ) -> Result<schema::SignedAuthorityRecord> {
723 let signed_record: schema::SignedAuthorityRecord =
724 schema::SignedAuthorityRecord::decode(record).map_err(Error::DecodingProto)?;
725
726 let auth_signature = AuthoritySignature::decode(&mut &signed_record.auth_signature[..])
727 .map_err(Error::EncodingDecodingScale)?;
728
729 if !AuthorityPair::verify(&auth_signature, &signed_record.record, &authority_id) {
730 return Err(Error::VerifyingDhtPayload)
731 }
732
733 Ok(signed_record)
734 }
735
736 fn check_record_signed_with_network_key(
737 &self,
738 record: &Vec<u8>,
739 peer_signature: Option<PeerSignature>,
740 remote_peer_id: PeerId,
741 authority_id: &AuthorityId,
742 ) -> Result<()> {
743 if let Some(peer_signature) = peer_signature {
744 match self.network.verify(
745 remote_peer_id.into(),
746 &peer_signature.public_key,
747 &peer_signature.signature,
748 record,
749 ) {
750 Ok(true) => {},
751 Ok(false) => return Err(Error::VerifyingDhtPayload),
752 Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
753 }
754 } else if self.strict_record_validation {
755 return Err(Error::MissingPeerIdSignature)
756 } else {
757 debug!(
758 target: LOG_TARGET,
759 "Received unsigned authority discovery record from {}", authority_id
760 );
761 }
762 Ok(())
763 }
764
765 fn handle_dht_value_found_event(&mut self, peer_record: PeerRecord) -> Result<()> {
766 let remote_key = peer_record.record.key.clone();
768
769 let authority_id: AuthorityId =
770 if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) {
771 self.known_lookups.insert(remote_key.clone(), authority_id.clone());
772 authority_id
773 } else if let Some(authority_id) = self.known_lookups.get(&remote_key) {
774 authority_id.clone()
775 } else {
776 return Err(Error::ReceivingUnexpectedRecord);
777 };
778
779 let local_peer_id = self.network.local_peer_id();
780
781 let schema::SignedAuthorityRecord { record, peer_signature, .. } =
782 Self::check_record_signed_with_authority_id(
783 peer_record.record.value.as_slice(),
784 &authority_id,
785 )?;
786
787 let authority_record =
788 schema::AuthorityRecord::decode(record.as_slice()).map_err(Error::DecodingProto)?;
789
790 let records_creation_time: u128 = authority_record
791 .creation_time
792 .as_ref()
793 .map(|creation_time| {
794 u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
795 })
796 .unwrap_or_default(); let addresses: Vec<Multiaddr> = authority_record
799 .addresses
800 .into_iter()
801 .map(|a| a.try_into())
802 .collect::<std::result::Result<_, _>>()
803 .map_err(Error::ParsingMultiaddress)?;
804
805 let get_peer_id = |a: &Multiaddr| match a.iter().last() {
806 Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
807 _ => None,
808 };
809
810 let addresses: Vec<Multiaddr> = addresses
812 .into_iter()
813 .filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some())
814 .collect();
815
816 let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a)))
817 .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? .flatten()
819 .ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; self.check_record_signed_with_network_key(
825 &record,
826 peer_signature,
827 remote_peer_id,
828 &authority_id,
829 )?;
830
831 let remote_addresses: Vec<Multiaddr> =
832 addresses.into_iter().take(MAX_ADDRESSES_PER_AUTHORITY).collect();
833
834 let answering_peer_id = peer_record.peer.map(|peer| peer.into());
835
836 let addr_cache_needs_update = self.handle_new_record(
837 &authority_id,
838 remote_key.clone(),
839 RecordInfo {
840 creation_time: records_creation_time,
841 peers_with_record: answering_peer_id.into_iter().collect(),
842 record: peer_record.record,
843 },
844 );
845
846 if !remote_addresses.is_empty() && addr_cache_needs_update {
847 self.addr_cache.insert(authority_id, remote_addresses);
848 if let Some(metrics) = &self.metrics {
849 metrics
850 .known_authorities_count
851 .set(self.addr_cache.num_authority_ids().try_into().unwrap_or(std::u64::MAX));
852 }
853 }
854 Ok(())
855 }
856
857 fn handle_new_record(
860 &mut self,
861 authority_id: &AuthorityId,
862 kademlia_key: KademliaKey,
863 new_record: RecordInfo,
864 ) -> bool {
865 let current_record_info = self
866 .last_known_records
867 .entry(kademlia_key.clone())
868 .or_insert_with(|| new_record.clone());
869
870 if new_record.creation_time > current_record_info.creation_time {
871 let peers_that_need_updating = current_record_info.peers_with_record.clone();
872 self.network.put_record_to(
873 new_record.record.clone(),
874 peers_that_need_updating.clone(),
875 current_record_info.peers_with_record.is_empty(),
878 );
879 debug!(
880 target: LOG_TARGET,
881 "Found a newer record for {:?} new record creation time {:?} old record creation time {:?}",
882 authority_id, new_record.creation_time, current_record_info.creation_time
883 );
884 self.last_known_records.insert(kademlia_key, new_record);
885 return true
886 }
887
888 if new_record.creation_time == current_record_info.creation_time {
889 debug!(
892 target: LOG_TARGET,
893 "Found same record for {:?} record creation time {:?}",
894 authority_id, new_record.creation_time
895 );
896 if current_record_info.peers_with_record.len() + new_record.peers_with_record.len() <=
897 DEFAULT_KADEMLIA_REPLICATION_FACTOR
898 {
899 current_record_info.peers_with_record.extend(new_record.peers_with_record);
900 }
901 return true
902 }
903
904 debug!(
905 target: LOG_TARGET,
906 "Found old record for {:?} received record creation time {:?} current record creation time {:?}",
907 authority_id, new_record.creation_time, current_record_info.creation_time,
908 );
909 self.network.put_record_to(
910 current_record_info.record.clone(),
911 new_record.peers_with_record.clone(),
912 new_record.peers_with_record.is_empty(),
915 );
916 return false
917 }
918
919 async fn get_own_public_keys_within_authority_set(
925 key_store: KeystorePtr,
926 client: &Client,
927 ) -> Result<HashSet<AuthorityId>> {
928 let local_pub_keys = key_store
929 .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
930 .into_iter()
931 .collect::<HashSet<_>>();
932
933 let best_hash = client.best_hash().await?;
934 let authorities = client
935 .authorities(best_hash)
936 .await
937 .map_err(|e| Error::CallingRuntime(e.into()))?
938 .into_iter()
939 .map(Into::into)
940 .collect::<HashSet<_>>();
941
942 let intersection =
943 local_pub_keys.intersection(&authorities).cloned().map(Into::into).collect();
944
945 Ok(intersection)
946 }
947}
948
949pub trait NetworkProvider:
953 NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
954{
955}
956
957impl<T> NetworkProvider for T where
958 T: NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
959{
960}
961
962fn hash_authority_id(id: &[u8]) -> KademliaKey {
963 KademliaKey::new(&Code::Sha2_256.digest(id).digest())
964}
965
966fn single<T>(values: impl IntoIterator<Item = T>) -> std::result::Result<Option<T>, ()>
971where
972 T: PartialEq<T>,
973{
974 values.into_iter().try_fold(None, |acc, item| match acc {
975 None => Ok(Some(item)),
976 Some(ref prev) if *prev != item => Err(()),
977 Some(x) => Ok(Some(x)),
978 })
979}
980
981fn serialize_addresses(addresses: impl Iterator<Item = Multiaddr>) -> Vec<Vec<u8>> {
982 addresses.map(|a| a.to_vec()).collect()
983}
984
985fn build_creation_time() -> schema::TimestampInfo {
986 let creation_time = SystemTime::now()
987 .duration_since(UNIX_EPOCH)
988 .map(|time| time.as_nanos())
989 .unwrap_or_default();
990 schema::TimestampInfo { timestamp: creation_time.encode() }
991}
992
993fn serialize_authority_record(
994 addresses: Vec<Vec<u8>>,
995 creation_time: Option<schema::TimestampInfo>,
996) -> Result<Vec<u8>> {
997 let mut serialized_record = vec![];
998
999 schema::AuthorityRecord { addresses, creation_time }
1000 .encode(&mut serialized_record)
1001 .map_err(Error::EncodingProto)?;
1002 Ok(serialized_record)
1003}
1004
1005fn sign_record_with_peer_id(
1006 serialized_record: &[u8],
1007 network: &impl NetworkSigner,
1008) -> Result<schema::PeerSignature> {
1009 let signature = network
1010 .sign_with_local_identity(serialized_record.to_vec())
1011 .map_err(|e| Error::CannotSign(format!("{} (network packet)", e)))?;
1012 let public_key = signature.public_key.encode_protobuf();
1013 let signature = signature.bytes;
1014 Ok(schema::PeerSignature { signature, public_key })
1015}
1016
1017fn sign_record_with_authority_ids(
1018 serialized_record: Vec<u8>,
1019 peer_signature: Option<schema::PeerSignature>,
1020 key_store: &dyn Keystore,
1021 keys: Vec<AuthorityId>,
1022) -> Result<Vec<(KademliaKey, Vec<u8>)>> {
1023 let mut result = Vec::with_capacity(keys.len());
1024
1025 for key in keys.iter() {
1026 let auth_signature = key_store
1027 .sr25519_sign(key_types::AUTHORITY_DISCOVERY, key.as_ref(), &serialized_record)
1028 .map_err(|e| Error::CannotSign(format!("{}. Key: {:?}", e, key)))?
1029 .ok_or_else(|| {
1030 Error::CannotSign(format!("Could not find key in keystore. Key: {:?}", key))
1031 })?;
1032
1033 let auth_signature = auth_signature.encode();
1035 let signed_record = schema::SignedAuthorityRecord {
1036 record: serialized_record.clone(),
1037 auth_signature,
1038 peer_signature: peer_signature.clone(),
1039 }
1040 .encode_to_vec();
1041
1042 result.push((hash_authority_id(key.as_slice()), signed_record));
1043 }
1044
1045 Ok(result)
1046}
1047
1048#[derive(Clone)]
1050pub(crate) struct Metrics {
1051 publish: Counter<U64>,
1052 amount_addresses_last_published: Gauge<U64>,
1053 requests: Counter<U64>,
1054 requests_pending: Gauge<U64>,
1055 dht_event_received: CounterVec<U64>,
1056 handle_value_found_event_failure: Counter<U64>,
1057 known_authorities_count: Gauge<U64>,
1058}
1059
1060impl Metrics {
1061 pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
1062 Ok(Self {
1063 publish: register(
1064 Counter::new(
1065 "substrate_authority_discovery_times_published_total",
1066 "Number of times authority discovery has published external addresses.",
1067 )?,
1068 registry,
1069 )?,
1070 amount_addresses_last_published: register(
1071 Gauge::new(
1072 "substrate_authority_discovery_amount_external_addresses_last_published",
1073 "Number of external addresses published when authority discovery last \
1074 published addresses.",
1075 )?,
1076 registry,
1077 )?,
1078 requests: register(
1079 Counter::new(
1080 "substrate_authority_discovery_authority_addresses_requested_total",
1081 "Number of times authority discovery has requested external addresses of a \
1082 single authority.",
1083 )?,
1084 registry,
1085 )?,
1086 requests_pending: register(
1087 Gauge::new(
1088 "substrate_authority_discovery_authority_address_requests_pending",
1089 "Number of pending authority address requests.",
1090 )?,
1091 registry,
1092 )?,
1093 dht_event_received: register(
1094 CounterVec::new(
1095 Opts::new(
1096 "substrate_authority_discovery_dht_event_received",
1097 "Number of dht events received by authority discovery.",
1098 ),
1099 &["name"],
1100 )?,
1101 registry,
1102 )?,
1103 handle_value_found_event_failure: register(
1104 Counter::new(
1105 "substrate_authority_discovery_handle_value_found_event_failure",
1106 "Number of times handling a dht value found event failed.",
1107 )?,
1108 registry,
1109 )?,
1110 known_authorities_count: register(
1111 Gauge::new(
1112 "substrate_authority_discovery_known_authorities_count",
1113 "Number of authorities known by authority discovery.",
1114 )?,
1115 registry,
1116 )?,
1117 })
1118 }
1119}
1120
1121#[cfg(test)]
1123impl<Block: BlockT, Client, DhtEventStream> Worker<Client, Block, DhtEventStream> {
1124 pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
1125 self.addr_cache.insert(authority, addresses);
1126 }
1127}