referrerpolicy=no-referrer-when-downgrade

cumulus_client_bootnodes/
advertisement.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
17
18//! Parachain bootnode advertisement.
19
20use crate::config::MAX_ADDRESSES;
21use codec::{Compact, CompactRef, Decode, Encode};
22use cumulus_primitives_core::{
23	relay_chain::{Hash as RelayHash, Header as RelayHeader},
24	ParaId,
25};
26use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
27use futures::{future::Fuse, pin_mut, FutureExt, StreamExt};
28use ip_network::IpNetwork;
29use log::{debug, error, trace, warn};
30use parachains_common::Hash as ParaHash;
31use prost::Message;
32use sc_network::{
33	config::OutgoingResponse,
34	event::{DhtEvent, Event},
35	multiaddr::Protocol,
36	request_responses::IncomingRequest,
37	service::traits::NetworkService,
38	KademliaKey, Multiaddr,
39};
40use sp_consensus_babe::{digests::CompatibleDigestItem, Epoch, Randomness};
41use sp_runtime::traits::Header as _;
42use std::{collections::HashSet, pin::Pin, sync::Arc};
43use tokio::time::Sleep;
44
/// Log target for this file.
///
/// Passed as `target:` to every `log` macro invocation in this module.
const LOG_TARGET: &str = "bootnodes::advertisement";

/// Delay before retrying the DHT content provider publish operation.
///
/// A retry timer of this length is armed when a `DhtEvent::StartProvidingFailed` is received
/// for the current or the next epoch key.
const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(30);
50
/// Parachain bootnode advertisement parameters.
///
/// Consumed by [`BootnodeAdvertisement::new`].
pub struct BootnodeAdvertisementParams {
	/// Parachain ID.
	///
	/// Its SCALE compact encoding prefixes the advertised DHT keys and is the payload expected
	/// in incoming requests.
	pub para_id: ParaId,
	/// Relay chain interface.
	///
	/// Used to query the BABE current/next epoch and to subscribe to import notifications.
	pub relay_chain_interface: Arc<dyn RelayChainInterface>,
	/// Relay chain node network service.
	///
	/// Provider records are published and withdrawn on this network, and its DHT events are
	/// observed.
	pub relay_chain_network: Arc<dyn NetworkService>,
	/// Bootnode request-response protocol request receiver.
	///
	/// The service terminates when this channel is closed.
	pub request_receiver: async_channel::Receiver<IncomingRequest>,
	/// Parachain node network service.
	///
	/// Source of the local peer ID and of the listen/external addresses returned in responses.
	pub parachain_network: Arc<dyn NetworkService>,
	/// Whether to advertise non-global IPs.
	///
	/// When `false`, non-global external addresses and non-global (including loopback) listen
	/// addresses are left out of responses.
	pub advertise_non_global_ips: bool,
	/// Parachain genesis hash.
	///
	/// Returned verbatim (as bytes) in every response.
	pub parachain_genesis_hash: ParaHash,
	/// Parachain fork ID.
	///
	/// Returned verbatim in every response.
	pub parachain_fork_id: Option<String>,
	/// Parachain side public addresses.
	///
	/// On construction a trailing `/p2p/<peer_id>` equal to the local peer ID is stripped, and
	/// addresses ending in a different peer ID are discarded with a warning.
	pub public_addresses: Vec<Multiaddr>,
}
72
/// Parachain bootnode advertisement service.
///
/// Publishes this node as a content provider on the relay chain DHT under keys derived from the
/// parachain ID and the BABE epoch randomness, and answers incoming requests with the
/// parachain-side addresses of this node.
pub struct BootnodeAdvertisement {
	// Parachain ID this node is advertised for.
	para_id: ParaId,
	// SCALE compact encoding of `para_id`: prefix of every DHT key and the expected request
	// payload.
	para_id_scale_compact: Vec<u8>,
	// Used to query BABE epochs and to subscribe to relay chain import notifications.
	relay_chain_interface: Arc<dyn RelayChainInterface>,
	// Network on which provider records are published and DHT events are observed.
	relay_chain_network: Arc<dyn NetworkService>,
	// DHT key for the current epoch; `None` until it has been obtained.
	current_epoch_key: Option<KademliaKey>,
	// DHT key for the next epoch; `None` until it has been obtained.
	next_epoch_key: Option<KademliaKey>,
	// Timer for re-publishing the current epoch key after a failure; terminated when no retry
	// is pending.
	current_epoch_publish_retry: Pin<Box<Fuse<Sleep>>>,
	// Timer for re-publishing the next epoch key after a failure; terminated when no retry is
	// pending.
	next_epoch_publish_retry: Pin<Box<Fuse<Sleep>>>,
	// Incoming bootnode requests.
	request_receiver: async_channel::Receiver<IncomingRequest>,
	// Source of the local peer ID and of the listen/external addresses.
	parachain_network: Arc<dyn NetworkService>,
	// Whether non-global (including loopback) addresses are included in responses.
	advertise_non_global_ips: bool,
	// Genesis hash returned in responses.
	parachain_genesis_hash: ParaHash,
	// Fork ID returned in responses.
	parachain_fork_id: Option<String>,
	// Operator-provided public addresses, already stripped of the local `/p2p/<peer_id>`.
	public_addresses: Vec<Multiaddr>,
}
90
91impl BootnodeAdvertisement {
92	/// Create a new bootnode advertisement service.
93	pub fn new(
94		BootnodeAdvertisementParams {
95			para_id,
96			relay_chain_interface,
97			relay_chain_network,
98			request_receiver,
99			parachain_network,
100			advertise_non_global_ips,
101			parachain_genesis_hash,
102			parachain_fork_id,
103			public_addresses,
104		}: BootnodeAdvertisementParams,
105	) -> Self {
106		// Discard `/p2p/<peer_id>` from public addresses on initialization to not generate warnings
107		// on every request for what is an operator mistake.
108		let local_peer_id = parachain_network.local_peer_id();
109		let public_addresses = public_addresses
110			.into_iter()
111			.filter_map(|mut addr| match addr.iter().last() {
112				Some(Protocol::P2p(peer_id)) if &peer_id == local_peer_id.as_ref() => {
113					addr.pop();
114					Some(addr)
115				},
116				Some(Protocol::P2p(_)) => {
117					warn!(
118						target: LOG_TARGET,
119						"Discarding public address containing not our peer ID: {addr}",
120					);
121					None
122				},
123				_ => Some(addr),
124			})
125			.collect();
126
127		Self {
128			para_id,
129			para_id_scale_compact: CompactRef(&para_id).encode(),
130			relay_chain_interface,
131			relay_chain_network,
132			current_epoch_key: None,
133			next_epoch_key: None,
134			current_epoch_publish_retry: Box::pin(Fuse::terminated()),
135			next_epoch_publish_retry: Box::pin(Fuse::terminated()),
136			request_receiver,
137			parachain_network,
138			advertise_non_global_ips,
139			parachain_genesis_hash,
140			parachain_fork_id,
141			public_addresses,
142		}
143	}
144
145	async fn current_epoch(&self, hash: RelayHash) -> RelayChainResult<Epoch> {
146		let res = self
147			.relay_chain_interface
148			.call_runtime_api("BabeApi_current_epoch", hash, &[])
149			.await?;
150		Decode::decode(&mut &*res).map_err(Into::into)
151	}
152
153	async fn next_epoch(&self, hash: RelayHash) -> RelayChainResult<Epoch> {
154		let res = self
155			.relay_chain_interface
156			.call_runtime_api("BabeApi_next_epoch", hash, &[])
157			.await?;
158		Decode::decode(&mut &*res).map_err(Into::into)
159	}
160
161	fn epoch_key(&self, randomness: Randomness) -> KademliaKey {
162		self.para_id_scale_compact
163			.clone()
164			.into_iter()
165			.chain(randomness.into_iter())
166			.collect::<Vec<_>>()
167			.into()
168	}
169
170	async fn current_and_next_epoch_keys(
171		&self,
172		header: RelayHeader,
173	) -> (Option<KademliaKey>, Option<KademliaKey>) {
174		let hash = header.hash();
175		let number = header.number();
176
177		let current_epoch = match self.current_epoch(hash).await {
178			Ok(epoch) => Some(epoch),
179			Err(e) => {
180				warn!(
181					target: LOG_TARGET,
182					"Failed to query current epoch for #{number} {hash:?}: {e}",
183				);
184
185				None
186			},
187		};
188
189		let next_epoch = match self.next_epoch(hash).await {
190			Ok(epoch) => Some(epoch),
191			Err(e) => {
192				warn!(
193					target: LOG_TARGET,
194					"Failed to query next epoch for #{number} {hash:?}: {e}",
195				);
196
197				None
198			},
199		};
200
201		(
202			current_epoch.map(|epoch| self.epoch_key(epoch.randomness)),
203			next_epoch.map(|epoch| self.epoch_key(epoch.randomness)),
204		)
205	}
206
207	async fn handle_import_notification(&mut self, header: RelayHeader) {
208		if let Some(ref old_current_epoch_key) = self.current_epoch_key {
209			// Readvertise on start of new epoch only.
210			let Some(next_epoch_descriptor) =
211				header.digest().convert_first(|v| v.as_next_epoch_descriptor())
212			else {
213				return;
214			};
215
216			let next_epoch_key = self.epoch_key(next_epoch_descriptor.randomness);
217
218			if Some(&next_epoch_key) == self.next_epoch_key.as_ref() {
219				trace!(
220					target: LOG_TARGET,
221					"Next epoch descriptor contains the same randomness as the previous one, \
222					 not considering this as epoch change (switched fork?)",
223				);
224				return;
225			}
226
227			// Epoch changed, cancel retry attempts.
228			self.current_epoch_publish_retry = Box::pin(Fuse::terminated());
229			self.next_epoch_publish_retry = Box::pin(Fuse::terminated());
230
231			debug!(target: LOG_TARGET, "New epoch started, readvertising parachain bootnode.");
232
233			// Stop advertisement of the obsolete key.
234			debug!(
235				target: LOG_TARGET,
236				"Stopping advertisement of bootnode for old current epoch key {}",
237				hex::encode(old_current_epoch_key.as_ref()),
238			);
239			self.relay_chain_network.stop_providing(old_current_epoch_key.clone());
240
241			// Advertise current keys.
242			self.current_epoch_key = self.next_epoch_key.clone();
243			self.next_epoch_key = Some(next_epoch_key);
244
245			if let Some(ref current_epoch_key) = self.current_epoch_key {
246				debug!(
247					target: LOG_TARGET,
248					"Advertising bootnode for current (old next) epoch key {}",
249					hex::encode(current_epoch_key.as_ref()),
250				);
251				self.relay_chain_network.start_providing(current_epoch_key.clone());
252			}
253
254			if let Some(ref next_epoch_key) = self.next_epoch_key {
255				debug!(
256					target: LOG_TARGET,
257					"Advertising bootnode for next epoch key {}",
258					hex::encode(next_epoch_key.as_ref()),
259				);
260				self.relay_chain_network.start_providing(next_epoch_key.clone());
261			}
262		} else {
263			// First advertisement on startup.
264			let (current_epoch_key, next_epoch_key) =
265				self.current_and_next_epoch_keys(header).await;
266			self.current_epoch_key = current_epoch_key.clone();
267			self.next_epoch_key = next_epoch_key.clone();
268
269			if let Some(current_epoch_key) = current_epoch_key {
270				debug!(
271					target: LOG_TARGET,
272					"Initial advertisement of bootnode for current epoch key {}",
273					hex::encode(current_epoch_key.as_ref()),
274				);
275
276				self.relay_chain_network.start_providing(current_epoch_key);
277			} else {
278				warn!(
279					target: LOG_TARGET,
280					"Initial advertisement of bootnode for current epoch failed: no key."
281				);
282			}
283
284			if let Some(next_epoch_key) = next_epoch_key {
285				debug!(
286					target: LOG_TARGET,
287					"Initial advertisement of bootnode for next epoch key {}",
288					hex::encode(next_epoch_key.as_ref()),
289				);
290
291				self.relay_chain_network.start_providing(next_epoch_key);
292			} else {
293				warn!(
294					target: LOG_TARGET,
295					"Initial advertisement of bootnode for next epoch failed: no key."
296				);
297			}
298		}
299	}
300
301	/// The list of parachain side addresses.
302	///
303	/// The addresses are sorted as follows:
304	///  1) public addresses provided by the operator
305	///  2) global listen addresses
306	///  3) discovered external addresses
307	///  4) non-global listen addresses
308	///  5) loopback listen addresses
309	fn paranode_addresses(&self) -> Vec<Multiaddr> {
310		let local_peer_id = self.parachain_network.local_peer_id();
311
312		// Discard `/p2p/<peer_id>` part. `None` if the address contains foreign peer ID.
313		let without_p2p = |mut addr: Multiaddr| match addr.iter().last() {
314			Some(Protocol::P2p(peer_id)) if &peer_id == local_peer_id.as_ref() => {
315				addr.pop();
316				Some(addr)
317			},
318			Some(Protocol::P2p(_)) => {
319				warn!(
320					target: LOG_TARGET,
321					"Ignoring parachain side address containing not our peer ID: {addr}",
322				);
323				None
324			},
325			_ => Some(addr),
326		};
327
328		// Check if the address is global.
329		let is_global = |address: &Multiaddr| {
330			address.iter().all(|protocol| match protocol {
331				// The `ip_network` library is used because its `is_global()` method is stable,
332				// while `is_global()` in the standard library currently isn't.
333				Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
334				Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
335				_ => true,
336			})
337		};
338
339		// Check if the address is a loopback address.
340		let is_loopback = |address: &Multiaddr| {
341			address.iter().any(|protocol| match protocol {
342				Protocol::Ip4(ip) => IpNetwork::from(ip).is_loopback(),
343				Protocol::Ip6(ip) => IpNetwork::from(ip).is_loopback(),
344				_ => false,
345			})
346		};
347
348		// 1) public addresses provided by the operator
349		let public_addresses = self.public_addresses.clone().into_iter();
350
351		// 2) global listen addresses
352		let global_listen_addresses =
353			self.parachain_network.listen_addresses().into_iter().filter(is_global);
354
355		// 3a) discovered external addresses (global)
356		let global_external_addresses =
357			self.parachain_network.external_addresses().into_iter().filter(is_global);
358
359		// 3b) discovered external addresses (non-global)
360		let non_global_external_addresses = self
361			.parachain_network
362			.external_addresses()
363			.into_iter()
364			.filter(|addr| !is_global(addr));
365
366		// 4) non-global listen addresses
367		let non_global_listen_addresses = self
368			.parachain_network
369			.listen_addresses()
370			.into_iter()
371			.filter(|addr| !is_global(addr) && !is_loopback(addr));
372
373		// 5) loopback listen addresses
374		let loopback_listen_addresses =
375			self.parachain_network.listen_addresses().into_iter().filter(is_loopback);
376
377		let mut seen = HashSet::new();
378
379		public_addresses
380			.chain(global_listen_addresses)
381			.chain(global_external_addresses)
382			.chain(
383				self.advertise_non_global_ips
384					.then_some(
385						non_global_external_addresses
386							.chain(non_global_listen_addresses)
387							.chain(loopback_listen_addresses),
388					)
389					.into_iter()
390					.flatten(),
391			)
392			.filter_map(without_p2p)
393			// Deduplicate addresses.
394			.filter(|addr| seen.insert(addr.clone()))
395			.take(MAX_ADDRESSES)
396			.collect()
397	}
398
399	fn handle_request(&mut self, req: IncomingRequest) {
400		if req.payload == self.para_id_scale_compact {
401			trace!(
402				target: LOG_TARGET,
403				"Serving paranode addresses request from {:?} for parachain ID {}",
404				req.peer,
405				self.para_id,
406			);
407
408			let response = crate::schema::Response {
409				peer_id: self.parachain_network.local_peer_id().to_bytes(),
410				addrs: self.paranode_addresses().iter().map(|a| a.to_vec()).collect(),
411				genesis_hash: self.parachain_genesis_hash.clone().as_bytes().to_vec(),
412				fork_id: self.parachain_fork_id.clone(),
413			};
414
415			let _ = req.pending_response.send(OutgoingResponse {
416				result: Ok(response.encode_to_vec()),
417				reputation_changes: Vec::new(),
418				sent_feedback: None,
419			});
420		} else {
421			let payload = req.payload;
422			match Compact::<ParaId>::decode(&mut &payload[..]) {
423				Ok(para_id) => {
424					trace!(
425						target: LOG_TARGET,
426						"Ignoring request for parachain ID {} != self parachain ID {} from {:?}",
427						para_id.0,
428						self.para_id,
429						req.peer,
430					);
431				},
432				Err(e) => {
433					trace!(
434						target: LOG_TARGET,
435						"Cannot decode parachain ID in a request from {:?}: {e}",
436						req.peer,
437					);
438				},
439			}
440		}
441	}
442
443	fn handle_dht_event(&mut self, event: DhtEvent) {
444		match event {
445			DhtEvent::StartedProviding(key) =>
446				if Some(&key) == self.current_epoch_key.as_ref() {
447					debug!(
448						target: LOG_TARGET,
449						"Successfully published provider for current epoch key {}",
450						hex::encode(key.as_ref()),
451					);
452				} else if Some(&key) == self.next_epoch_key.as_ref() {
453					debug!(
454						target: LOG_TARGET,
455						"Successfully published provider for next epoch key {}",
456						hex::encode(key.as_ref()),
457					);
458				},
459			DhtEvent::StartProvidingFailed(key) =>
460				if Some(&key) == self.current_epoch_key.as_ref() {
461					debug!(
462						target: LOG_TARGET,
463						"Failed to publish provider for current epoch key {}. Retrying in {RETRY_DELAY:?}",
464						hex::encode(key.as_ref()),
465					);
466					self.current_epoch_publish_retry =
467						Box::pin(tokio::time::sleep(RETRY_DELAY).fuse());
468				} else if Some(&key) == self.next_epoch_key.as_ref() {
469					debug!(
470						target: LOG_TARGET,
471						"Failed to publish provider for next epoch key {}. Retrying in {RETRY_DELAY:?}",
472						hex::encode(key.as_ref()),
473					);
474					self.next_epoch_publish_retry =
475						Box::pin(tokio::time::sleep(RETRY_DELAY).fuse());
476				},
477			_ => {},
478		}
479	}
480
481	fn retry_for_current_epoch(&mut self) {
482		if let Some(current_epoch_key) = self.current_epoch_key.clone() {
483			debug!(
484				target: LOG_TARGET,
485				"Retrying advertising bootnode for current epoch key {}",
486				hex::encode(current_epoch_key.as_ref()),
487			);
488			self.relay_chain_network.start_providing(current_epoch_key);
489		} else {
490			error!(
491				target: LOG_TARGET,
492				"Retrying advertising bootnode for current epoch failed: no key. This is a bug."
493			);
494		}
495	}
496
497	fn retry_for_next_epoch(&mut self) {
498		if let Some(next_epoch_key) = self.next_epoch_key.clone() {
499			debug!(
500				target: LOG_TARGET,
501				"Retrying advertising bootnode for next epoch key {}",
502				hex::encode(next_epoch_key.as_ref()),
503			);
504			self.relay_chain_network.start_providing(next_epoch_key);
505		} else {
506			error!(
507				target: LOG_TARGET,
508				"Retrying advertising bootnode for next epoch failed: no key. This is a bug."
509			);
510		}
511	}
512
513	/// Run the bootnode advertisement service.
514	pub async fn run(mut self) -> RelayChainResult<()> {
515		let mut import_notification_stream =
516			self.relay_chain_interface.import_notification_stream().await?;
517		let dht_event_stream = self
518			.relay_chain_network
519			.event_stream("parachain-bootnode-discovery")
520			.filter_map(|e| async move {
521				match e {
522					Event::Dht(e) => Some(e),
523					_ => None,
524				}
525			})
526			.fuse();
527		pin_mut!(dht_event_stream);
528
529		loop {
530			tokio::select! {
531				header = import_notification_stream.next() => match header {
532					Some(header) => self.handle_import_notification(header).await,
533					None => {
534						debug!(
535							target: LOG_TARGET,
536							"Import notification stream terminated, terminating bootnode advertisement."
537						);
538						return Ok(());
539					}
540				},
541				req = self.request_receiver.recv() => match req {
542					Ok(req) => {
543						self.handle_request(req);
544					},
545					Err(_) => {
546						debug!(
547							target: LOG_TARGET,
548							"Paranode request receiver terminated, terminating bootnode advertisement."
549						);
550						return Ok(());
551					}
552				},
553				event = dht_event_stream.select_next_some() => self.handle_dht_event(event),
554				() = &mut self.current_epoch_publish_retry => self.retry_for_current_epoch(),
555				() = &mut self.next_epoch_publish_retry => self.retry_for_next_epoch(),
556			}
557		}
558	}
559}