sc_authority_discovery/worker.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

pub(crate) use crate::worker::addr_cache::AddrCache;
use crate::{
	error::{Error, Result},
	interval::ExpIncInterval,
	ServicetoWorkerMsg, WorkerConfig,
};

use std::{
	collections::{HashMap, HashSet},
	marker::PhantomData,
	path::PathBuf,
	sync::Arc,
	time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt};

use codec::{Decode, Encode};
use ip_network::IpNetwork;
use linked_hash_set::LinkedHashSet;
use sc_network_types::kad::{Key, PeerRecord, Record};

use log::{debug, error, info, trace};
use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};

use sc_network::{
	config::DEFAULT_KADEMLIA_REPLICATION_FACTOR, event::DhtEvent, multiaddr, KademliaKey,
	Multiaddr, NetworkDHTProvider, NetworkSigner, NetworkStateInfo,
};
use sc_network_types::{multihash::Code, PeerId};
use schema::PeerSignature;
use sp_api::{ApiError, ProvideRuntimeApi};
use sp_authority_discovery::{
	AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature,
};
use sp_blockchain::HeaderBackend;
use sp_core::{
	crypto::{key_types, ByteArray, Pair},
	traits::SpawnNamed,
};
use sp_keystore::{Keystore, KeystorePtr};
use sp_runtime::traits::Block as BlockT;

mod addr_cache;
/// Dht payload schemas generated from the Protobuf definitions via the Prost crate in build.rs.
mod schema {
	#[cfg(test)]
	mod tests;

	include!(concat!(env!("OUT_DIR"), "/authority_discovery_v3.rs"));
}
#[cfg(test)]
pub mod tests;

const LOG_TARGET: &str = "sub-authority-discovery";
pub(crate) const ADDR_CACHE_FILE_NAME: &str = "authority_discovery_addr_cache.json";
const ADDR_CACHE_PERSIST_INTERVAL: Duration = Duration::from_secs(60 * 10); // 10 minutes

/// Maximum number of addresses cached per authority. Additional addresses are discarded.
const MAX_ADDRESSES_PER_AUTHORITY: usize = 16;

/// Maximum number of global listen addresses published by the node.
const MAX_GLOBAL_LISTEN_ADDRESSES: usize = 4;

/// Maximum number of addresses to publish in a single record.
const MAX_ADDRESSES_TO_PUBLISH: usize = 32;

/// Maximum number of in-flight DHT lookups at any given point in time.
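/// Lookups beyond this limit remain queued in `pending_lookups` and are started as
/// in-flight slots free up.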
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;

/// Role an authority discovery [`Worker`] can run as.
pub enum Role {
	/// Publish own addresses and discover addresses of others.
	PublishAndDiscover(KeystorePtr),
	/// Discover addresses of others.
	Discover,
}

/// An authority discovery [`Worker`] can publish the local node's addresses as well as discover
/// those of other nodes via a Kademlia DHT.
///
/// When constructed with [`Role::PublishAndDiscover`] a [`Worker`] will
///
///    1. Retrieve its external addresses (including peer id).
///
///    2. Get the list of keys owned by the local node participating in the current authority set.
///
///    3. Sign the addresses with the keys.
///
///    4. Put addresses and signature as a record with the authority id as a key on a Kademlia DHT.
///
/// When constructed with either [`Role::PublishAndDiscover`] or [`Role::Discover`] a [`Worker`]
/// will
///
///    1. Retrieve the current and next set of authorities.
///
///    2. Start DHT queries for the ids of the authorities.
///
///    3. Validate the signatures of the retrieved key value pairs.
///
///    4. Add the retrieved external addresses as priority nodes to the
///    network peerset.
///
///    5. Allow querying of the collected addresses via the [`crate::Service`].
pub struct Worker<Client, Block: BlockT, DhtEventStream> {
	/// Channel receiver for messages sent by a [`crate::Service`].
	from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,

	client: Arc<Client>,

	network: Arc<dyn NetworkProvider>,

	/// Channel we receive Dht events on.
	dht_event_rx: DhtEventStream,

	/// Interval to be proactive, publishing own addresses.
	publish_interval: ExpIncInterval,

	/// Pro-actively publish our own addresses at this interval, if the keys in the keystore
	/// have changed.
	publish_if_changed_interval: ExpIncInterval,

	/// List of keys onto which addresses have been published at the latest publication.
	/// Used to check whether they have changed.
	latest_published_keys: HashSet<AuthorityId>,
	/// List of the kademlia keys that have been published at the latest publication.
	/// Used to associate DHT events with our published records.
	latest_published_kad_keys: HashSet<KademliaKey>,

	/// Same value as in the configuration.
	publish_non_global_ips: bool,

	/// Public addresses set by the node operator to always publish first in the authority
	/// discovery DHT record.
	public_addresses: LinkedHashSet<Multiaddr>,

	/// Same value as in the configuration.
	strict_record_validation: bool,

	/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
	query_interval: ExpIncInterval,

	/// Queue of throttled lookups pending to be passed to the network.
	pending_lookups: Vec<AuthorityId>,

	/// The list of all known authorities.
	known_authorities: HashMap<KademliaKey, AuthorityId>,

	/// Hash of the block at which the list of authorities was last queried.
	authorities_queried_at: Option<Block::Hash>,

	/// Set of in-flight lookups.
	in_flight_lookups: HashMap<KademliaKey, AuthorityId>,

	/// Set of lookups for which we can still receive records.
	/// These are the entries in `in_flight_lookups` for which
	/// we got at least one successful result.
	known_lookups: HashMap<KademliaKey, AuthorityId>,

	/// Last known record by key, here we always keep the record with
	/// the highest creation time and we don't accept records older than
	/// that.
	last_known_records: HashMap<KademliaKey, RecordInfo>,

	addr_cache: addr_cache::AddrCache,

	metrics: Option<Metrics>,

	/// Flag to ensure the warning about missing public addresses is only printed once.
	warn_public_addresses: bool,

	role: Role,

	phantom: PhantomData<Block>,

	/// A spawner of tasks
	spawner: Box<dyn SpawnNamed>,

	/// The directory where the persisted AddrCache file is located.
	/// Optional, since NetworkConfiguration's `net_config_path` field
	/// is optional. If `None`, we won't persist the AddrCache at all.
	persisted_cache_file_path: Option<PathBuf>,
}

#[derive(Debug, Clone)]
struct RecordInfo {
	/// Time since UNIX_EPOCH in nanoseconds.
	creation_time: u128,
	/// Peers that we know have this record, bounded to no more than
	/// DEFAULT_KADEMLIA_REPLICATION_FACTOR(20).
	peers_with_record: HashSet<PeerId>,
	/// The record itself.
	record: Record,
}

/// Wrapper for [`AuthorityDiscoveryApi`](sp_authority_discovery::AuthorityDiscoveryApi). Can
/// be implemented by any struct without dependency on the runtime.
#[async_trait::async_trait]
pub trait AuthorityDiscovery<Block: BlockT> {
	/// Retrieve authority identifiers of the current and next authority set.
	async fn authorities(&self, at: Block::Hash)
		-> std::result::Result<Vec<AuthorityId>, ApiError>;

	/// Retrieve best block hash
	async fn best_hash(&self) -> std::result::Result<Block::Hash, Error>;
}

#[async_trait::async_trait]
impl<Block, T> AuthorityDiscovery<Block> for T
where
	T: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
	T::Api: AuthorityDiscoveryApi<Block>,
	Block: BlockT,
{
	async fn authorities(
		&self,
		at: Block::Hash,
	) -> std::result::Result<Vec<AuthorityId>, ApiError> {
		self.runtime_api().authorities(at)
	}

	async fn best_hash(&self) -> std::result::Result<Block::Hash, Error> {
		Ok(self.info().best_hash)
	}
}

impl<Client, Block, DhtEventStream> Worker<Client, Block, DhtEventStream>
where
	Block: BlockT + Unpin + 'static,
	Client: AuthorityDiscovery<Block> + 'static,
	DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
	/// Construct a [`Worker`].
	pub(crate) fn new(
		from_service: mpsc::Receiver<ServicetoWorkerMsg>,
		client: Arc<Client>,
		network: Arc<dyn NetworkProvider>,
		dht_event_rx: DhtEventStream,
		role: Role,
		prometheus_registry: Option<prometheus_endpoint::Registry>,
		config: WorkerConfig,
		spawner: impl SpawnNamed + 'static,
	) -> Self {
		// When a node starts up, publishing and querying might fail for various reasons, for
		// example due to not yet being fully bootstrapped on the DHT. Thus one should retry
		// sooner rather than later. On the other hand, a long running node is likely well
		// connected and thus timely retries are not needed. For this reason, use an
		// exponentially increasing interval for `publish_interval`, `query_interval` and
		// `priority_group_set_interval` instead of a constant interval.
		let publish_interval =
			ExpIncInterval::new(Duration::from_secs(2), config.max_publish_interval);
		let query_interval = ExpIncInterval::new(Duration::from_secs(2), config.max_query_interval);

		// An `ExpIncInterval` is overkill here because the interval is constant, but it keeps
		// things consistent.
		let publish_if_changed_interval =
			ExpIncInterval::new(config.keystore_refresh_interval, config.keystore_refresh_interval);

		let maybe_persisted_cache_file_path =
			config.persisted_cache_directory.as_ref().map(|dir| {
				let mut path = dir.clone();
				path.push(ADDR_CACHE_FILE_NAME);
				path
			});

		// If we have a path to the persisted cache file, try to load the cache from that file,
		// provided it exists and is valid.
		// Create a new one otherwise.
		let addr_cache: AddrCache = if let Some(persisted_cache_file_path) =
			maybe_persisted_cache_file_path.as_ref()
		{
			let loaded =
				AddrCache::try_from(persisted_cache_file_path.as_path()).unwrap_or_else(|e| {
					info!(target: LOG_TARGET, "Failed to load AddrCache from file, using empty instead: {}", e);
					AddrCache::new()
				});
			info!(target: LOG_TARGET, "Loaded persisted AddrCache with {} authority ids.", loaded.num_authority_ids());
			loaded
		} else {
			info!(target: LOG_TARGET, "No persisted cache file path provided, authority discovery will not persist the address cache to disk.");
			AddrCache::new()
		};

		let metrics = match prometheus_registry {
			Some(registry) => match Metrics::register(&registry) {
				Ok(metrics) => Some(metrics),
				Err(e) => {
					error!(target: LOG_TARGET, "Failed to register metrics: {}", e);
					None
				},
			},
			None => None,
		};

		let public_addresses = {
			let local_peer_id = network.local_peer_id();

			config
				.public_addresses
				.into_iter()
				.map(|address| AddressType::PublicAddress(address).without_p2p(local_peer_id))
				.collect()
		};

		Worker {
			from_service: from_service.fuse(),
			client,
			network,
			dht_event_rx,
			publish_interval,
			known_authorities: Default::default(),
			authorities_queried_at: None,
			publish_if_changed_interval,
			latest_published_keys: HashSet::new(),
			latest_published_kad_keys: HashSet::new(),
			publish_non_global_ips: config.publish_non_global_ips,
			public_addresses,
			strict_record_validation: config.strict_record_validation,
			query_interval,
			pending_lookups: Vec::new(),
			in_flight_lookups: HashMap::new(),
			known_lookups: HashMap::new(),
			addr_cache,
			role,
			metrics,
			warn_public_addresses: false,
			phantom: PhantomData,
			last_known_records: HashMap::new(),
			spawner: Box::new(spawner),
			persisted_cache_file_path: maybe_persisted_cache_file_path,
		}
	}

	/// Persists `AddrCache` to disk if the `persisted_cache_file_path` is set.
	pub fn persist_addr_cache_if_supported(&self) {
		let Some(path) = self.persisted_cache_file_path.as_ref().cloned() else {
			return;
		};
		let cloned_cache = self.addr_cache.clone();
		self.spawner.spawn_blocking(
			"persist-addr-cache",
			Some("authority-discovery-worker"),
			Box::pin(async move {
				cloned_cache.serialize_and_persist(path);
			}),
		)
	}

	/// Start the worker
	pub async fn run(mut self) {
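		// Persist the address cache to disk at a fixed interval; it is reloaded in `new` on
		// the next startup.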
		let mut persist_interval = tokio::time::interval(ADDR_CACHE_PERSIST_INTERVAL);

		loop {
			self.start_new_lookups();

			futures::select! {
				_ = persist_interval.tick().fuse() => {
					self.persist_addr_cache_if_supported();
				},
				// Process incoming events.
				event = self.dht_event_rx.next().fuse() => {
					if let Some(event) = event {
						self.handle_dht_event(event).await;
					} else {
						self.persist_addr_cache_if_supported();
						// This point is reached if the network has shut down, at which point there is not
						// much else to do than to shut down the authority discovery as well.
						return;
					}
				},
				// Handle messages from [`Service`]. Ignore if sender side is closed.
				msg = self.from_service.select_next_some() => {
					self.process_message_from_service(msg);
				},
				// Publish own addresses.
				only_if_changed = future::select(
					self.publish_interval.next().map(|_| false),
					self.publish_if_changed_interval.next().map(|_| true)
				).map(|e| e.factor_first().0).fuse() => {
					if let Err(e) = self.publish_ext_addresses(only_if_changed).await {
						error!(
							target: LOG_TARGET,
							"Failed to publish external addresses: {}", e,
						);
					}
				},
				// Request addresses of authorities.
				_ = self.query_interval.next().fuse() => {
					if let Err(e) = self.refill_pending_lookups_queue().await {
						error!(
							target: LOG_TARGET,
							"Failed to request addresses of authorities: {}", e,
						);
					}
				},
			}
		}
	}

	fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
		match msg {
			ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
				let _ = sender.send(
					self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
				);
			},
			ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
				let _ = sender
					.send(self.addr_cache.get_authority_ids_by_peer_id(&peer_id).map(Clone::clone));
			},
		}
	}

	fn addresses_to_publish(&mut self) -> impl Iterator<Item = Multiaddr> {
		let local_peer_id = self.network.local_peer_id();
		let publish_non_global_ips = self.publish_non_global_ips;

		// Checks that the address is global.
		let address_is_global = |address: &Multiaddr| {
			address.iter().all(|protocol| match protocol {
				// The `ip_network` library is used because its `is_global()` method is stable,
				// while `is_global()` in the standard library currently isn't.
				multiaddr::Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
				multiaddr::Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
				_ => true,
			})
		};

		// These are the addresses on which the node is listening for incoming connections,
		// as reported by the installed protocols (TCP / WebSocket etc).
		//
		// We double check that the address is global. In other words, we double check that
		// the node is not running behind a NAT.
		// Note: we do this regardless of the `publish_non_global_ips` setting, since the
		// node discovers many external addresses via the identify protocol.
		let mut global_listen_addresses = self
			.network
			.listen_addresses()
			.into_iter()
			.filter_map(|address| {
				address_is_global(&address)
					.then(|| AddressType::GlobalListenAddress(address).without_p2p(local_peer_id))
			})
			.take(MAX_GLOBAL_LISTEN_ADDRESSES)
			.peekable();

		// Similar to the listen addresses, but this one takes `publish_non_global_ips` into
		// consideration.
		let mut external_addresses = self
			.network
			.external_addresses()
			.into_iter()
			.filter_map(|address| {
				(publish_non_global_ips || address_is_global(&address))
					.then(|| AddressType::ExternalAddress(address).without_p2p(local_peer_id))
			})
			.peekable();

		let has_global_listen_addresses = global_listen_addresses.peek().is_some();
		trace!(
			target: LOG_TARGET,
			"Node has public addresses: {}, global listen addresses: {}, external addresses: {}",
			!self.public_addresses.is_empty(),
			has_global_listen_addresses,
			external_addresses.peek().is_some(),
		);

		let mut seen_addresses = HashSet::new();

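		// Order of precedence when assembling the record: operator-supplied public addresses
		// first, then global listen addresses, then external addresses, capped at
		// MAX_ADDRESSES_TO_PUBLISH entries in total.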
		let addresses = self
			.public_addresses
			.clone()
			.into_iter()
			.chain(global_listen_addresses)
			.chain(external_addresses)
			// Deduplicate addresses.
			.filter(|address| seen_addresses.insert(address.clone()))
			.take(MAX_ADDRESSES_TO_PUBLISH)
			.collect::<Vec<_>>();

		if !addresses.is_empty() {
			debug!(
				target: LOG_TARGET,
				"Publishing authority DHT record peer_id='{local_peer_id}' with addresses='{addresses:?}'",
			);

			if !self.warn_public_addresses &&
				self.public_addresses.is_empty() &&
				!has_global_listen_addresses
			{
				self.warn_public_addresses = true;

				error!(
					target: LOG_TARGET,
					"No public addresses configured and no global listen addresses found. \
					Authority DHT record may contain unreachable addresses. \
					Consider setting `--public-addr` to the public IP address of this node. \
					This will become a hard requirement in future versions for authorities."
				);
			}
		}

		// The address must include the local peer id.
		addresses
			.into_iter()
			.map(move |a| a.with(multiaddr::Protocol::P2p(*local_peer_id.as_ref())))
	}

	/// Publish own public addresses.
	///
	/// If `only_if_changed` is true, the function has no effect if the list of keys to publish
	/// is equal to `self.latest_published_keys`.
	async fn publish_ext_addresses(&mut self, only_if_changed: bool) -> Result<()> {
		let key_store = match &self.role {
			Role::PublishAndDiscover(key_store) => key_store,
			Role::Discover => return Ok(()),
		}
		.clone();

		let addresses = serialize_addresses(self.addresses_to_publish());
		if addresses.is_empty() {
			trace!(
				target: LOG_TARGET,
				"No addresses to publish. Skipping publication."
			);

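			// Reset the exponentially increasing interval to its start value so publishing is
			// retried soon once addresses become available.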
			self.publish_interval.set_to_start();
			return Ok(())
		}

		let keys =
			Worker::<Client, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
				key_store.clone(),
				self.client.as_ref(),
			)
			.await?
			.into_iter()
			.collect::<HashSet<_>>();

		if only_if_changed {
			// If the authority keys did not change and the `publish_if_changed_interval` was
			// triggered then do nothing.
			if keys == self.latest_published_keys {
				return Ok(())
			}

			// We have detected a change in the authority keys, reset the timers to
			// publish and gather data faster.
			self.publish_interval.set_to_start();
			self.query_interval.set_to_start();
		}

		if let Some(metrics) = &self.metrics {
			metrics.publish.inc();
			metrics
				.amount_addresses_last_published
				.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
		}

		let serialized_record = serialize_authority_record(addresses, Some(build_creation_time()))?;
		let peer_signature = sign_record_with_peer_id(&serialized_record, &self.network)?;

		let keys_vec = keys.iter().cloned().collect::<Vec<_>>();

		let kv_pairs = sign_record_with_authority_ids(
			serialized_record,
			Some(peer_signature),
			key_store.as_ref(),
			keys_vec,
		)?;

		self.latest_published_kad_keys = kv_pairs.iter().map(|(k, _)| k.clone()).collect();

		for (key, value) in kv_pairs.into_iter() {
			self.network.put_value(key, value);
		}

		self.latest_published_keys = keys;

		Ok(())
	}

	async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
		let best_hash = self.client.best_hash().await?;

		let local_keys = match &self.role {
			Role::PublishAndDiscover(key_store) => key_store
				.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
				.into_iter()
				.collect::<HashSet<_>>(),
			Role::Discover => HashSet::new(),
		};

		let mut authorities = self
			.client
			.authorities(best_hash)
			.await
			.map_err(|e| Error::CallingRuntime(e.into()))?
			.into_iter()
			.filter(|id| !local_keys.contains(id.as_ref()))
			.collect::<Vec<_>>();

		self.known_authorities = authorities
			.clone()
			.into_iter()
			.map(|authority| (hash_authority_id(authority.as_ref()), authority))
			.collect::<HashMap<_, _>>();
		self.authorities_queried_at = Some(best_hash);

		self.addr_cache.retain_ids(&authorities);
		let now = Instant::now();
		self.last_known_records.retain(|k, value| {
			self.known_authorities.contains_key(k) && !value.record.is_expired(now)
		});

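		// Shuffle the authorities so lookups are not issued in the same order on every refill.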
		authorities.shuffle(&mut thread_rng());
		self.pending_lookups = authorities;
		// Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as
		// query interval ticks are far enough apart for all lookups to succeed.
		self.in_flight_lookups.clear();
		self.known_lookups.clear();

		if let Some(metrics) = &self.metrics {
			metrics
				.requests_pending
				.set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
		}

		Ok(())
	}

	fn start_new_lookups(&mut self) {
		while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS {
			let authority_id = match self.pending_lookups.pop() {
				Some(authority) => authority,
				None => return,
			};
			let hash = hash_authority_id(authority_id.as_ref());
			self.network.get_value(&hash);
			self.in_flight_lookups.insert(hash, authority_id);

			if let Some(metrics) = &self.metrics {
				metrics.requests.inc();
				metrics
					.requests_pending
					.set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
			}
		}
	}

	/// Handle incoming Dht events.
	async fn handle_dht_event(&mut self, event: DhtEvent) {
		match event {
			DhtEvent::ValueFound(v) => {
				if let Some(metrics) = &self.metrics {
					metrics.dht_event_received.with_label_values(&["value_found"]).inc();
				}

				debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", v.record.key);

				if let Err(e) = self.handle_dht_value_found_event(v) {
					if let Some(metrics) = &self.metrics {
						metrics.handle_value_found_event_failure.inc();
					}
					debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e);
				}
			},
			DhtEvent::ValueNotFound(hash) => {
				if let Some(metrics) = &self.metrics {
					metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
				}

				if self.in_flight_lookups.remove(&hash).is_some() {
					debug!(target: LOG_TARGET, "Value for hash '{:?}' not found on Dht.", hash)
				} else {
					debug!(
						target: LOG_TARGET,
						"Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
					)
				}
			},
			DhtEvent::ValuePut(hash) => {
				if !self.latest_published_kad_keys.contains(&hash) {
					return;
				}

				// Fast forward the exponentially increasing interval to the configured maximum. In
				// case this was the first successful address publishing there is no need for a
				// timely retry.
				self.publish_interval.set_to_max();

				if let Some(metrics) = &self.metrics {
					metrics.dht_event_received.with_label_values(&["value_put"]).inc();
				}

				debug!(target: LOG_TARGET, "Successfully put hash '{:?}' on Dht.", hash)
			},
			DhtEvent::ValuePutFailed(hash) => {
				if !self.latest_published_kad_keys.contains(&hash) {
					// Not a value we have published or received multiple times.
					return;
				}

				if let Some(metrics) = &self.metrics {
					metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
				}

				debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash)
			},
			DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires) => {
				if let Err(e) = self
					.handle_put_record_requested(record_key, record_value, publisher, expires)
					.await
				{
					debug!(target: LOG_TARGET, "Failed to handle put record request: {}", e)
				}

				if let Some(metrics) = &self.metrics {
					metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
				}
			},
			_ => {},
		}
	}

	async fn handle_put_record_requested(
		&mut self,
		record_key: Key,
		record_value: Vec<u8>,
		publisher: Option<PeerId>,
		expires: Option<std::time::Instant>,
	) -> Result<()> {
		let publisher = publisher.ok_or(Error::MissingPublisher)?;

		// Make sure we don't ever work with an outdated set of authorities
		// and that we do not update known_authorities too often.
		let best_hash = self.client.best_hash().await?;
		if !self.known_authorities.contains_key(&record_key) &&
			self.authorities_queried_at
				.map(|authorities_queried_at| authorities_queried_at != best_hash)
				.unwrap_or(true)
		{
			let authorities = self
				.client
				.authorities(best_hash)
				.await
				.map_err(|e| Error::CallingRuntime(e.into()))?
				.into_iter()
				.collect::<Vec<_>>();

			self.known_authorities = authorities
				.into_iter()
				.map(|authority| (hash_authority_id(authority.as_ref()), authority))
				.collect::<HashMap<_, _>>();

			self.authorities_queried_at = Some(best_hash);
		}

		let authority_id =
			self.known_authorities.get(&record_key).ok_or(Error::UnknownAuthority)?;
		let signed_record =
			Self::check_record_signed_with_authority_id(record_value.as_slice(), authority_id)?;
		self.check_record_signed_with_network_key(
			&signed_record.record,
			signed_record.peer_signature,
			publisher,
			authority_id,
		)?;

		let records_creation_time: u128 =
			schema::AuthorityRecord::decode(signed_record.record.as_slice())
				.map_err(Error::DecodingProto)?
				.creation_time
				.map(|creation_time| {
					u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
				})
				.unwrap_or_default(); // 0 is a sane default for records that do not have creation time present.

		let current_record_info = self.last_known_records.get(&record_key);
		// If record creation time is older than the current record creation time,
		// we don't store it since we want to give higher priority to newer records.
		if let Some(current_record_info) = current_record_info {
			if records_creation_time < current_record_info.creation_time {
				debug!(
					target: LOG_TARGET,
					"Skip storing because record creation time {:?} is older than the current known record {:?}",
					records_creation_time,
					current_record_info.creation_time
				);
				return Ok(());
			}
		}

		self.network.store_record(record_key, record_value, Some(publisher), expires);
		Ok(())
	}

	fn check_record_signed_with_authority_id(
		record: &[u8],
		authority_id: &AuthorityId,
	) -> Result<schema::SignedAuthorityRecord> {
		let signed_record: schema::SignedAuthorityRecord =
			schema::SignedAuthorityRecord::decode(record).map_err(Error::DecodingProto)?;

		let auth_signature = AuthoritySignature::decode(&mut &signed_record.auth_signature[..])
			.map_err(Error::EncodingDecodingScale)?;

		if !AuthorityPair::verify(&auth_signature, &signed_record.record, &authority_id) {
			return Err(Error::VerifyingDhtPayload)
		}

		Ok(signed_record)
	}

	fn check_record_signed_with_network_key(
		&self,
		record: &Vec<u8>,
		peer_signature: Option<PeerSignature>,
		remote_peer_id: PeerId,
		authority_id: &AuthorityId,
	) -> Result<()> {
		if let Some(peer_signature) = peer_signature {
			match self.network.verify(
				remote_peer_id.into(),
				&peer_signature.public_key,
				&peer_signature.signature,
				record,
			) {
				Ok(true) => {},
				Ok(false) => return Err(Error::VerifyingDhtPayload),
				Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
			}
		} else if self.strict_record_validation {
			return Err(Error::MissingPeerIdSignature)
		} else {
			debug!(
				target: LOG_TARGET,
				"Received unsigned authority discovery record from {}", authority_id
			);
		}
		Ok(())
	}

	fn handle_dht_value_found_event(&mut self, peer_record: PeerRecord) -> Result<()> {
		// The record key identifies which authority lookup this value belongs to.
		let remote_key = peer_record.record.key.clone();

		let authority_id: AuthorityId =
			if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) {
				self.known_lookups.insert(remote_key.clone(), authority_id.clone());
				authority_id
			} else if let Some(authority_id) = self.known_lookups.get(&remote_key) {
				authority_id.clone()
			} else {
				return Err(Error::ReceivingUnexpectedRecord);
			};

		let local_peer_id = self.network.local_peer_id();

		let schema::SignedAuthorityRecord { record, peer_signature, .. } =
			Self::check_record_signed_with_authority_id(
				peer_record.record.value.as_slice(),
				&authority_id,
			)?;

		let authority_record =
			schema::AuthorityRecord::decode(record.as_slice()).map_err(Error::DecodingProto)?;

		let records_creation_time: u128 = authority_record
			.creation_time
			.as_ref()
			.map(|creation_time| {
				u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
			})
			.unwrap_or_default(); // 0 is a sane default for records that do not have creation time present.

		let addresses: Vec<Multiaddr> = authority_record
			.addresses
			.into_iter()
			.map(|a| a.try_into())
			.collect::<std::result::Result<_, _>>()
			.map_err(Error::ParsingMultiaddress)?;

		let get_peer_id = |a: &Multiaddr| match a.iter().last() {
			Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
			_ => None,
		};

		// Ignore [`Multiaddr`]s without a [`PeerId`] or ones that point to our own peer id.
		let addresses: Vec<Multiaddr> = addresses
			.into_iter()
			.filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some())
			.collect();

		let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a)))
			.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records
			.flatten()
			.ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them

		// At this point we know all the valid multiaddresses from the record and that each of
		// them belongs to the same PeerId; we just need to check that the record is properly
		// signed by the owner of that PeerId.
		self.check_record_signed_with_network_key(
			&record,
			peer_signature,
			remote_peer_id,
			&authority_id,
		)?;

		let remote_addresses: Vec<Multiaddr> =
			addresses.into_iter().take(MAX_ADDRESSES_PER_AUTHORITY).collect();

		let answering_peer_id = peer_record.peer.map(|peer| peer.into());

		let addr_cache_needs_update = self.handle_new_record(
			&authority_id,
			remote_key.clone(),
			RecordInfo {
				creation_time: records_creation_time,
				peers_with_record: answering_peer_id.into_iter().collect(),
				record: peer_record.record,
			},
		);

		if !remote_addresses.is_empty() && addr_cache_needs_update {
			self.addr_cache.insert(authority_id, remote_addresses);
			if let Some(metrics) = &self.metrics {
				metrics
					.known_authorities_count
					.set(self.addr_cache.num_authority_ids().try_into().unwrap_or(std::u64::MAX));
			}
		}
		Ok(())
	}

	// Handles receiving a new DHT record for the authority.
	// Returns true if the record was new, false if the record was older than the current one.
	fn handle_new_record(
		&mut self,
		authority_id: &AuthorityId,
		kademlia_key: KademliaKey,
		new_record: RecordInfo,
	) -> bool {
		let current_record_info = self
			.last_known_records
			.entry(kademlia_key.clone())
			.or_insert_with(|| new_record.clone());

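		// Three cases follow: a strictly newer record replaces the current one and is pushed
		// to the peers still holding the stale copy; an equal creation time merges the peer
		// sets; an older record is rejected and our newer copy is pushed back to its senders.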
		if new_record.creation_time > current_record_info.creation_time {
			let peers_that_need_updating = current_record_info.peers_with_record.clone();
			self.network.put_record_to(
				new_record.record.clone(),
				peers_that_need_updating.clone(),
				// If this is empty it means we received the answer from our node's local
				// storage, so we need to update that as well.
				current_record_info.peers_with_record.is_empty(),
			);
			debug!(
					target: LOG_TARGET,
					"Found a newer record for {:?} new record creation time {:?} old record creation time {:?}",
					authority_id, new_record.creation_time, current_record_info.creation_time
			);
			self.last_known_records.insert(kademlia_key, new_record);
			return true
		}

		if new_record.creation_time == current_record_info.creation_time {
			// Same record, just update the peer set in case this is a record from old nodes
			// that don't include a timestamp.
			debug!(
					target: LOG_TARGET,
					"Found same record for {:?} record creation time {:?}",
					authority_id, new_record.creation_time
			);
			if current_record_info.peers_with_record.len() + new_record.peers_with_record.len() <=
				DEFAULT_KADEMLIA_REPLICATION_FACTOR
			{
				current_record_info.peers_with_record.extend(new_record.peers_with_record);
			}
			return true
		}

		debug!(
				target: LOG_TARGET,
				"Found old record for {:?} received record creation time {:?} current record creation time {:?}",
				authority_id, new_record.creation_time, current_record_info.creation_time,
		);
		self.network.put_record_to(
			current_record_info.record.clone().into(),
			new_record.peers_with_record.clone(),
			// If this is empty it means we received the answer from our node's local
			// storage, so we need to update that as well.
			new_record.peers_with_record.is_empty(),
		);
		return false
	}

	/// Retrieve our public keys within the current and next authority set.
	// A node might have multiple authority discovery keys within its keystore, e.g. an old one and
	// one for the upcoming session. In addition it could be participating in the current and (/ or)
	// next authority set with two keys. The function does not return all of the local authority
	// discovery public keys, but only the ones intersecting with the current or next authority set.
	async fn get_own_public_keys_within_authority_set(
		key_store: KeystorePtr,
		client: &Client,
	) -> Result<HashSet<AuthorityId>> {
		let local_pub_keys = key_store
			.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
			.into_iter()
			.collect::<HashSet<_>>();

		let best_hash = client.best_hash().await?;
		let authorities = client
			.authorities(best_hash)
			.await
			.map_err(|e| Error::CallingRuntime(e.into()))?
			.into_iter()
			.map(Into::into)
			.collect::<HashSet<_>>();

		let intersection =
			local_pub_keys.intersection(&authorities).cloned().map(Into::into).collect();

		Ok(intersection)
	}
}

/// The type of an address, based on how it was obtained.
#[derive(Debug, Clone, PartialEq, Eq)]
enum AddressType {
	/// The address is specified as a public address via the CLI.
	PublicAddress(Multiaddr),
	/// The address is a global listen address.
	GlobalListenAddress(Multiaddr),
	/// The address is discovered via the network (i.e. the /identify protocol).
	ExternalAddress(Multiaddr),
}

impl AddressType {
	/// Removes the `/p2p/..` from the address if it is present.
	///
	/// In case the peer id in the address does not match the local peer id, an error is logged for
	/// `ExternalAddress` and `GlobalListenAddress`.
	fn without_p2p(self, local_peer_id: PeerId) -> Multiaddr {
		// Get the address and the source str for logging.
		let (mut address, source) = match self {
			AddressType::PublicAddress(address) => (address, "public address"),
			AddressType::GlobalListenAddress(address) => (address, "global listen address"),
			AddressType::ExternalAddress(address) => (address, "external address"),
		};

		if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
			if peer_id != *local_peer_id.as_ref() {
				error!(
					target: LOG_TARGET,
					"Network returned '{source}' '{address}' with peer id \
					 not matching the local peer id '{local_peer_id}'.",
				);
			}
			address.pop();
		}
		address
	}
}

/// NetworkProvider provides [`Worker`] with all necessary hooks into the
/// underlying Substrate networking. Using this trait abstraction instead of
/// `sc_network::NetworkService` directly is necessary to unit test [`Worker`].
pub trait NetworkProvider:
	NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
{
}

impl<T> NetworkProvider for T where
	T: NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
{
}

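/// The DHT key for an authority is the SHA-256 digest of the authority id's raw bytes,
/// wrapped as a Kademlia key.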
fn hash_authority_id(id: &[u8]) -> KademliaKey {
	KademliaKey::new(&Code::Sha2_256.digest(id).digest())
}

// Makes sure all values are the same and returns that value.
//
// Returns Err(_) if not all values are equal. Returns Ok(None) if there are
// no values.
fn single<T>(values: impl IntoIterator<Item = T>) -> std::result::Result<Option<T>, ()>
where
	T: PartialEq<T>,
{
	values.into_iter().try_fold(None, |acc, item| match acc {
		None => Ok(Some(item)),
		Some(ref prev) if *prev != item => Err(()),
		Some(x) => Ok(Some(x)),
	})
}

fn serialize_addresses(addresses: impl Iterator<Item = Multiaddr>) -> Vec<Vec<u8>> {
	addresses.map(|a| a.to_vec()).collect()
}

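/// Builds the record creation timestamp as SCALE-encoded nanoseconds since the UNIX epoch.
/// Newer creation times take precedence when competing records are received.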
fn build_creation_time() -> schema::TimestampInfo {
	let creation_time = SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.map(|time| time.as_nanos())
		.unwrap_or_default();
	schema::TimestampInfo { timestamp: creation_time.encode() }
}

fn serialize_authority_record(
	addresses: Vec<Vec<u8>>,
	creation_time: Option<schema::TimestampInfo>,
) -> Result<Vec<u8>> {
	let mut serialized_record = vec![];

	schema::AuthorityRecord { addresses, creation_time }
		.encode(&mut serialized_record)
		.map_err(Error::EncodingProto)?;
	Ok(serialized_record)
}

fn sign_record_with_peer_id(
	serialized_record: &[u8],
	network: &impl NetworkSigner,
) -> Result<schema::PeerSignature> {
	let signature = network
		.sign_with_local_identity(serialized_record.to_vec())
		.map_err(|e| Error::CannotSign(format!("{} (network packet)", e)))?;
	let public_key = signature.public_key.encode_protobuf();
	let signature = signature.bytes;
	Ok(schema::PeerSignature { signature, public_key })
}

fn sign_record_with_authority_ids(
	serialized_record: Vec<u8>,
	peer_signature: Option<schema::PeerSignature>,
	key_store: &dyn Keystore,
	keys: Vec<AuthorityId>,
) -> Result<Vec<(KademliaKey, Vec<u8>)>> {
	let mut result = Vec::with_capacity(keys.len());

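	// Sign the same serialized record with every local authority key; each signed copy is
	// published under its own DHT key, the hash of the corresponding authority id.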
	for key in keys.iter() {
		let auth_signature = key_store
			.sr25519_sign(key_types::AUTHORITY_DISCOVERY, key.as_ref(), &serialized_record)
			.map_err(|e| Error::CannotSign(format!("{}. Key: {:?}", e, key)))?
			.ok_or_else(|| {
				Error::CannotSign(format!("Could not find key in keystore. Key: {:?}", key))
			})?;

		// Scale encode
		let auth_signature = auth_signature.encode();
		let signed_record = schema::SignedAuthorityRecord {
			record: serialized_record.clone(),
			auth_signature,
			peer_signature: peer_signature.clone(),
		}
		.encode_to_vec();

		result.push((hash_authority_id(key.as_slice()), signed_record));
	}

	Ok(result)
}

/// Prometheus metrics for a [`Worker`].
#[derive(Clone)]
pub(crate) struct Metrics {
	publish: Counter<U64>,
	amount_addresses_last_published: Gauge<U64>,
	requests: Counter<U64>,
	requests_pending: Gauge<U64>,
	dht_event_received: CounterVec<U64>,
	handle_value_found_event_failure: Counter<U64>,
	known_authorities_count: Gauge<U64>,
}

impl Metrics {
	pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
		Ok(Self {
			publish: register(
				Counter::new(
					"substrate_authority_discovery_times_published_total",
					"Number of times authority discovery has published external addresses.",
				)?,
				registry,
			)?,
			amount_addresses_last_published: register(
				Gauge::new(
					"substrate_authority_discovery_amount_external_addresses_last_published",
					"Number of external addresses published when authority discovery last \
					 published addresses.",
				)?,
				registry,
			)?,
			requests: register(
				Counter::new(
					"substrate_authority_discovery_authority_addresses_requested_total",
					"Number of times authority discovery has requested external addresses of a \
					 single authority.",
				)?,
				registry,
			)?,
			requests_pending: register(
				Gauge::new(
					"substrate_authority_discovery_authority_address_requests_pending",
					"Number of pending authority address requests.",
				)?,
				registry,
			)?,
			dht_event_received: register(
				CounterVec::new(
					Opts::new(
						"substrate_authority_discovery_dht_event_received",
						"Number of dht events received by authority discovery.",
					),
					&["name"],
				)?,
				registry,
			)?,
			handle_value_found_event_failure: register(
				Counter::new(
					"substrate_authority_discovery_handle_value_found_event_failure",
					"Number of times handling a dht value found event failed.",
				)?,
				registry,
			)?,
			known_authorities_count: register(
				Gauge::new(
					"substrate_authority_discovery_known_authorities_count",
					"Number of authorities known by authority discovery.",
				)?,
				registry,
			)?,
		})
	}
}

// Helper functions for unit testing.
#[cfg(test)]
impl<Block: BlockT, Client, DhtEventStream> Worker<Client, Block, DhtEventStream> {
	pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
		self.addr_cache.insert(authority, addresses)
	}

	pub(crate) fn contains_authority(&self, authority: &AuthorityId) -> bool {
		self.addr_cache.get_addresses_by_authority_id(authority).is_some()
	}
}