referrerpolicy=no-referrer-when-downgrade

cumulus_client_bootnodes/
discovery.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
17
18//! Parachain bootnode discovery.
19//!
20//! The discovery works as follows:
21//!  1. We start parachain bootnode content provider discovery on the relay chain DHT in
22//!     [`BootnodeDiscovery::start_discovery`].
23//!  2. We handle every provider discovered in [`BootnodeDiscovery::handle_providers`] and try to
24//!     request the bootnodes from the provider over a `/paranode` request-response protocol.
25//!  3. The request result is handled in [`BootnodeDiscovery::handle_response`]. If the request
26//!     fails this is a sign of the provider addresses not being cached by the remote / dropped by
27//!     the networking library (the case with libp2p). In this case we perform a `FIND_NODE` query
28//!     to get the provider addresses first and repeat the request once we know them.
29//!  4. When the request over the `/paranode` protocol succeeds, we add the bootnode addresses as
30//!     known addresses to the parachain networking.
31//!  5. If the content provider discovery had completed, all `FIND_NODE` queries finished, and all
32//!     requests over the `/paranode` protocol succeeded or failed, but we have not found any
33//!     bootnode addresses, we repeat the discovery process after a cooldown period.
34
35use crate::{config::MAX_ADDRESSES, schema::Response};
36use codec::{CompactRef, Decode, Encode};
37use cumulus_primitives_core::{relay_chain::Hash as RelayHash, ParaId};
38use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
39use futures::{
40	channel::oneshot,
41	future::{BoxFuture, Fuse, FusedFuture},
42	pin_mut,
43	stream::FuturesUnordered,
44	FutureExt, StreamExt,
45};
46use log::{debug, error, info, trace, warn};
47use parachains_common::Hash as ParaHash;
48use prost::Message;
49use sc_network::{
50	event::{DhtEvent, Event},
51	request_responses::{IfDisconnected, RequestFailure},
52	service::traits::NetworkService,
53	KademliaKey, Multiaddr, PeerId, ProtocolName,
54};
55use sp_consensus_babe::{Epoch, Randomness};
56use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
57use tokio::time::{sleep, Sleep};
58
/// Target used by every log statement in this module.
const LOG_TARGET: &str = "bootnodes::discovery";

/// Cooldown before a discovery round that found no bootnodes is repeated.
///
/// Rate-limits retries: in small testnets a whole discovery round can finish almost instantly,
/// so retrying without a pause would spin.
const RETRY_DELAY: Duration = Duration::new(30, 0);
65
66/// Parachain bootnode discovery parameters.
67pub struct BootnodeDiscoveryParams {
68	/// Parachain ID.
69	pub para_id: ParaId,
70	/// Parachain node network service.
71	pub parachain_network: Arc<dyn NetworkService>,
72	/// Parachain genesis hash.
73	pub parachain_genesis_hash: ParaHash,
74	/// Parachain fork ID.
75	pub parachain_fork_id: Option<String>,
76	/// Relay chain interface.
77	pub relay_chain_interface: Arc<dyn RelayChainInterface>,
78	/// Relay chain network service.
79	pub relay_chain_network: Arc<dyn NetworkService>,
80	/// `/paranode` protocol name.
81	pub paranode_protocol_name: ProtocolName,
82}
83
84/// Parachain bootnode discovery service.
85pub struct BootnodeDiscovery {
86	para_id_scale_compact: Vec<u8>,
87	parachain_network: Arc<dyn NetworkService>,
88	parachain_genesis_hash: ParaHash,
89	parachain_fork_id: Option<String>,
90	relay_chain_interface: Arc<dyn RelayChainInterface>,
91	relay_chain_network: Arc<dyn NetworkService>,
92	latest_relay_chain_hash: Option<RelayHash>,
93	key_being_discovered: Option<KademliaKey>,
94	paranode_protocol_name: ProtocolName,
95	pending_responses: FuturesUnordered<
96		BoxFuture<
97			'static,
98			(PeerId, Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled>),
99		>,
100	>,
101	direct_requests: HashSet<PeerId>,
102	find_node_queries: HashSet<PeerId>,
103	pending_start_discovery: Pin<Box<Fuse<Sleep>>>,
104	succeeded: bool,
105}
106
107impl BootnodeDiscovery {
108	/// Create a new bootnode discovery service.
109	pub fn new(
110		BootnodeDiscoveryParams {
111			para_id,
112			parachain_network,
113			parachain_genesis_hash,
114			parachain_fork_id,
115			relay_chain_interface,
116			relay_chain_network,
117			paranode_protocol_name,
118		}: BootnodeDiscoveryParams,
119	) -> Self {
120		Self {
121			para_id_scale_compact: CompactRef(&para_id).encode(),
122			parachain_network,
123			parachain_genesis_hash,
124			parachain_fork_id,
125			relay_chain_interface,
126			relay_chain_network,
127			latest_relay_chain_hash: None,
128			key_being_discovered: None,
129			paranode_protocol_name,
130			pending_responses: FuturesUnordered::default(),
131			direct_requests: HashSet::new(),
132			find_node_queries: HashSet::new(),
133			// Trigger the discovery immediately on startup.
134			pending_start_discovery: Box::pin(sleep(Duration::ZERO).fuse()),
135			succeeded: false,
136		}
137	}
138
139	async fn current_epoch(&mut self, hash: RelayHash) -> RelayChainResult<Epoch> {
140		let res = self
141			.relay_chain_interface
142			.call_runtime_api("BabeApi_current_epoch", hash, &[])
143			.await?;
144		Decode::decode(&mut &*res).map_err(Into::into)
145	}
146
147	fn epoch_key(&self, randomness: Randomness) -> KademliaKey {
148		self.para_id_scale_compact
149			.clone()
150			.into_iter()
151			.chain(randomness.into_iter())
152			.collect::<Vec<_>>()
153			.into()
154	}
155
156	/// Start bootnode discovery.
157	async fn start_discovery(&mut self) -> RelayChainResult<()> {
158		let Some(hash) = self.latest_relay_chain_hash else {
159			error!(
160				target: LOG_TARGET,
161				"Failed to start bootnode discovery: no relay chain hash available. This is a bug.",
162			);
163			// This is a graceful panic via the failure of essential task.
164			return Err(RelayChainError::GenericError("no relay chain hash available".to_string()));
165		};
166
167		let current_epoch = self.current_epoch(hash).await?;
168		let current_epoch_key = self.epoch_key(current_epoch.randomness);
169		self.key_being_discovered = Some(current_epoch_key.clone());
170		self.relay_chain_network.get_providers(current_epoch_key.clone());
171
172		debug!(
173			target: LOG_TARGET,
174			"Started discovery of parachain bootnode providers for current epoch key {}",
175			hex::encode(current_epoch_key),
176		);
177
178		Ok(())
179	}
180
181	/// Schedule bootnode discovery if needed. Returns `false` if the discovery event loop should be
182	/// terminated.
183	fn maybe_retry_discovery(&mut self) -> bool {
184		let discovery_in_progress = self.key_being_discovered.is_some() ||
185			!self.pending_responses.is_empty() ||
186			!self.find_node_queries.is_empty();
187		let discovery_scheduled = !self.pending_start_discovery.is_terminated();
188
189		if discovery_in_progress || discovery_scheduled {
190			// Discovery is already in progress or scheduled, just continue the event loop.
191			true
192		} else {
193			if self.succeeded {
194				// No need to start discovery again if the previous attempt succeeded.
195				info!(
196					target: LOG_TARGET,
197					"Parachain bootnode discovery on the relay chain DHT succeeded",
198				);
199
200				false
201			} else {
202				debug!(
203					target: LOG_TARGET,
204					"Retrying parachain bootnode discovery on the relay chain DHT in {RETRY_DELAY:?}",
205				);
206				self.pending_start_discovery = Box::pin(sleep(RETRY_DELAY).fuse());
207
208				true
209			}
210		}
211	}
212
213	fn request_bootnode(&mut self, peer_id: PeerId) {
214		trace!(
215			target: LOG_TARGET,
216			"Requesting parachain bootnode from the relay chain {peer_id:?}",
217		);
218
219		let (tx, rx) = oneshot::channel();
220
221		self.relay_chain_network.start_request(
222			peer_id,
223			self.paranode_protocol_name.clone(),
224			self.para_id_scale_compact.clone(),
225			None,
226			tx,
227			IfDisconnected::TryConnect,
228		);
229
230		self.pending_responses.push(async move { (peer_id, rx.await) }.boxed());
231	}
232
233	fn handle_providers(&mut self, providers: Vec<PeerId>) {
234		debug!(
235			target: LOG_TARGET,
236			"Found parachain bootnode providers on the relay chain: {providers:?}",
237		);
238
239		for peer_id in providers {
240			if peer_id == self.relay_chain_network.local_peer_id() {
241				continue;
242			}
243
244			// libp2p may yield the same provider multiple times; skip if we alredy queried it.
245			if self.direct_requests.contains(&peer_id) || self.find_node_queries.contains(&peer_id)
246			{
247				continue;
248			}
249
250			// Directly request a bootnode from the peer without performing a `FIND_NODE` query
251			// first. With litep2p backend this will likely succeed, because cached provider
252			// addresses are automatically added to the transport manager known addresses list.
253			//
254			// With libp2p backend, or if the remote did not return the cached addresses of the
255			// provider, the request will fail and we will perform a `FIND_NODE` query.
256			self.direct_requests.insert(peer_id);
257			self.request_bootnode(peer_id);
258		}
259	}
260
261	fn handle_response(
262		&mut self,
263		peer_id: PeerId,
264		res: Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled>,
265	) {
266		let direct_request = self.direct_requests.remove(&peer_id);
267
268		let response = match res {
269			Ok(Ok((payload, _))) => match Response::decode(payload.as_slice()) {
270				Ok(response) => response,
271				Err(e) => {
272					warn!(
273						target: LOG_TARGET,
274						"Failed to decode parachain bootnode response from {peer_id:?}: {e}",
275					);
276					return;
277				},
278			},
279			Ok(Err(e)) => {
280				if direct_request {
281					// It only makes sense to try to find the node on the DHT in case of "address
282					// not available" error. Unfortunately, libp2p and litep2p backends report such
283					// errors differently, and also some network library could break the error
284					// reporting in the future. So, to be on the safe side and avoid subtle bugs,
285					// we always try to find the node on the DHT in case of the request failure.
286					debug!(
287						target: LOG_TARGET,
288						"Failed to directly query parachain bootnode from {peer_id:?}: {e}. \
289						 Starting FIND_NODE query on the DHT",
290					);
291					self.find_node_queries.insert(peer_id);
292					self.relay_chain_network.find_closest_peers(peer_id);
293				} else {
294					debug!(
295						target: LOG_TARGET,
296						"Failed to query parachain bootnode from {peer_id:?} after finding
297						 the node addresses on the DHT: {e}",
298					);
299				}
300				return;
301			},
302			Err(_) => {
303				debug!(
304					target: LOG_TARGET,
305					"Parachain bootnode request to {peer_id:?} canceled. \
306					 The node is likely terminating.",
307				);
308				return;
309			},
310		};
311
312		match (response.genesis_hash, response.fork_id) {
313			(genesis_hash, fork_id)
314				if genesis_hash == self.parachain_genesis_hash.as_ref() &&
315					fork_id == self.parachain_fork_id => {},
316			(genesis_hash, fork_id) => {
317				warn!(
318					target: LOG_TARGET,
319					"Received invalid parachain bootnode response from {peer_id:?}: \
320					 genesis hash {}, fork ID {:?} don't match expected genesis hash {}, fork ID {:?}",
321					hex::encode(genesis_hash),
322					fork_id,
323					hex::encode(self.parachain_genesis_hash),
324					self.parachain_fork_id,
325				);
326				return;
327			},
328		}
329
330		let paranode_peer_id = match PeerId::from_bytes(response.peer_id.as_slice()) {
331			Ok(peer_id) => peer_id,
332			Err(e) => {
333				warn!(
334					target: LOG_TARGET,
335					"Failed to decode parachain peer ID in response from {peer_id:?}: {e}",
336				);
337				return;
338			},
339		};
340
341		if paranode_peer_id == self.parachain_network.local_peer_id() {
342			warn!(
343				target: LOG_TARGET,
344				"Received own parachain node peer ID in bootnode response from {peer_id:?}. \
345				 This should not happen as we don't request parachain bootnodes from self.",
346			);
347			return;
348		}
349
350		let paranode_addresses = response
351			.addrs
352			.into_iter()
353			.map(Multiaddr::try_from)
354			.take(MAX_ADDRESSES)
355			.collect::<Result<Vec<_>, _>>();
356		let paranode_addresses = match paranode_addresses {
357			Ok(paranode_addresses) => paranode_addresses,
358			Err(e) => {
359				warn!(
360					target: LOG_TARGET,
361					"Failed to decode parachain node addresses in response from {peer_id:?}: {e}",
362				);
363				return;
364			},
365		};
366
367		debug!(
368			target: LOG_TARGET,
369			"Discovered parachain bootnode {paranode_peer_id:?} with addresses {paranode_addresses:?}",
370		);
371
372		paranode_addresses.into_iter().for_each(|addr| {
373			self.parachain_network.add_known_address(paranode_peer_id, addr);
374			self.succeeded = true;
375		});
376	}
377
378	fn handle_dht_event(&mut self, event: DhtEvent) {
379		match event {
380			DhtEvent::ProvidersFound(key, providers)
381				// libp2p generates empty events, so also check if `providers` are not empty.
382				if Some(key.clone()) == self.key_being_discovered && !providers.is_empty() =>
383					self.handle_providers(providers),
384			DhtEvent::NoMoreProviders(key) if Some(key.clone()) == self.key_being_discovered => {
385				debug!(
386					target: LOG_TARGET,
387					"Parachain bootnode providers discovery finished for key {}",
388					hex::encode(key),
389				);
390				self.key_being_discovered = None;
391			},
392			DhtEvent::ProvidersNotFound(key) if Some(key.clone()) == self.key_being_discovered => {
393				debug!(
394					target: LOG_TARGET,
395					"Parachain bootnode providers not found for key {}",
396					hex::encode(key),
397				);
398				self.key_being_discovered = None;
399			},
400			DhtEvent::ClosestPeersFound(peer_id, peers)	if self.find_node_queries.remove(&peer_id) => {
401				if let Some((_, addrs)) = peers
402					.into_iter()
403					.find(|(peer, addrs)| peer == &peer_id && !addrs.is_empty())
404				{
405					trace!(
406						target: LOG_TARGET,
407						"Found addresses on the DHT for parachain bootnode provider {peer_id:?}: {addrs:?}",
408					);
409					for address in addrs {
410						self.relay_chain_network.add_known_address(peer_id, address);
411					}
412					self.request_bootnode(peer_id);
413				} else {
414					debug!(
415						target: LOG_TARGET,
416						"Failed to find addresses on the DHT for parachain bootnode provider {peer_id:?}",
417					);
418				}
419			},
420			DhtEvent::ClosestPeersNotFound(peer_id) if self.find_node_queries.remove(&peer_id) => {
421				debug!(
422					target: LOG_TARGET,
423					"Failed to find addresses on the DHT for parachain bootnode provider {peer_id:?}",
424				);
425			},
426			_ => {},
427		}
428	}
429
430	/// Run the bootnode discovery service.
431	pub async fn run(mut self) -> RelayChainResult<()> {
432		let mut import_notification_stream =
433			self.relay_chain_interface.import_notification_stream().await?.fuse();
434		let dht_event_stream = self
435			.relay_chain_network
436			.event_stream("parachain-bootnode-discovery")
437			.filter_map(|e| async move {
438				match e {
439					Event::Dht(e) => Some(e),
440					_ => None,
441				}
442			})
443			.fuse();
444		pin_mut!(dht_event_stream);
445
446		// Make sure the relay chain hash is always available before starting the discovery.
447		let header = import_notification_stream.select_next_some().await;
448		self.latest_relay_chain_hash = Some(header.hash());
449
450		loop {
451			if !self.maybe_retry_discovery() {
452				return Ok(());
453			}
454
455			tokio::select! {
456				_ = &mut self.pending_start_discovery => {
457					self.start_discovery().await?;
458				},
459				header = import_notification_stream.select_next_some() => {
460					self.latest_relay_chain_hash = Some(header.hash());
461				},
462				event = dht_event_stream.select_next_some() => self.handle_dht_event(event),
463				(peer_id, res) = self.pending_responses.select_next_some(),
464					if !self.pending_responses.is_empty() =>
465						self.handle_response(peer_id, res),
466			}
467		}
468	}
469}