sc_authority_discovery/
worker.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	error::{Error, Result},
21	interval::ExpIncInterval,
22	ServicetoWorkerMsg, WorkerConfig,
23};
24
25use std::{
26	collections::{HashMap, HashSet},
27	marker::PhantomData,
28	sync::Arc,
29	time::{Duration, Instant, SystemTime, UNIX_EPOCH},
30};
31
32use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt};
33
34use addr_cache::AddrCache;
35use codec::{Decode, Encode};
36use ip_network::IpNetwork;
37use libp2p::kad::{PeerRecord, Record};
38use linked_hash_set::LinkedHashSet;
39
40use log::{debug, error, trace};
41use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
42use prost::Message;
43use rand::{seq::SliceRandom, thread_rng};
44
45use sc_network::{
46	config::DEFAULT_KADEMLIA_REPLICATION_FACTOR, event::DhtEvent, multiaddr, KademliaKey,
47	Multiaddr, NetworkDHTProvider, NetworkSigner, NetworkStateInfo,
48};
49use sc_network_types::{multihash::Code, PeerId};
50use schema::PeerSignature;
51use sp_api::{ApiError, ProvideRuntimeApi};
52use sp_authority_discovery::{
53	AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature,
54};
55use sp_blockchain::HeaderBackend;
56use sp_core::crypto::{key_types, ByteArray, Pair};
57use sp_keystore::{Keystore, KeystorePtr};
58use sp_runtime::traits::Block as BlockT;
59
// Cache of the addresses discovered for authorities, queryable by authority id and by peer id.
mod addr_cache;
/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs.
mod schema {
	#[cfg(test)]
	mod tests;

	include!(concat!(env!("OUT_DIR"), "/authority_discovery_v3.rs"));
}
// Unit tests for the worker.
#[cfg(test)]
pub mod tests;

/// Log target used by every log message emitted from this module.
const LOG_TARGET: &str = "sub-authority-discovery";

/// Maximum number of addresses cached per authority. Additional addresses are discarded.
const MAX_ADDRESSES_PER_AUTHORITY: usize = 10;

/// Maximum number of in-flight DHT lookups at any given point in time.
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;
78
/// Role an authority discovery [`Worker`] can run as.
pub enum Role {
	/// Publish own addresses and discover addresses of others.
	///
	/// The keystore provides the local authority discovery keys. Those keys sign the published
	/// records and are excluded from the lookups of other authorities.
	PublishAndDiscover(KeystorePtr),
	/// Discover addresses of others.
	///
	/// Nothing is published in this role.
	Discover,
}
86
/// An authority discovery [`Worker`] can publish the local node's addresses as well as discover
/// those of other nodes via a Kademlia DHT.
///
/// When constructed with [`Role::PublishAndDiscover`] a [`Worker`] will
///
///    1. Retrieve its external addresses (including peer id).
///
///    2. Get the list of keys owned by the local node participating in the current authority set.
///
///    3. Sign the addresses with the keys.
///
///    4. Put addresses and signature as a record with the authority id as a key on a Kademlia DHT.
///
/// When constructed with either [`Role::PublishAndDiscover`] or [`Role::Discover`] a [`Worker`]
/// will
///
///    1. Retrieve the current and next set of authorities.
///
///    2. Start DHT queries for the ids of the authorities.
///
///    3. Validate the signatures of the retrieved key value pairs.
///
///    4. Add the retrieved external addresses as priority nodes to the
///    network peerset.
///
///    5. Allow querying of the collected addresses via the [`crate::Service`].
pub struct Worker<Client, Block: BlockT, DhtEventStream> {
	/// Channel receiver for messages sent by a [`crate::Service`].
	from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,

	/// Client used to query the best block hash and the authority set at a given block.
	client: Arc<Client>,

	/// Handle to the networking layer.
	///
	/// It is used for DHT gets and puts, for signing and verifying records with the network
	/// key, and for the local peer id and external addresses.
	network: Arc<dyn NetworkProvider>,

	/// Channel we receive Dht events on.
	dht_event_rx: DhtEventStream,

	/// Interval to be proactive, publishing own addresses.
	publish_interval: ExpIncInterval,

	/// Pro-actively publish our own addresses at this interval, if the keys in the keystore
	/// have changed.
	publish_if_changed_interval: ExpIncInterval,

	/// List of keys onto which addresses have been published at the latest publication.
	/// Used to check whether they have changed.
	latest_published_keys: HashSet<AuthorityId>,
	/// List of the kademlia keys that have been published at the latest publication.
	/// Used to associate DHT events with our published records.
	latest_published_kad_keys: HashSet<KademliaKey>,

	/// Same value as in the configuration.
	publish_non_global_ips: bool,

	/// Public addresses set by the node operator to always publish first in the authority
	/// discovery DHT record.
	///
	/// Any trailing `/p2p/...` component is stripped on construction. The local peer id is
	/// appended again when publishing.
	public_addresses: LinkedHashSet<Multiaddr>,

	/// Same value as in the configuration.
	strict_record_validation: bool,

	/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
	query_interval: ExpIncInterval,

	/// Queue of throttled lookups pending to be passed to the network.
	///
	/// Entries are taken from the back (`pop`).
	pending_lookups: Vec<AuthorityId>,

	/// The list of all known authorities, keyed by the hash of their authority id.
	known_authorities: HashMap<KademliaKey, AuthorityId>,

	/// Hash of the block at which `known_authorities` was last populated.
	authorities_queried_at: Option<Block::Hash>,

	/// Set of in-flight lookups.
	in_flight_lookups: HashMap<KademliaKey, AuthorityId>,

	/// Set of lookups for which we can still receive records.
	/// These are the entries in the `in_flight_lookups` for which
	/// we got at least one successful result.
	known_lookups: HashMap<KademliaKey, AuthorityId>,

	/// Last known record by key, here we always keep the record with
	/// the highest creation time and we don't accept records older than
	/// that.
	last_known_records: HashMap<KademliaKey, RecordInfo>,

	/// Cache of the validated addresses discovered for each authority.
	addr_cache: addr_cache::AddrCache,

	/// Prometheus metrics. This is `None` when no registry was supplied or registration failed.
	metrics: Option<Metrics>,

	/// Whether this worker publishes its own addresses or only discovers others.
	role: Role,

	/// Marker tying the worker to its `Block` type parameter.
	phantom: PhantomData<Block>,
}
181
/// The newest DHT record known for a Kademlia key, together with the peers known to hold it.
#[derive(Debug, Clone)]
struct RecordInfo {
	/// Time since UNIX_EPOCH in nanoseconds.
	///
	/// `0` is used for records that do not carry a (decodable) creation time.
	creation_time: u128,
	/// Peers that we know have this record, bounded to no more than
	/// DEFAULT_KADEMLIA_REPLICATION_FACTOR(20).
	///
	/// An empty set means the record was answered from our node's local storage.
	peers_with_record: HashSet<PeerId>,
	/// The record itself.
	record: Record,
}
192
/// Wrapper for [`AuthorityDiscoveryApi`](sp_authority_discovery::AuthorityDiscoveryApi). Can
/// be implemented by any struct without dependency on the runtime.
#[async_trait::async_trait]
pub trait AuthorityDiscovery<Block: BlockT> {
	/// Retrieve authority identifiers of the current and next authority set.
	async fn authorities(&self, at: Block::Hash)
		-> std::result::Result<Vec<AuthorityId>, ApiError>;

	/// Retrieve the hash of the current best block.
	async fn best_hash(&self) -> std::result::Result<Block::Hash, Error>;
}
201	/// Retrieve best block hash
202	async fn best_hash(&self) -> std::result::Result<Block::Hash, Error>;
203}
204
205#[async_trait::async_trait]
206impl<Block, T> AuthorityDiscovery<Block> for T
207where
208	T: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
209	T::Api: AuthorityDiscoveryApi<Block>,
210	Block: BlockT,
211{
212	async fn authorities(
213		&self,
214		at: Block::Hash,
215	) -> std::result::Result<Vec<AuthorityId>, ApiError> {
216		self.runtime_api().authorities(at)
217	}
218
219	async fn best_hash(&self) -> std::result::Result<Block::Hash, Error> {
220		Ok(self.info().best_hash)
221	}
222}
223
224impl<Client, Block, DhtEventStream> Worker<Client, Block, DhtEventStream>
225where
226	Block: BlockT + Unpin + 'static,
227	Client: AuthorityDiscovery<Block> + 'static,
228	DhtEventStream: Stream<Item = DhtEvent> + Unpin,
229{
230	/// Construct a [`Worker`].
231	pub(crate) fn new(
232		from_service: mpsc::Receiver<ServicetoWorkerMsg>,
233		client: Arc<Client>,
234		network: Arc<dyn NetworkProvider>,
235		dht_event_rx: DhtEventStream,
236		role: Role,
237		prometheus_registry: Option<prometheus_endpoint::Registry>,
238		config: WorkerConfig,
239	) -> Self {
240		// When a node starts up publishing and querying might fail due to various reasons, for
241		// example due to being not yet fully bootstrapped on the DHT. Thus one should retry rather
242		// sooner than later. On the other hand, a long running node is likely well connected and
243		// thus timely retries are not needed. For this reasoning use an exponentially increasing
244		// interval for `publish_interval`, `query_interval` and `priority_group_set_interval`
245		// instead of a constant interval.
246		let publish_interval =
247			ExpIncInterval::new(Duration::from_secs(2), config.max_publish_interval);
248		let query_interval = ExpIncInterval::new(Duration::from_secs(2), config.max_query_interval);
249
250		// An `ExpIncInterval` is overkill here because the interval is constant, but consistency
251		// is more simple.
252		let publish_if_changed_interval =
253			ExpIncInterval::new(config.keystore_refresh_interval, config.keystore_refresh_interval);
254
255		let addr_cache = AddrCache::new();
256
257		let metrics = match prometheus_registry {
258			Some(registry) => match Metrics::register(&registry) {
259				Ok(metrics) => Some(metrics),
260				Err(e) => {
261					error!(target: LOG_TARGET, "Failed to register metrics: {}", e);
262					None
263				},
264			},
265			None => None,
266		};
267
268		let public_addresses = {
269			let local_peer_id = network.local_peer_id();
270
271			config
272				.public_addresses
273				.into_iter()
274				.map(|mut address| {
275					if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
276						if peer_id != *local_peer_id.as_ref() {
277							error!(
278								target: LOG_TARGET,
279								"Discarding invalid local peer ID in public address {address}.",
280							);
281						}
282						// Always discard `/p2p/...` protocol for proper address comparison (local
283						// peer id will be added before publishing).
284						address.pop();
285					}
286					address
287				})
288				.collect()
289		};
290
291		Worker {
292			from_service: from_service.fuse(),
293			client,
294			network,
295			dht_event_rx,
296			publish_interval,
297			known_authorities: Default::default(),
298			authorities_queried_at: None,
299			publish_if_changed_interval,
300			latest_published_keys: HashSet::new(),
301			latest_published_kad_keys: HashSet::new(),
302			publish_non_global_ips: config.publish_non_global_ips,
303			public_addresses,
304			strict_record_validation: config.strict_record_validation,
305			query_interval,
306			pending_lookups: Vec::new(),
307			in_flight_lookups: HashMap::new(),
308			known_lookups: HashMap::new(),
309			addr_cache,
310			role,
311			metrics,
312			phantom: PhantomData,
313			last_known_records: HashMap::new(),
314		}
315	}
316
317	/// Start the worker
318	pub async fn run(mut self) {
319		loop {
320			self.start_new_lookups();
321
322			futures::select! {
323				// Process incoming events.
324				event = self.dht_event_rx.next().fuse() => {
325					if let Some(event) = event {
326						self.handle_dht_event(event).await;
327					} else {
328						// This point is reached if the network has shut down, at which point there is not
329						// much else to do than to shut down the authority discovery as well.
330						return;
331					}
332				},
333				// Handle messages from [`Service`]. Ignore if sender side is closed.
334				msg = self.from_service.select_next_some() => {
335					self.process_message_from_service(msg);
336				},
337				// Publish own addresses.
338				only_if_changed = future::select(
339					self.publish_interval.next().map(|_| false),
340					self.publish_if_changed_interval.next().map(|_| true)
341				).map(|e| e.factor_first().0).fuse() => {
342					if let Err(e) = self.publish_ext_addresses(only_if_changed).await {
343						error!(
344							target: LOG_TARGET,
345							"Failed to publish external addresses: {}", e,
346						);
347					}
348				},
349				// Request addresses of authorities.
350				_ = self.query_interval.next().fuse() => {
351					if let Err(e) = self.refill_pending_lookups_queue().await {
352						error!(
353							target: LOG_TARGET,
354							"Failed to request addresses of authorities: {}", e,
355						);
356					}
357				},
358			}
359		}
360	}
361
362	fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
363		match msg {
364			ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
365				let _ = sender.send(
366					self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
367				);
368			},
369			ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
370				let _ = sender
371					.send(self.addr_cache.get_authority_ids_by_peer_id(&peer_id).map(Clone::clone));
372			},
373		}
374	}
375
376	fn addresses_to_publish(&self) -> impl Iterator<Item = Multiaddr> {
377		let local_peer_id = self.network.local_peer_id();
378		let publish_non_global_ips = self.publish_non_global_ips;
379		let addresses = self
380			.public_addresses
381			.clone()
382			.into_iter()
383			.chain(self.network.external_addresses().into_iter().filter_map(|mut address| {
384				// Make sure the reported external address does not contain `/p2p/...` protocol.
385				if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
386					if peer_id != *local_peer_id.as_ref() {
387						error!(
388							target: LOG_TARGET,
389							"Network returned external address '{address}' with peer id \
390							 not matching the local peer id '{local_peer_id}'.",
391						);
392						debug_assert!(false);
393					}
394					address.pop();
395				}
396
397				if self.public_addresses.contains(&address) {
398					// Already added above.
399					None
400				} else {
401					Some(address)
402				}
403			}))
404			.filter(move |address| {
405				if publish_non_global_ips {
406					return true
407				}
408
409				address.iter().all(|protocol| match protocol {
410					// The `ip_network` library is used because its `is_global()` method is stable,
411					// while `is_global()` in the standard library currently isn't.
412					multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
413					multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
414					_ => true,
415				})
416			})
417			.collect::<Vec<_>>();
418
419		if !addresses.is_empty() {
420			debug!(
421				target: LOG_TARGET,
422				"Publishing authority DHT record peer_id='{local_peer_id}' with addresses='{addresses:?}'",
423			);
424		}
425
426		// The address must include the local peer id.
427		addresses
428			.into_iter()
429			.map(move |a| a.with(multiaddr::Protocol::P2p(*local_peer_id.as_ref())))
430	}
431
432	/// Publish own public addresses.
433	///
434	/// If `only_if_changed` is true, the function has no effect if the list of keys to publish
435	/// is equal to `self.latest_published_keys`.
436	async fn publish_ext_addresses(&mut self, only_if_changed: bool) -> Result<()> {
437		let key_store = match &self.role {
438			Role::PublishAndDiscover(key_store) => key_store,
439			Role::Discover => return Ok(()),
440		};
441
442		let addresses = serialize_addresses(self.addresses_to_publish());
443		if addresses.is_empty() {
444			trace!(
445				target: LOG_TARGET,
446				"No addresses to publish. Skipping publication."
447			);
448
449			self.publish_interval.set_to_start();
450			return Ok(())
451		}
452
453		let keys =
454			Worker::<Client, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
455				key_store.clone(),
456				self.client.as_ref(),
457			)
458			.await?
459			.into_iter()
460			.collect::<HashSet<_>>();
461
462		if only_if_changed {
463			// If the authority keys did not change and the `publish_if_changed_interval` was
464			// triggered then do nothing.
465			if keys == self.latest_published_keys {
466				return Ok(())
467			}
468
469			// We have detected a change in the authority keys, reset the timers to
470			// publish and gather data faster.
471			self.publish_interval.set_to_start();
472			self.query_interval.set_to_start();
473		}
474
475		if let Some(metrics) = &self.metrics {
476			metrics.publish.inc();
477			metrics
478				.amount_addresses_last_published
479				.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
480		}
481
482		let serialized_record = serialize_authority_record(addresses, Some(build_creation_time()))?;
483		let peer_signature = sign_record_with_peer_id(&serialized_record, &self.network)?;
484
485		let keys_vec = keys.iter().cloned().collect::<Vec<_>>();
486
487		let kv_pairs = sign_record_with_authority_ids(
488			serialized_record,
489			Some(peer_signature),
490			key_store.as_ref(),
491			keys_vec,
492		)?;
493
494		self.latest_published_kad_keys = kv_pairs.iter().map(|(k, _)| k.clone()).collect();
495
496		for (key, value) in kv_pairs.into_iter() {
497			self.network.put_value(key, value);
498		}
499
500		self.latest_published_keys = keys;
501
502		Ok(())
503	}
504
505	async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
506		let best_hash = self.client.best_hash().await?;
507
508		let local_keys = match &self.role {
509			Role::PublishAndDiscover(key_store) => key_store
510				.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
511				.into_iter()
512				.collect::<HashSet<_>>(),
513			Role::Discover => HashSet::new(),
514		};
515
516		let mut authorities = self
517			.client
518			.authorities(best_hash)
519			.await
520			.map_err(|e| Error::CallingRuntime(e.into()))?
521			.into_iter()
522			.filter(|id| !local_keys.contains(id.as_ref()))
523			.collect::<Vec<_>>();
524
525		self.known_authorities = authorities
526			.clone()
527			.into_iter()
528			.map(|authority| (hash_authority_id(authority.as_ref()), authority))
529			.collect::<HashMap<_, _>>();
530		self.authorities_queried_at = Some(best_hash);
531
532		self.addr_cache.retain_ids(&authorities);
533		let now = Instant::now();
534		self.last_known_records.retain(|k, value| {
535			self.known_authorities.contains_key(k) && !value.record.is_expired(now)
536		});
537
538		authorities.shuffle(&mut thread_rng());
539		self.pending_lookups = authorities;
540		// Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as
541		// query interval ticks are far enough apart for all lookups to succeed.
542		self.in_flight_lookups.clear();
543		self.known_lookups.clear();
544
545		if let Some(metrics) = &self.metrics {
546			metrics
547				.requests_pending
548				.set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
549		}
550
551		Ok(())
552	}
553
554	fn start_new_lookups(&mut self) {
555		while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS {
556			let authority_id = match self.pending_lookups.pop() {
557				Some(authority) => authority,
558				None => return,
559			};
560			let hash = hash_authority_id(authority_id.as_ref());
561			self.network.get_value(&hash);
562			self.in_flight_lookups.insert(hash, authority_id);
563
564			if let Some(metrics) = &self.metrics {
565				metrics.requests.inc();
566				metrics
567					.requests_pending
568					.set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
569			}
570		}
571	}
572
573	/// Handle incoming Dht events.
574	async fn handle_dht_event(&mut self, event: DhtEvent) {
575		match event {
576			DhtEvent::ValueFound(v) => {
577				if let Some(metrics) = &self.metrics {
578					metrics.dht_event_received.with_label_values(&["value_found"]).inc();
579				}
580
581				debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", v.record.key);
582
583				if let Err(e) = self.handle_dht_value_found_event(v) {
584					if let Some(metrics) = &self.metrics {
585						metrics.handle_value_found_event_failure.inc();
586					}
587					debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e);
588				}
589			},
590			DhtEvent::ValueNotFound(hash) => {
591				if let Some(metrics) = &self.metrics {
592					metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
593				}
594
595				if self.in_flight_lookups.remove(&hash).is_some() {
596					debug!(target: LOG_TARGET, "Value for hash '{:?}' not found on Dht.", hash)
597				} else {
598					debug!(
599						target: LOG_TARGET,
600						"Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
601					)
602				}
603			},
604			DhtEvent::ValuePut(hash) => {
605				if !self.latest_published_kad_keys.contains(&hash) {
606					return;
607				}
608
609				// Fast forward the exponentially increasing interval to the configured maximum. In
610				// case this was the first successful address publishing there is no need for a
611				// timely retry.
612				self.publish_interval.set_to_max();
613
614				if let Some(metrics) = &self.metrics {
615					metrics.dht_event_received.with_label_values(&["value_put"]).inc();
616				}
617
618				debug!(target: LOG_TARGET, "Successfully put hash '{:?}' on Dht.", hash)
619			},
620			DhtEvent::ValuePutFailed(hash) => {
621				if !self.latest_published_kad_keys.contains(&hash) {
622					// Not a value we have published or received multiple times.
623					return;
624				}
625
626				if let Some(metrics) = &self.metrics {
627					metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
628				}
629
630				debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash)
631			},
632			DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires) => {
633				if let Err(e) = self
634					.handle_put_record_requested(record_key, record_value, publisher, expires)
635					.await
636				{
637					debug!(target: LOG_TARGET, "Failed to handle put record request: {}", e)
638				}
639
640				if let Some(metrics) = &self.metrics {
641					metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
642				}
643			},
644		}
645	}
646
647	async fn handle_put_record_requested(
648		&mut self,
649		record_key: KademliaKey,
650		record_value: Vec<u8>,
651		publisher: Option<PeerId>,
652		expires: Option<std::time::Instant>,
653	) -> Result<()> {
654		let publisher = publisher.ok_or(Error::MissingPublisher)?;
655
656		// Make sure we don't ever work with an outdated set of authorities
657		// and that we do not update known_authorithies too often.
658		let best_hash = self.client.best_hash().await?;
659		if !self.known_authorities.contains_key(&record_key) &&
660			self.authorities_queried_at
661				.map(|authorities_queried_at| authorities_queried_at != best_hash)
662				.unwrap_or(true)
663		{
664			let authorities = self
665				.client
666				.authorities(best_hash)
667				.await
668				.map_err(|e| Error::CallingRuntime(e.into()))?
669				.into_iter()
670				.collect::<Vec<_>>();
671
672			self.known_authorities = authorities
673				.into_iter()
674				.map(|authority| (hash_authority_id(authority.as_ref()), authority))
675				.collect::<HashMap<_, _>>();
676
677			self.authorities_queried_at = Some(best_hash);
678		}
679
680		let authority_id =
681			self.known_authorities.get(&record_key).ok_or(Error::UnknownAuthority)?;
682		let signed_record =
683			Self::check_record_signed_with_authority_id(record_value.as_slice(), authority_id)?;
684		self.check_record_signed_with_network_key(
685			&signed_record.record,
686			signed_record.peer_signature,
687			publisher,
688			authority_id,
689		)?;
690
691		let records_creation_time: u128 =
692			schema::AuthorityRecord::decode(signed_record.record.as_slice())
693				.map_err(Error::DecodingProto)?
694				.creation_time
695				.map(|creation_time| {
696					u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
697				})
698				.unwrap_or_default(); // 0 is a sane default for records that do not have creation time present.
699
700		let current_record_info = self.last_known_records.get(&record_key);
701		// If record creation time is older than the current record creation time,
702		// we don't store it since we want to give higher priority to newer records.
703		if let Some(current_record_info) = current_record_info {
704			if records_creation_time < current_record_info.creation_time {
705				debug!(
706					target: LOG_TARGET,
707					"Skip storing because record creation time {:?} is older than the current known record {:?}",
708					records_creation_time,
709					current_record_info.creation_time
710				);
711				return Ok(());
712			}
713		}
714
715		self.network.store_record(record_key, record_value, Some(publisher), expires);
716		Ok(())
717	}
718
719	fn check_record_signed_with_authority_id(
720		record: &[u8],
721		authority_id: &AuthorityId,
722	) -> Result<schema::SignedAuthorityRecord> {
723		let signed_record: schema::SignedAuthorityRecord =
724			schema::SignedAuthorityRecord::decode(record).map_err(Error::DecodingProto)?;
725
726		let auth_signature = AuthoritySignature::decode(&mut &signed_record.auth_signature[..])
727			.map_err(Error::EncodingDecodingScale)?;
728
729		if !AuthorityPair::verify(&auth_signature, &signed_record.record, &authority_id) {
730			return Err(Error::VerifyingDhtPayload)
731		}
732
733		Ok(signed_record)
734	}
735
736	fn check_record_signed_with_network_key(
737		&self,
738		record: &Vec<u8>,
739		peer_signature: Option<PeerSignature>,
740		remote_peer_id: PeerId,
741		authority_id: &AuthorityId,
742	) -> Result<()> {
743		if let Some(peer_signature) = peer_signature {
744			match self.network.verify(
745				remote_peer_id.into(),
746				&peer_signature.public_key,
747				&peer_signature.signature,
748				record,
749			) {
750				Ok(true) => {},
751				Ok(false) => return Err(Error::VerifyingDhtPayload),
752				Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
753			}
754		} else if self.strict_record_validation {
755			return Err(Error::MissingPeerIdSignature)
756		} else {
757			debug!(
758				target: LOG_TARGET,
759				"Received unsigned authority discovery record from {}", authority_id
760			);
761		}
762		Ok(())
763	}
764
765	fn handle_dht_value_found_event(&mut self, peer_record: PeerRecord) -> Result<()> {
766		// Ensure `values` is not empty and all its keys equal.
767		let remote_key = peer_record.record.key.clone();
768
769		let authority_id: AuthorityId =
770			if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) {
771				self.known_lookups.insert(remote_key.clone(), authority_id.clone());
772				authority_id
773			} else if let Some(authority_id) = self.known_lookups.get(&remote_key) {
774				authority_id.clone()
775			} else {
776				return Err(Error::ReceivingUnexpectedRecord);
777			};
778
779		let local_peer_id = self.network.local_peer_id();
780
781		let schema::SignedAuthorityRecord { record, peer_signature, .. } =
782			Self::check_record_signed_with_authority_id(
783				peer_record.record.value.as_slice(),
784				&authority_id,
785			)?;
786
787		let authority_record =
788			schema::AuthorityRecord::decode(record.as_slice()).map_err(Error::DecodingProto)?;
789
790		let records_creation_time: u128 = authority_record
791			.creation_time
792			.as_ref()
793			.map(|creation_time| {
794				u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
795			})
796			.unwrap_or_default(); // 0 is a sane default for records that do not have creation time present.
797
798		let addresses: Vec<Multiaddr> = authority_record
799			.addresses
800			.into_iter()
801			.map(|a| a.try_into())
802			.collect::<std::result::Result<_, _>>()
803			.map_err(Error::ParsingMultiaddress)?;
804
805		let get_peer_id = |a: &Multiaddr| match a.iter().last() {
806			Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
807			_ => None,
808		};
809
810		// Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses.
811		let addresses: Vec<Multiaddr> = addresses
812			.into_iter()
813			.filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some())
814			.collect();
815
816		let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a)))
817			.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records
818			.flatten()
819			.ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them
820
821		// At this point we know all the valid multiaddresses from the record, know that
822		// each of them belong to the same PeerId, we just need to check if the record is
823		// properly signed by the owner of the PeerId
824		self.check_record_signed_with_network_key(
825			&record,
826			peer_signature,
827			remote_peer_id,
828			&authority_id,
829		)?;
830
831		let remote_addresses: Vec<Multiaddr> =
832			addresses.into_iter().take(MAX_ADDRESSES_PER_AUTHORITY).collect();
833
834		let answering_peer_id = peer_record.peer.map(|peer| peer.into());
835
836		let addr_cache_needs_update = self.handle_new_record(
837			&authority_id,
838			remote_key.clone(),
839			RecordInfo {
840				creation_time: records_creation_time,
841				peers_with_record: answering_peer_id.into_iter().collect(),
842				record: peer_record.record,
843			},
844		);
845
846		if !remote_addresses.is_empty() && addr_cache_needs_update {
847			self.addr_cache.insert(authority_id, remote_addresses);
848			if let Some(metrics) = &self.metrics {
849				metrics
850					.known_authorities_count
851					.set(self.addr_cache.num_authority_ids().try_into().unwrap_or(std::u64::MAX));
852			}
853		}
854		Ok(())
855	}
856
857	// Handles receiving a new DHT record for the authorithy.
858	// Returns true if the record was new, false if the record was older than the current one.
859	fn handle_new_record(
860		&mut self,
861		authority_id: &AuthorityId,
862		kademlia_key: KademliaKey,
863		new_record: RecordInfo,
864	) -> bool {
865		let current_record_info = self
866			.last_known_records
867			.entry(kademlia_key.clone())
868			.or_insert_with(|| new_record.clone());
869
870		if new_record.creation_time > current_record_info.creation_time {
871			let peers_that_need_updating = current_record_info.peers_with_record.clone();
872			self.network.put_record_to(
873				new_record.record.clone(),
874				peers_that_need_updating.clone(),
875				// If this is empty it means we received the answer from our node local
876				// storage, so we need to update that as well.
877				current_record_info.peers_with_record.is_empty(),
878			);
879			debug!(
880					target: LOG_TARGET,
881					"Found a newer record for {:?} new record creation time {:?} old record creation time {:?}",
882					authority_id, new_record.creation_time, current_record_info.creation_time
883			);
884			self.last_known_records.insert(kademlia_key, new_record);
885			return true
886		}
887
888		if new_record.creation_time == current_record_info.creation_time {
889			// Same record just update in case this is a record from old nodes that don't have
890			// timestamp.
891			debug!(
892					target: LOG_TARGET,
893					"Found same record for {:?} record creation time {:?}",
894					authority_id, new_record.creation_time
895			);
896			if current_record_info.peers_with_record.len() + new_record.peers_with_record.len() <=
897				DEFAULT_KADEMLIA_REPLICATION_FACTOR
898			{
899				current_record_info.peers_with_record.extend(new_record.peers_with_record);
900			}
901			return true
902		}
903
904		debug!(
905				target: LOG_TARGET,
906				"Found old record for {:?} received record creation time {:?} current record creation time {:?}",
907				authority_id, new_record.creation_time, current_record_info.creation_time,
908		);
909		self.network.put_record_to(
910			current_record_info.record.clone(),
911			new_record.peers_with_record.clone(),
912			// If this is empty it means we received the answer from our node local
913			// storage, so we need to update that as well.
914			new_record.peers_with_record.is_empty(),
915		);
916		return false
917	}
918
919	/// Retrieve our public keys within the current and next authority set.
920	// A node might have multiple authority discovery keys within its keystore, e.g. an old one and
921	// one for the upcoming session. In addition it could be participating in the current and (/ or)
922	// next authority set with two keys. The function does not return all of the local authority
923	// discovery public keys, but only the ones intersecting with the current or next authority set.
924	async fn get_own_public_keys_within_authority_set(
925		key_store: KeystorePtr,
926		client: &Client,
927	) -> Result<HashSet<AuthorityId>> {
928		let local_pub_keys = key_store
929			.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
930			.into_iter()
931			.collect::<HashSet<_>>();
932
933		let best_hash = client.best_hash().await?;
934		let authorities = client
935			.authorities(best_hash)
936			.await
937			.map_err(|e| Error::CallingRuntime(e.into()))?
938			.into_iter()
939			.map(Into::into)
940			.collect::<HashSet<_>>();
941
942		let intersection =
943			local_pub_keys.intersection(&authorities).cloned().map(Into::into).collect();
944
945		Ok(intersection)
946	}
947}
948
949/// NetworkProvider provides [`Worker`] with all necessary hooks into the
950/// underlying Substrate networking. Using this trait abstraction instead of
951/// `sc_network::NetworkService` directly is necessary to unit test [`Worker`].
952pub trait NetworkProvider:
953	NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
954{
955}
956
957impl<T> NetworkProvider for T where
958	T: NetworkDHTProvider + NetworkStateInfo + NetworkSigner + Send + Sync
959{
960}
961
962fn hash_authority_id(id: &[u8]) -> KademliaKey {
963	KademliaKey::new(&Code::Sha2_256.digest(id).digest())
964}
965
// Checks that every value is equal to the first one and returns that value.
//
// Returns `Ok(None)` when there are no values and `Err(())` as soon as a value differs from
// the first one (the remaining values are not consumed). Otherwise returns `Ok(Some(first))`.
fn single<T>(values: impl IntoIterator<Item = T>) -> std::result::Result<Option<T>, ()>
where
	T: PartialEq<T>,
{
	let mut remaining = values.into_iter();
	let first = match remaining.next() {
		Some(value) => value,
		None => return Ok(None),
	};
	for value in remaining {
		if first != value {
			return Err(())
		}
	}
	Ok(Some(first))
}
980
981fn serialize_addresses(addresses: impl Iterator<Item = Multiaddr>) -> Vec<Vec<u8>> {
982	addresses.map(|a| a.to_vec()).collect()
983}
984
985fn build_creation_time() -> schema::TimestampInfo {
986	let creation_time = SystemTime::now()
987		.duration_since(UNIX_EPOCH)
988		.map(|time| time.as_nanos())
989		.unwrap_or_default();
990	schema::TimestampInfo { timestamp: creation_time.encode() }
991}
992
993fn serialize_authority_record(
994	addresses: Vec<Vec<u8>>,
995	creation_time: Option<schema::TimestampInfo>,
996) -> Result<Vec<u8>> {
997	let mut serialized_record = vec![];
998
999	schema::AuthorityRecord { addresses, creation_time }
1000		.encode(&mut serialized_record)
1001		.map_err(Error::EncodingProto)?;
1002	Ok(serialized_record)
1003}
1004
1005fn sign_record_with_peer_id(
1006	serialized_record: &[u8],
1007	network: &impl NetworkSigner,
1008) -> Result<schema::PeerSignature> {
1009	let signature = network
1010		.sign_with_local_identity(serialized_record.to_vec())
1011		.map_err(|e| Error::CannotSign(format!("{} (network packet)", e)))?;
1012	let public_key = signature.public_key.encode_protobuf();
1013	let signature = signature.bytes;
1014	Ok(schema::PeerSignature { signature, public_key })
1015}
1016
1017fn sign_record_with_authority_ids(
1018	serialized_record: Vec<u8>,
1019	peer_signature: Option<schema::PeerSignature>,
1020	key_store: &dyn Keystore,
1021	keys: Vec<AuthorityId>,
1022) -> Result<Vec<(KademliaKey, Vec<u8>)>> {
1023	let mut result = Vec::with_capacity(keys.len());
1024
1025	for key in keys.iter() {
1026		let auth_signature = key_store
1027			.sr25519_sign(key_types::AUTHORITY_DISCOVERY, key.as_ref(), &serialized_record)
1028			.map_err(|e| Error::CannotSign(format!("{}. Key: {:?}", e, key)))?
1029			.ok_or_else(|| {
1030				Error::CannotSign(format!("Could not find key in keystore. Key: {:?}", key))
1031			})?;
1032
1033		// Scale encode
1034		let auth_signature = auth_signature.encode();
1035		let signed_record = schema::SignedAuthorityRecord {
1036			record: serialized_record.clone(),
1037			auth_signature,
1038			peer_signature: peer_signature.clone(),
1039		}
1040		.encode_to_vec();
1041
1042		result.push((hash_authority_id(key.as_slice()), signed_record));
1043	}
1044
1045	Ok(result)
1046}
1047
/// Prometheus metrics for a [`Worker`].
#[derive(Clone)]
pub(crate) struct Metrics {
	/// `substrate_authority_discovery_times_published_total`: number of times external
	/// addresses have been published.
	publish: Counter<U64>,
	/// `substrate_authority_discovery_amount_external_addresses_last_published`: number of
	/// external addresses included in the most recent publication.
	amount_addresses_last_published: Gauge<U64>,
	/// `substrate_authority_discovery_authority_addresses_requested_total`: number of requests
	/// for the external addresses of a single authority.
	requests: Counter<U64>,
	/// `substrate_authority_discovery_authority_address_requests_pending`: number of pending
	/// authority address requests.
	requests_pending: Gauge<U64>,
	/// `substrate_authority_discovery_dht_event_received`: DHT events received, labelled by
	/// `name`.
	dht_event_received: CounterVec<U64>,
	/// `substrate_authority_discovery_handle_value_found_event_failure`: number of times handling
	/// a DHT value-found event failed.
	handle_value_found_event_failure: Counter<U64>,
	/// `substrate_authority_discovery_known_authorities_count`: number of authorities known by
	/// authority discovery.
	known_authorities_count: Gauge<U64>,
}
1059
1060impl Metrics {
1061	pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
1062		Ok(Self {
1063			publish: register(
1064				Counter::new(
1065					"substrate_authority_discovery_times_published_total",
1066					"Number of times authority discovery has published external addresses.",
1067				)?,
1068				registry,
1069			)?,
1070			amount_addresses_last_published: register(
1071				Gauge::new(
1072					"substrate_authority_discovery_amount_external_addresses_last_published",
1073					"Number of external addresses published when authority discovery last \
1074					 published addresses.",
1075				)?,
1076				registry,
1077			)?,
1078			requests: register(
1079				Counter::new(
1080					"substrate_authority_discovery_authority_addresses_requested_total",
1081					"Number of times authority discovery has requested external addresses of a \
1082					 single authority.",
1083				)?,
1084				registry,
1085			)?,
1086			requests_pending: register(
1087				Gauge::new(
1088					"substrate_authority_discovery_authority_address_requests_pending",
1089					"Number of pending authority address requests.",
1090				)?,
1091				registry,
1092			)?,
1093			dht_event_received: register(
1094				CounterVec::new(
1095					Opts::new(
1096						"substrate_authority_discovery_dht_event_received",
1097						"Number of dht events received by authority discovery.",
1098					),
1099					&["name"],
1100				)?,
1101				registry,
1102			)?,
1103			handle_value_found_event_failure: register(
1104				Counter::new(
1105					"substrate_authority_discovery_handle_value_found_event_failure",
1106					"Number of times handling a dht value found event failed.",
1107				)?,
1108				registry,
1109			)?,
1110			known_authorities_count: register(
1111				Gauge::new(
1112					"substrate_authority_discovery_known_authorities_count",
1113					"Number of authorities known by authority discovery.",
1114				)?,
1115				registry,
1116			)?,
1117		})
1118	}
1119}
1120
// Helper functions for unit testing.
#[cfg(test)]
impl<Block, Client, DhtEventStream> Worker<Client, Block, DhtEventStream>
where
	Block: BlockT,
{
	/// Inserts `addresses` for `authority` directly into the worker's address cache.
	pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
		self.addr_cache.insert(authority, addresses);
	}
}