// sc_authority_discovery/worker.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19pub(crate) use crate::worker::addr_cache::AddrCache;
20use crate::{
21	error::{Error, Result},
22	interval::ExpIncInterval,
23	ServicetoWorkerMsg, WorkerConfig,
24};
25
26use std::{
27	collections::{HashMap, HashSet},
28	marker::PhantomData,
29	path::PathBuf,
30	sync::Arc,
31	time::{Duration, Instant, SystemTime, UNIX_EPOCH},
32};
33
34use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt};
35
36use codec::{Decode, Encode};
37use ip_network::IpNetwork;
38use linked_hash_set::LinkedHashSet;
39use sc_network_types::kad::{Key, PeerRecord, Record};
40
41use log::{debug, error, info, trace};
42use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
43use prost::Message;
44use rand::{seq::SliceRandom, thread_rng};
45
46use sc_network::{
47	config::DEFAULT_KADEMLIA_REPLICATION_FACTOR, event::DhtEvent, multiaddr, KademliaKey,
48	Multiaddr, NetworkDHTProvider, NetworkSigner, NetworkStateInfo,
49};
50use sc_network_types::{multihash::Code, PeerId};
51use schema::PeerSignature;
52use sp_api::{ApiError, ProvideRuntimeApi};
53use sp_authority_discovery::{
54	AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature,
55};
56use sp_blockchain::HeaderBackend;
57use sp_core::{
58	crypto::{key_types, ByteArray, Pair},
59	traits::SpawnNamed,
60};
61use sp_keystore::{Keystore, KeystorePtr};
62use sp_runtime::traits::Block as BlockT;
63
// Address cache implementation; its `AddrCache` type is re-exported at the top of this file via
// `pub(crate) use`.
mod addr_cache;

/// DHT payload schemas generated from Protobuf definitions via the Prost crate in `build.rs`.
mod schema {
	#[cfg(test)]
	mod tests;

	// Pull in the Rust code that the build script emitted into `OUT_DIR`.
	include!(concat!(env!("OUT_DIR"), "/authority_discovery_v3.rs"));
}

#[cfg(test)]
pub mod tests;
74
/// Log target shared by every message emitted from this module.
const LOG_TARGET: &str = "sub-authority-discovery";

/// File name, relative to the configured persistence directory, under which the address cache
/// is stored.
pub(crate) const ADDR_CACHE_FILE_NAME: &str = "authority_discovery_addr_cache.json";

/// Period between two writes of the address cache to disk (ten minutes).
const ADDR_CACHE_PERSIST_INTERVAL: Duration = Duration::from_secs(10 * 60);

/// Upper bound on the addresses cached for a single authority; any surplus is discarded.
const MAX_ADDRESSES_PER_AUTHORITY: usize = 16;

/// Upper bound on the global listen addresses the node publishes about itself.
const MAX_GLOBAL_LISTEN_ADDRESSES: usize = 4;

/// Upper bound on the addresses placed into one published record.
const MAX_ADDRESSES_TO_PUBLISH: usize = 32;

/// Upper bound on DHT lookups that may be in flight at the same time.
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;
90
91/// Role an authority discovery [`Worker`] can run as.
92pub enum Role {
93	/// Publish own addresses and discover addresses of others.
94	PublishAndDiscover(KeystorePtr),
95	/// Discover addresses of others.
96	Discover,
97}
98
99/// An authority discovery [`Worker`] can publish the local node's addresses as well as discover
100/// those of other nodes via a Kademlia DHT.
101///
102/// When constructed with [`Role::PublishAndDiscover`] a [`Worker`] will
103///
104///    1. Retrieve its external addresses (including peer id).
105///
106///    2. Get the list of keys owned by the local node participating in the current authority set.
107///
108///    3. Sign the addresses with the keys.
109///
110///    4. Put addresses and signature as a record with the authority id as a key on a Kademlia DHT.
111///
112/// When constructed with either [`Role::PublishAndDiscover`] or [`Role::Discover`] a [`Worker`]
113/// will
114///
115///    1. Retrieve the current and next set of authorities.
116///
117///    2. Start DHT queries for the ids of the authorities.
118///
119///    3. Validate the signatures of the retrieved key value pairs.
120///
121///    4. Add the retrieved external addresses as priority nodes to the
122///    network peerset.
123///
124///    5. Allow querying of the collected addresses via the [`crate::Service`].
125pub struct Worker<Client, Block: BlockT, DhtEventStream> {
126	/// Channel receiver for messages send by a [`crate::Service`].
127	from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,
128
129	client: Arc<Client>,
130
131	network: Arc<dyn NetworkProvider>,
132
133	/// Channel we receive Dht events on.
134	dht_event_rx: DhtEventStream,
135
136	/// Interval to be proactive, publishing own addresses.
137	publish_interval: ExpIncInterval,
138
139	/// Pro-actively publish our own addresses at this interval, if the keys in the keystore
140	/// have changed.
141	publish_if_changed_interval: ExpIncInterval,
142
143	/// List of keys onto which addresses have been published at the latest publication.
144	/// Used to check whether they have changed.
145	latest_published_keys: HashSet<AuthorityId>,
146	/// List of the kademlia keys that have been published at the latest publication.
147	/// Used to associate DHT events with our published records.
148	latest_published_kad_keys: HashSet<KademliaKey>,
149
150	/// Same value as in the configuration.
151	publish_non_global_ips: bool,
152
153	/// Public addresses set by the node operator to always publish first in the authority
154	/// discovery DHT record.
155	public_addresses: LinkedHashSet<Multiaddr>,
156
157	/// Same value as in the configuration.
158	strict_record_validation: bool,
159
160	/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
161	query_interval: ExpIncInterval,
162
163	/// Queue of throttled lookups pending to be passed to the network.
164	pending_lookups: Vec<AuthorityId>,
165
166	/// The list of all known authorities.
167	known_authorities: HashMap<KademliaKey, AuthorityId>,
168
169	/// The last time we requested the list of authorities.
170	authorities_queried_at: Option<Block::Hash>,
171
172	/// Set of in-flight lookups.
173	in_flight_lookups: HashMap<KademliaKey, AuthorityId>,
174
175	/// Set of lookups we can still receive records.
176	/// These are the entries in the `in_flight_lookups` for which
177	/// we got at least one successfull result.
178	known_lookups: HashMap<KademliaKey, AuthorityId>,
179
180	/// Last known record by key, here we always keep the record with
181	/// the highest creation time and we don't accept records older than
182	/// that.
183	last_known_records: HashMap<KademliaKey, RecordInfo>,
184
185	addr_cache: addr_cache::AddrCache,
186
187	metrics: Option<Metrics>,
188
189	/// Flag to ensure the warning about missing public addresses is only printed once.
190	warn_public_addresses: bool,
191
192	role: Role,
193
194	phantom: PhantomData<Block>,
195
196	/// A spawner of tasks
197	spawner: Box<dyn SpawnNamed>,
198
199	/// The directory of where the persisted AddrCache file is located,
200	/// optional since NetworkConfiguration's `net_config_path` field
201	/// is optional. If None, we won't persist the AddrCache at all.
202	persisted_cache_file_path: Option<PathBuf>,
203}
204
205#[derive(Debug, Clone)]
206struct RecordInfo {
207	/// Time since UNIX_EPOCH in nanoseconds.
208	creation_time: u128,
209	/// Peers that we know have this record, bounded to no more than
210	/// DEFAULT_KADEMLIA_REPLICATION_FACTOR(20).
211	peers_with_record: HashSet<PeerId>,
212	/// The record itself.
213	record: Record,
214}
215
216/// Wrapper for [`AuthorityDiscoveryApi`](sp_authority_discovery::AuthorityDiscoveryApi). Can be
217/// be implemented by any struct without dependency on the runtime.
218#[async_trait::async_trait]
219pub trait AuthorityDiscovery<Block: BlockT> {
220	/// Retrieve authority identifiers of the current and next authority set.
221	async fn authorities(&self, at: Block::Hash)
222		-> std::result::Result<Vec<AuthorityId>, ApiError>;
223
224	/// Retrieve best block hash
225	async fn best_hash(&self) -> std::result::Result<Block::Hash, Error>;
226}
227
228#[async_trait::async_trait]
229impl<Block, T> AuthorityDiscovery<Block> for T
230where
231	T: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
232	T::Api: AuthorityDiscoveryApi<Block>,
233	Block: BlockT,
234{
235	async fn authorities(
236		&self,
237		at: Block::Hash,
238	) -> std::result::Result<Vec<AuthorityId>, ApiError> {
239		self.runtime_api().authorities(at)
240	}
241
242	async fn best_hash(&self) -> std::result::Result<Block::Hash, Error> {
243		Ok(self.info().best_hash)
244	}
245}
246
247impl<Client, Block, DhtEventStream> Worker<Client, Block, DhtEventStream>
248where
249	Block: BlockT + Unpin + 'static,
250	Client: AuthorityDiscovery<Block> + 'static,
251	DhtEventStream: Stream<Item = DhtEvent> + Unpin,
252{
253	/// Construct a [`Worker`].
254	pub(crate) fn new(
255		from_service: mpsc::Receiver<ServicetoWorkerMsg>,
256		client: Arc<Client>,
257		network: Arc<dyn NetworkProvider>,
258		dht_event_rx: DhtEventStream,
259		role: Role,
260		prometheus_registry: Option<prometheus_endpoint::Registry>,
261		config: WorkerConfig,
262		spawner: impl SpawnNamed + 'static,
263	) -> Self {
264		// When a node starts up publishing and querying might fail due to various reasons, for
265		// example due to being not yet fully bootstrapped on the DHT. Thus one should retry rather
266		// sooner than later. On the other hand, a long running node is likely well connected and
267		// thus timely retries are not needed. For this reasoning use an exponentially increasing
268		// interval for `publish_interval`, `query_interval` and `priority_group_set_interval`
269		// instead of a constant interval.
270		let publish_interval =
271			ExpIncInterval::new(Duration::from_secs(2), config.max_publish_interval);
272		let query_interval = ExpIncInterval::new(Duration::from_secs(2), config.max_query_interval);
273
274		// An `ExpIncInterval` is overkill here because the interval is constant, but consistency
275		// is more simple.
276		let publish_if_changed_interval =
277			ExpIncInterval::new(config.keystore_refresh_interval, config.keystore_refresh_interval);
278
279		let maybe_persisted_cache_file_path =
280			config.persisted_cache_directory.as_ref().map(|dir| {
281				let mut path = dir.clone();
282				path.push(ADDR_CACHE_FILE_NAME);
283				path
284			});
285
286		// If we have a path to persisted cache file, then we will try to
287		// load the contents of persisted cache from file, if it exists, and is valid.
288		// Create a new one otherwise.
289		let addr_cache: AddrCache = if let Some(persisted_cache_file_path) =
290			maybe_persisted_cache_file_path.as_ref()
291		{
292			let loaded =
293				AddrCache::try_from(persisted_cache_file_path.as_path()).unwrap_or_else(|e| {
294					info!(target: LOG_TARGET, "Failed to load AddrCache from file, using empty instead: {}", e);
295					AddrCache::new()
296				});
297			info!(target: LOG_TARGET, "Loaded persisted AddrCache with {} authority ids.", loaded.num_authority_ids());
298			loaded
299		} else {
300			info!(target: LOG_TARGET, "No persisted cache file path provided, authority discovery will not persist the address cache to disk.");
301			AddrCache::new()
302		};
303
304		let metrics = match prometheus_registry {
305			Some(registry) => match Metrics::register(&registry) {
306				Ok(metrics) => Some(metrics),
307				Err(e) => {
308					error!(target: LOG_TARGET, "Failed to register metrics: {}", e);
309					None
310				},
311			},
312			None => None,
313		};
314
315		let public_addresses = {
316			let local_peer_id = network.local_peer_id();
317
318			config
319				.public_addresses
320				.into_iter()
321				.map(|address| AddressType::PublicAddress(address).without_p2p(local_peer_id))
322				.collect()
323		};
324
325		Worker {
326			from_service: from_service.fuse(),
327			client,
328			network,
329			dht_event_rx,
330			publish_interval,
331			known_authorities: Default::default(),
332			authorities_queried_at: None,
333			publish_if_changed_interval,
334			latest_published_keys: HashSet::new(),
335			latest_published_kad_keys: HashSet::new(),
336			publish_non_global_ips: config.publish_non_global_ips,
337			public_addresses,
338			strict_record_validation: config.strict_record_validation,
339			query_interval,
340			pending_lookups: Vec::new(),
341			in_flight_lookups: HashMap::new(),
342			known_lookups: HashMap::new(),
343			addr_cache,
344			role,
345			metrics,
346			warn_public_addresses: false,
347			phantom: PhantomData,
348			last_known_records: HashMap::new(),
349			spawner: Box::new(spawner),
350			persisted_cache_file_path: maybe_persisted_cache_file_path,
351		}
352	}
353
354	/// Persists `AddrCache` to disk if the `persisted_cache_file_path` is set.
355	pub fn persist_addr_cache_if_supported(&self) {
356		let Some(path) = self.persisted_cache_file_path.as_ref().cloned() else {
357			return;
358		};
359		let cloned_cache = self.addr_cache.clone();
360		self.spawner.spawn_blocking(
361			"persist-addr-cache",
362			Some("authority-discovery-worker"),
363			Box::pin(async move {
364				cloned_cache.serialize_and_persist(path);
365			}),
366		)
367	}
368
	/// Start the worker.
	///
	/// Runs the event loop. Each iteration first starts pending lookups (bounded by
	/// `MAX_IN_FLIGHT_LOOKUPS`) and then handles whichever of the following is ready first:
	/// - the address-cache persistence timer,
	/// - an incoming DHT event,
	/// - a message from the [`crate::Service`],
	/// - one of the two publication timers,
	/// - the query timer, which refills the pending lookups queue.
	///
	/// Returns once the DHT event stream ends, after persisting the address cache one last time.
	pub async fn run(mut self) {
		// NOTE(review): tokio's `interval` completes its first tick immediately, so the cache is
		// likely persisted once right at startup — confirm this is intended.
		let mut persist_interval = tokio::time::interval(ADDR_CACHE_PERSIST_INTERVAL);

		loop {
			self.start_new_lookups();

			futures::select! {
				_ = persist_interval.tick().fuse() => {
					self.persist_addr_cache_if_supported();
				},
				// Process incoming events.
				event = self.dht_event_rx.next().fuse() => {
					if let Some(event) = event {
						self.handle_dht_event(event).await;
					} else {
						// Persist one last time before exiting.
						self.persist_addr_cache_if_supported();
						// This point is reached if the network has shut down, at which point there is not
						// much else to do than to shut down the authority discovery as well.
						return;
					}
				},
				// Handle messages from [`Service`]. Ignore if sender side is closed.
				msg = self.from_service.select_next_some() => {
					self.process_message_from_service(msg);
				},
				// Publish own addresses.
				// `future::select` resolves with whichever interval fires first; the mapped value
				// is `false` for the regular `publish_interval` and `true` for
				// `publish_if_changed_interval`, and `factor_first().0` extracts it.
				only_if_changed = future::select(
					self.publish_interval.next().map(|_| false),
					self.publish_if_changed_interval.next().map(|_| true)
				).map(|e| e.factor_first().0).fuse() => {
					if let Err(e) = self.publish_ext_addresses(only_if_changed).await {
						error!(
							target: LOG_TARGET,
							"Failed to publish external addresses: {}", e,
						);
					}
				},
				// Request addresses of authorities.
				_ = self.query_interval.next().fuse() => {
					if let Err(e) = self.refill_pending_lookups_queue().await {
						error!(
							target: LOG_TARGET,
							"Failed to request addresses of authorities: {}", e,
						);
					}
				},
			}
		}
	}
419
420	fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
421		match msg {
422			ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
423				let _ = sender.send(
424					self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
425				);
426			},
427			ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
428				let _ = sender
429					.send(self.addr_cache.get_authority_ids_by_peer_id(&peer_id).map(Clone::clone));
430			},
431		}
432	}
433
	/// Collect the addresses this node should publish in its authority discovery record.
	///
	/// Candidates are taken in this priority order and deduplicated:
	/// 1. the operator-configured `public_addresses` (only those with a port),
	/// 2. at most `MAX_GLOBAL_LISTEN_ADDRESSES` listen addresses that are global and have a port,
	/// 3. external addresses that have a port and are global (any external address with a port
	///    if `publish_non_global_ips` is set).
	///
	/// At most `MAX_ADDRESSES_TO_PUBLISH` addresses are returned, each with the local peer id
	/// appended as a trailing `/p2p/` component. If addresses are being published while no
	/// public addresses are configured and no global listen address was found, an error is
	/// logged once (guarded by `warn_public_addresses`).
	fn addresses_to_publish(&mut self) -> impl Iterator<Item = Multiaddr> {
		let local_peer_id = self.network.local_peer_id();
		let publish_non_global_ips = self.publish_non_global_ips;

		// Checks that the address is global.
		let address_is_global = |address: &Multiaddr| {
			address.iter().all(|protocol| match protocol {
				// The `ip_network` library is used because its `is_global()` method is stable,
				// while `is_global()` in the standard library currently isn't.
				multiaddr::Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
				multiaddr::Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
				_ => true,
			})
		};

		// Addresses without a port cannot be dialed.
		let address_has_port = |address: &Multiaddr| {
			address.iter().any(|protocol| {
				matches!(
					protocol,
					multiaddr::Protocol::Tcp(_) |
						multiaddr::Protocol::Udp(_) |
						multiaddr::Protocol::Memory(_)
				)
			})
		};

		// These are the addresses the node is listening for incoming connections,
		// as reported by installed protocols (tcp / websocket etc).
		//
		// We double check the address is global. In other words, we double check the node
		// is not running behind a NAT.
		// Note: we do this regardless of the `publish_non_global_ips` setting, since the
		// node discovers many external addresses via the identify protocol.
		// NOTE(review): `without_p2p` presumably strips any `/p2p/<local_peer_id>` suffix so the
		// id can be re-appended uniformly at the end — confirm against `AddressType`.
		let mut global_listen_addresses = self
			.network
			.listen_addresses()
			.into_iter()
			.filter_map(|address| {
				(address_is_global(&address) && address_has_port(&address))
					.then(|| AddressType::GlobalListenAddress(address).without_p2p(local_peer_id))
			})
			.take(MAX_GLOBAL_LISTEN_ADDRESSES)
			.peekable();

		// Similar to listen addresses that takes into consideration `publish_non_global_ips`.
		let mut external_addresses = self
			.network
			.external_addresses()
			.into_iter()
			.filter_map(|address| {
				// Only publish addresses that have a port and are global.
				(address_has_port(&address) &&
					(publish_non_global_ips || address_is_global(&address)))
				.then(|| AddressType::ExternalAddress(address).without_p2p(local_peer_id))
			})
			.peekable();

		let has_global_listen_addresses = global_listen_addresses.peek().is_some();
		trace!(
			target: LOG_TARGET,
			"Node has public addresses: {}, global listen addresses: {}, external addresses: {}",
			!self.public_addresses.is_empty(),
			has_global_listen_addresses,
			external_addresses.peek().is_some(),
		);

		let mut seen_addresses = HashSet::new();

		// Operator-provided addresses come first, then global listen addresses, then external
		// ones; the first occurrence of each address wins.
		let addresses = self
			.public_addresses
			.clone()
			.into_iter()
			.filter(address_has_port)
			.chain(global_listen_addresses)
			.chain(external_addresses)
			// Deduplicate addresses.
			.filter(|address| seen_addresses.insert(address.clone()))
			.take(MAX_ADDRESSES_TO_PUBLISH)
			.collect::<Vec<_>>();

		if !addresses.is_empty() {
			debug!(
				target: LOG_TARGET,
				"Publishing authority DHT record peer_id='{local_peer_id}' with addresses='{addresses:?}'",
			);

			// Warn only once per worker lifetime.
			if !self.warn_public_addresses &&
				self.public_addresses.is_empty() &&
				!has_global_listen_addresses
			{
				self.warn_public_addresses = true;

				error!(
					target: LOG_TARGET,
					"No public addresses configured and no global listen addresses found. \
					Authority DHT record may contain unreachable addresses. \
					Consider setting `--public-addr` to the public IP address of this node. \
					This will become a hard requirement in future versions for authorities."
				);
			}
		}

		// The address must include the local peer id.
		addresses
			.into_iter()
			.map(move |a| a.with(multiaddr::Protocol::P2p(*local_peer_id.as_ref())))
	}
542
543	/// Publish own public addresses.
544	///
545	/// If `only_if_changed` is true, the function has no effect if the list of keys to publish
546	/// is equal to `self.latest_published_keys`.
547	async fn publish_ext_addresses(&mut self, only_if_changed: bool) -> Result<()> {
548		let key_store = match &self.role {
549			Role::PublishAndDiscover(key_store) => key_store,
550			Role::Discover => return Ok(()),
551		}
552		.clone();
553
554		let addresses = serialize_addresses(self.addresses_to_publish());
555		if addresses.is_empty() {
556			trace!(
557				target: LOG_TARGET,
558				"No addresses to publish. Skipping publication."
559			);
560
561			self.publish_interval.set_to_start();
562			return Ok(());
563		}
564
565		let keys =
566			Worker::<Client, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
567				key_store.clone(),
568				self.client.as_ref(),
569			)
570			.await?
571			.into_iter()
572			.collect::<HashSet<_>>();
573
574		if only_if_changed {
575			// If the authority keys did not change and the `publish_if_changed_interval` was
576			// triggered then do nothing.
577			if keys == self.latest_published_keys {
578				return Ok(());
579			}
580
581			// We have detected a change in the authority keys, reset the timers to
582			// publish and gather data faster.
583			self.publish_interval.set_to_start();
584			self.query_interval.set_to_start();
585		}
586
587		if let Some(metrics) = &self.metrics {
588			metrics.publish.inc();
589			metrics
590				.amount_addresses_last_published
591				.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
592		}
593
594		let serialized_record = serialize_authority_record(addresses, Some(build_creation_time()))?;
595		let peer_signature = sign_record_with_peer_id(&serialized_record, &self.network)?;
596
597		let keys_vec = keys.iter().cloned().collect::<Vec<_>>();
598
599		let kv_pairs = sign_record_with_authority_ids(
600			serialized_record,
601			Some(peer_signature),
602			key_store.as_ref(),
603			keys_vec,
604		)?;
605
606		self.latest_published_kad_keys = kv_pairs.iter().map(|(k, _)| k.clone()).collect();
607
608		for (key, value) in kv_pairs.into_iter() {
609			self.network.put_value(key, value);
610		}
611
612		self.latest_published_keys = keys;
613
614		Ok(())
615	}
616
617	async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
618		let best_hash = self.client.best_hash().await?;
619
620		let local_keys = match &self.role {
621			Role::PublishAndDiscover(key_store) => key_store
622				.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
623				.into_iter()
624				.collect::<HashSet<_>>(),
625			Role::Discover => HashSet::new(),
626		};
627
628		let mut authorities = self
629			.client
630			.authorities(best_hash)
631			.await
632			.map_err(|e| Error::CallingRuntime(e.into()))?
633			.into_iter()
634			.filter(|id| !local_keys.contains(id.as_ref()))
635			.collect::<Vec<_>>();
636
637		self.known_authorities = authorities
638			.clone()
639			.into_iter()
640			.map(|authority| (hash_authority_id(authority.as_ref()), authority))
641			.collect::<HashMap<_, _>>();
642		self.authorities_queried_at = Some(best_hash);
643
644		self.addr_cache.retain_ids(&authorities);
645		let now = Instant::now();
646		self.last_known_records.retain(|k, value| {
647			self.known_authorities.contains_key(k) && !value.record.is_expired(now)
648		});
649
650		authorities.shuffle(&mut thread_rng());
651		self.pending_lookups = authorities;
652		// Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as
653		// query interval ticks are far enough apart for all lookups to succeed.
654		self.in_flight_lookups.clear();
655		self.known_lookups.clear();
656
657		if let Some(metrics) = &self.metrics {
658			metrics
659				.requests_pending
660				.set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
661		}
662
663		Ok(())
664	}
665
666	fn start_new_lookups(&mut self) {
667		while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS {
668			let authority_id = match self.pending_lookups.pop() {
669				Some(authority) => authority,
670				None => return,
671			};
672			let hash = hash_authority_id(authority_id.as_ref());
673			self.network.get_value(&hash);
674			self.in_flight_lookups.insert(hash, authority_id);
675
676			if let Some(metrics) = &self.metrics {
677				metrics.requests.inc();
678				metrics
679					.requests_pending
680					.set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
681			}
682		}
683	}
684
685	/// Handle incoming Dht events.
686	async fn handle_dht_event(&mut self, event: DhtEvent) {
687		match event {
688			DhtEvent::ValueFound(v) => {
689				if let Some(metrics) = &self.metrics {
690					metrics.dht_event_received.with_label_values(&["value_found"]).inc();
691				}
692
693				debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", v.record.key);
694
695				if let Err(e) = self.handle_dht_value_found_event(v) {
696					if let Some(metrics) = &self.metrics {
697						metrics.handle_value_found_event_failure.inc();
698					}
699					debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e);
700				}
701			},
702			DhtEvent::ValueNotFound(hash) => {
703				if let Some(metrics) = &self.metrics {
704					metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
705				}
706
707				if self.in_flight_lookups.remove(&hash).is_some() {
708					debug!(target: LOG_TARGET, "Value for hash '{:?}' not found on Dht.", hash)
709				} else {
710					debug!(
711						target: LOG_TARGET,
712						"Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
713					)
714				}
715			},
716			DhtEvent::ValuePut(hash) => {
717				if !self.latest_published_kad_keys.contains(&hash) {
718					return;
719				}
720
721				// Fast forward the exponentially increasing interval to the configured maximum. In
722				// case this was the first successful address publishing there is no need for a
723				// timely retry.
724				self.publish_interval.set_to_max();
725
726				if let Some(metrics) = &self.metrics {
727					metrics.dht_event_received.with_label_values(&["value_put"]).inc();
728				}
729
730				debug!(target: LOG_TARGET, "Successfully put hash '{:?}' on Dht.", hash)
731			},
732			DhtEvent::ValuePutFailed(hash) => {
733				if !self.latest_published_kad_keys.contains(&hash) {
734					// Not a value we have published or received multiple times.
735					return;
736				}
737
738				if let Some(metrics) = &self.metrics {
739					metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
740				}
741
742				debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash)
743			},
744			DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires) => {
745				if let Err(e) = self
746					.handle_put_record_requested(record_key, record_value, publisher, expires)
747					.await
748				{
749					debug!(target: LOG_TARGET, "Failed to handle put record request: {}", e)
750				}
751
752				if let Some(metrics) = &self.metrics {
753					metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
754				}
755			},
756			_ => {},
757		}
758	}
759
760	async fn handle_put_record_requested(
761		&mut self,
762		record_key: Key,
763		record_value: Vec<u8>,
764		publisher: Option<PeerId>,
765		expires: Option<std::time::Instant>,
766	) -> Result<()> {
767		let publisher = publisher.ok_or(Error::MissingPublisher)?;
768
769		// Make sure we don't ever work with an outdated set of authorities
770		// and that we do not update known_authorithies too often.
771		let best_hash = self.client.best_hash().await?;
772		if !self.known_authorities.contains_key(&record_key) &&
773			self.authorities_queried_at
774				.map(|authorities_queried_at| authorities_queried_at != best_hash)
775				.unwrap_or(true)
776		{
777			let authorities = self
778				.client
779				.authorities(best_hash)
780				.await
781				.map_err(|e| Error::CallingRuntime(e.into()))?
782				.into_iter()
783				.collect::<Vec<_>>();
784
785			self.known_authorities = authorities
786				.into_iter()
787				.map(|authority| (hash_authority_id(authority.as_ref()), authority))
788				.collect::<HashMap<_, _>>();
789
790			self.authorities_queried_at = Some(best_hash);
791		}
792
793		let authority_id =
794			self.known_authorities.get(&record_key).ok_or(Error::UnknownAuthority)?;
795		let signed_record =
796			Self::check_record_signed_with_authority_id(record_value.as_slice(), authority_id)?;
797		self.check_record_signed_with_network_key(
798			&signed_record.record,
799			signed_record.peer_signature,
800			publisher,
801			authority_id,
802		)?;
803
804		let records_creation_time: u128 =
805			schema::AuthorityRecord::decode(signed_record.record.as_slice())
806				.map_err(Error::DecodingProto)?
807				.creation_time
808				.map(|creation_time| {
809					u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
810				})
811				.unwrap_or_default(); // 0 is a sane default for records that do not have creation time present.
812
813		let current_record_info = self.last_known_records.get(&record_key);
814		// If record creation time is older than the current record creation time,
815		// we don't store it since we want to give higher priority to newer records.
816		if let Some(current_record_info) = current_record_info {
817			if records_creation_time < current_record_info.creation_time {
818				debug!(
819					target: LOG_TARGET,
820					"Skip storing because record creation time {:?} is older than the current known record {:?}",
821					records_creation_time,
822					current_record_info.creation_time
823				);
824				return Ok(());
825			}
826		}
827
828		self.network.store_record(record_key, record_value, Some(publisher), expires);
829		Ok(())
830	}
831
832	fn check_record_signed_with_authority_id(
833		record: &[u8],
834		authority_id: &AuthorityId,
835	) -> Result<schema::SignedAuthorityRecord> {
836		let signed_record: schema::SignedAuthorityRecord =
837			schema::SignedAuthorityRecord::decode(record).map_err(Error::DecodingProto)?;
838
839		let auth_signature = AuthoritySignature::decode(&mut &signed_record.auth_signature[..])
840			.map_err(Error::EncodingDecodingScale)?;
841
842		if !AuthorityPair::verify(&auth_signature, &signed_record.record, &authority_id) {
843			return Err(Error::VerifyingDhtPayload);
844		}
845
846		Ok(signed_record)
847	}
848
849	fn check_record_signed_with_network_key(
850		&self,
851		record: &Vec<u8>,
852		peer_signature: Option<PeerSignature>,
853		remote_peer_id: PeerId,
854		authority_id: &AuthorityId,
855	) -> Result<()> {
856		if let Some(peer_signature) = peer_signature {
857			match self.network.verify(
858				remote_peer_id.into(),
859				&peer_signature.public_key,
860				&peer_signature.signature,
861				record,
862			) {
863				Ok(true) => {},
864				Ok(false) => return Err(Error::VerifyingDhtPayload),
865				Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
866			}
867		} else if self.strict_record_validation {
868			return Err(Error::MissingPeerIdSignature);
869		} else {
870			debug!(
871				target: LOG_TARGET,
872				"Received unsigned authority discovery record from {}", authority_id
873			);
874		}
875		Ok(())
876	}
877
878	fn handle_dht_value_found_event(&mut self, peer_record: PeerRecord) -> Result<()> {
879		// Ensure `values` is not empty and all its keys equal.
880		let remote_key = peer_record.record.key.clone();
881
882		let authority_id: AuthorityId =
883			if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) {
884				self.known_lookups.insert(remote_key.clone(), authority_id.clone());
885				authority_id
886			} else if let Some(authority_id) = self.known_lookups.get(&remote_key) {
887				authority_id.clone()
888			} else {
889				return Err(Error::ReceivingUnexpectedRecord);
890			};
891
892		let local_peer_id = self.network.local_peer_id();
893
894		let schema::SignedAuthorityRecord { record, peer_signature, .. } =
895			Self::check_record_signed_with_authority_id(
896				peer_record.record.value.as_slice(),
897				&authority_id,
898			)?;
899
900		let authority_record =
901			schema::AuthorityRecord::decode(record.as_slice()).map_err(Error::DecodingProto)?;
902
903		let records_creation_time: u128 = authority_record
904			.creation_time
905			.as_ref()
906			.map(|creation_time| {
907				u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
908			})
909			.unwrap_or_default(); // 0 is a sane default for records that do not have creation time present.
910
911		let addresses: Vec<Multiaddr> = authority_record
912			.addresses
913			.into_iter()
914			.map(|a| a.try_into())
915			.collect::<std::result::Result<_, _>>()
916			.map_err(Error::ParsingMultiaddress)?;
917
918		let get_peer_id = |a: &Multiaddr| match a.iter().last() {
919			Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
920			_ => None,
921		};
922
923		// Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses.
924		let addresses: Vec<Multiaddr> = addresses
925			.into_iter()
926			.filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some())
927			.collect();
928
929		let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a)))
930			.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records
931			.flatten()
932			.ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them
933
934		// At this point we know all the valid multiaddresses from the record, know that
935		// each of them belong to the same PeerId, we just need to check if the record is
936		// properly signed by the owner of the PeerId
937		self.check_record_signed_with_network_key(
938			&record,
939			peer_signature,
940			remote_peer_id,
941			&authority_id,
942		)?;
943
944		let remote_addresses: Vec<Multiaddr> =
945			addresses.into_iter().take(MAX_ADDRESSES_PER_AUTHORITY).collect();
946
947		let answering_peer_id = peer_record.peer.map(|peer| peer.into());
948
949		let addr_cache_needs_update = self.handle_new_record(
950			&authority_id,
951			remote_key.clone(),
952			RecordInfo {
953				creation_time: records_creation_time,
954				peers_with_record: answering_peer_id.into_iter().collect(),
955				record: peer_record.record,
956			},
957		);
958
959		if !remote_addresses.is_empty() && addr_cache_needs_update {
960			self.addr_cache.insert(authority_id, remote_addresses);
961			if let Some(metrics) = &self.metrics {
962				metrics
963					.known_authorities_count
964					.set(self.addr_cache.num_authority_ids().try_into().unwrap_or(std::u64::MAX));
965			}
966		}
967		Ok(())
968	}
969
970	// Handles receiving a new DHT record for the authorithy.
971	// Returns true if the record was new, false if the record was older than the current one.
972	fn handle_new_record(
973		&mut self,
974		authority_id: &AuthorityId,
975		kademlia_key: KademliaKey,
976		new_record: RecordInfo,
977	) -> bool {
978		let current_record_info = self
979			.last_known_records
980			.entry(kademlia_key.clone())
981			.or_insert_with(|| new_record.clone());
982
983		if new_record.creation_time > current_record_info.creation_time {
984			let peers_that_need_updating = current_record_info.peers_with_record.clone();
985			self.network.put_record_to(
986				new_record.record.clone(),
987				peers_that_need_updating.clone(),
988				// If this is empty it means we received the answer from our node local
989				// storage, so we need to update that as well.
990				current_record_info.peers_with_record.is_empty(),
991			);
992			debug!(
993					target: LOG_TARGET,
994					"Found a newer record for {:?} new record creation time {:?} old record creation time {:?}",
995					authority_id, new_record.creation_time, current_record_info.creation_time
996			);
997			self.last_known_records.insert(kademlia_key, new_record);
998			return true;
999		}
1000
1001		if new_record.creation_time == current_record_info.creation_time {
1002			// Same record just update in case this is a record from old nodes that don't have
1003			// timestamp.
1004			debug!(
1005					target: LOG_TARGET,
1006					"Found same record for {:?} record creation time {:?}",
1007					authority_id, new_record.creation_time
1008			);
1009			if current_record_info.peers_with_record.len() + new_record.peers_with_record.len() <=
1010				DEFAULT_KADEMLIA_REPLICATION_FACTOR
1011			{
1012				current_record_info.peers_with_record.extend(new_record.peers_with_record);
1013			}
1014			return true;
1015		}
1016
1017		debug!(
1018				target: LOG_TARGET,
1019				"Found old record for {:?} received record creation time {:?} current record creation time {:?}",
1020				authority_id, new_record.creation_time, current_record_info.creation_time,
1021		);
1022		self.network.put_record_to(
1023			current_record_info.record.clone().into(),
1024			new_record.peers_with_record.clone(),
1025			// If this is empty it means we received the answer from our node local
1026			// storage, so we need to update that as well.
1027			new_record.peers_with_record.is_empty(),
1028		);
1029		return false;
1030	}
1031
1032	/// Retrieve our public keys within the current and next authority set.
1033	// A node might have multiple authority discovery keys within its keystore, e.g. an old one and
1034	// one for the upcoming session. In addition it could be participating in the current and (/ or)
1035	// next authority set with two keys. The function does not return all of the local authority
1036	// discovery public keys, but only the ones intersecting with the current or next authority set.
1037	async fn get_own_public_keys_within_authority_set(
1038		key_store: KeystorePtr,
1039		client: &Client,
1040	) -> Result<HashSet<AuthorityId>> {
1041		let local_pub_keys = key_store
1042			.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
1043			.into_iter()
1044			.collect::<HashSet<_>>();
1045
1046		let best_hash = client.best_hash().await?;
1047		let authorities = client
1048			.authorities(best_hash)
1049			.await
1050			.map_err(|e| Error::CallingRuntime(e.into()))?
1051			.into_iter()
1052			.map(Into::into)
1053			.collect::<HashSet<_>>();
1054
1055		let intersection =
1056			local_pub_keys.intersection(&authorities).cloned().map(Into::into).collect();
1057
1058		Ok(intersection)
1059	}
1060}
1061
1062/// Removes the `/p2p/..` from the address if it is present.
1063#[derive(Debug, Clone, PartialEq, Eq)]
1064enum AddressType {
1065	/// The address is specified as a public address via the CLI.
1066	PublicAddress(Multiaddr),
1067	/// The address is a global listen address.
1068	GlobalListenAddress(Multiaddr),
1069	/// The address is discovered via the network (ie /identify protocol).
1070	ExternalAddress(Multiaddr),
1071}
1072
1073impl AddressType {
1074	/// Removes the `/p2p/..` from the address if it is present.
1075	///
1076	/// In case the peer id in the address does not match the local peer id, an error is logged for
1077	/// `ExternalAddress` and `GlobalListenAddress`.
1078	fn without_p2p(self, local_peer_id: PeerId) -> Multiaddr {
1079		// Get the address and the source str for logging.
1080		let (mut address, source) = match self {
1081			AddressType::PublicAddress(address) => (address, "public address"),
1082			AddressType::GlobalListenAddress(address) => (address, "global listen address"),
1083			AddressType::ExternalAddress(address) => (address, "external address"),
1084		};
1085
1086		if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
1087			if peer_id != *local_peer_id.as_ref() {
1088				error!(
1089					target: LOG_TARGET,
1090					"Network returned '{source}' '{address}' with peer id \
1091					 not matching the local peer id '{local_peer_id}'.",
1092				);
1093			}
1094			address.pop();
1095		}
1096		address
1097	}
1098}
1099
1100/// NetworkProvider provides [`Worker`] with all necessary hooks into the
1101/// underlying Substrate networking. Using this trait abstraction instead of
1102/// `sc_network::NetworkService` directly is necessary to unit test [`Worker`].
1103pub trait NetworkProvider:
1104	NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
1105{
1106}
1107
1108impl<T> NetworkProvider for T where
1109	T: NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
1110{
1111}
1112
1113fn hash_authority_id(id: &[u8]) -> KademliaKey {
1114	KademliaKey::new(&Code::Sha2_256.digest(id).digest())
1115}
1116
/// Checks that every value yielded by `values` is equal and returns that common value.
///
/// Returns:
/// - `Ok(None)` if there are no values;
/// - `Ok(Some(first))` if all values compare equal to the first one;
/// - `Err(())` as soon as a value differs from the first one.
fn single<T>(values: impl IntoIterator<Item = T>) -> std::result::Result<Option<T>, ()>
where
	T: PartialEq<T>,
{
	let mut rest = values.into_iter();
	let first = match rest.next() {
		Some(first) => first,
		None => return Ok(None),
	};
	if rest.any(|item| first != item) {
		Err(())
	} else {
		Ok(Some(first))
	}
}
1131
1132fn serialize_addresses(addresses: impl Iterator<Item = Multiaddr>) -> Vec<Vec<u8>> {
1133	addresses.map(|a| a.to_vec()).collect()
1134}
1135
1136fn build_creation_time() -> schema::TimestampInfo {
1137	let creation_time = SystemTime::now()
1138		.duration_since(UNIX_EPOCH)
1139		.map(|time| time.as_nanos())
1140		.unwrap_or_default();
1141	schema::TimestampInfo { timestamp: creation_time.encode() }
1142}
1143
1144fn serialize_authority_record(
1145	addresses: Vec<Vec<u8>>,
1146	creation_time: Option<schema::TimestampInfo>,
1147) -> Result<Vec<u8>> {
1148	let mut serialized_record = vec![];
1149
1150	schema::AuthorityRecord { addresses, creation_time }
1151		.encode(&mut serialized_record)
1152		.map_err(Error::EncodingProto)?;
1153	Ok(serialized_record)
1154}
1155
1156fn sign_record_with_peer_id(
1157	serialized_record: &[u8],
1158	network: &impl NetworkSigner,
1159) -> Result<schema::PeerSignature> {
1160	let signature = network
1161		.sign_with_local_identity(serialized_record.to_vec())
1162		.map_err(|e| Error::CannotSign(format!("{} (network packet)", e)))?;
1163	let public_key = signature.public_key.encode_protobuf();
1164	let signature = signature.bytes;
1165	Ok(schema::PeerSignature { signature, public_key })
1166}
1167
1168fn sign_record_with_authority_ids(
1169	serialized_record: Vec<u8>,
1170	peer_signature: Option<schema::PeerSignature>,
1171	key_store: &dyn Keystore,
1172	keys: Vec<AuthorityId>,
1173) -> Result<Vec<(KademliaKey, Vec<u8>)>> {
1174	let mut result = Vec::with_capacity(keys.len());
1175
1176	for key in keys.iter() {
1177		let auth_signature = key_store
1178			.sr25519_sign(key_types::AUTHORITY_DISCOVERY, key.as_ref(), &serialized_record)
1179			.map_err(|e| Error::CannotSign(format!("{}. Key: {:?}", e, key)))?
1180			.ok_or_else(|| {
1181				Error::CannotSign(format!("Could not find key in keystore. Key: {:?}", key))
1182			})?;
1183
1184		// Scale encode
1185		let auth_signature = auth_signature.encode();
1186		let signed_record = schema::SignedAuthorityRecord {
1187			record: serialized_record.clone(),
1188			auth_signature,
1189			peer_signature: peer_signature.clone(),
1190		}
1191		.encode_to_vec();
1192
1193		result.push((hash_authority_id(key.as_slice()), signed_record));
1194	}
1195
1196	Ok(result)
1197}
1198
1199/// Prometheus metrics for a [`Worker`].
1200#[derive(Clone)]
1201pub(crate) struct Metrics {
1202	publish: Counter<U64>,
1203	amount_addresses_last_published: Gauge<U64>,
1204	requests: Counter<U64>,
1205	requests_pending: Gauge<U64>,
1206	dht_event_received: CounterVec<U64>,
1207	handle_value_found_event_failure: Counter<U64>,
1208	known_authorities_count: Gauge<U64>,
1209}
1210
1211impl Metrics {
1212	pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
1213		Ok(Self {
1214			publish: register(
1215				Counter::new(
1216					"substrate_authority_discovery_times_published_total",
1217					"Number of times authority discovery has published external addresses.",
1218				)?,
1219				registry,
1220			)?,
1221			amount_addresses_last_published: register(
1222				Gauge::new(
1223					"substrate_authority_discovery_amount_external_addresses_last_published",
1224					"Number of external addresses published when authority discovery last \
1225					 published addresses.",
1226				)?,
1227				registry,
1228			)?,
1229			requests: register(
1230				Counter::new(
1231					"substrate_authority_discovery_authority_addresses_requested_total",
1232					"Number of times authority discovery has requested external addresses of a \
1233					 single authority.",
1234				)?,
1235				registry,
1236			)?,
1237			requests_pending: register(
1238				Gauge::new(
1239					"substrate_authority_discovery_authority_address_requests_pending",
1240					"Number of pending authority address requests.",
1241				)?,
1242				registry,
1243			)?,
1244			dht_event_received: register(
1245				CounterVec::new(
1246					Opts::new(
1247						"substrate_authority_discovery_dht_event_received",
1248						"Number of dht events received by authority discovery.",
1249					),
1250					&["name"],
1251				)?,
1252				registry,
1253			)?,
1254			handle_value_found_event_failure: register(
1255				Counter::new(
1256					"substrate_authority_discovery_handle_value_found_event_failure",
1257					"Number of times handling a dht value found event failed.",
1258				)?,
1259				registry,
1260			)?,
1261			known_authorities_count: register(
1262				Gauge::new(
1263					"substrate_authority_discovery_known_authorities_count",
1264					"Number of authorities known by authority discovery.",
1265				)?,
1266				registry,
1267			)?,
1268		})
1269	}
1270}
1271
// Helper functions for unit testing.
#[cfg(test)]
impl<Block: BlockT, Client, DhtEventStream> Worker<Client, Block, DhtEventStream> {
	/// Inserts `addresses` for `authority` directly into the address cache.
	pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
		self.addr_cache.insert(authority, addresses);
	}

	/// Returns whether the address cache has an entry for `authority`.
	pub(crate) fn contains_authority(&self, authority: &AuthorityId) -> bool {
		matches!(self.addr_cache.get_addresses_by_authority_id(authority), Some(_))
	}
}