1use crate::{config::MAX_ADDRESSES, schema::Response};
36use codec::{CompactRef, Decode, Encode};
37use cumulus_primitives_core::{relay_chain::Hash as RelayHash, ParaId};
38use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
39use futures::{
40 channel::oneshot,
41 future::{BoxFuture, Fuse, FusedFuture},
42 pin_mut,
43 stream::FuturesUnordered,
44 FutureExt, StreamExt,
45};
46use log::{debug, error, info, trace, warn};
47use parachains_common::Hash as ParaHash;
48use prost::Message;
49use sc_network::{
50 event::{DhtEvent, Event},
51 request_responses::{IfDisconnected, RequestFailure},
52 service::traits::NetworkService,
53 KademliaKey, Multiaddr, PeerId, ProtocolName,
54};
55use sp_consensus_babe::{Epoch, Randomness};
56use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
57use tokio::time::{sleep, Sleep};
58
/// Log target attached to every message emitted by the bootnode discovery code.
const LOG_TARGET: &str = "bootnodes::discovery";

/// How long to wait before scheduling another discovery round after one that finished without
/// registering any parachain bootnode address.
const RETRY_DELAY: Duration = Duration::from_secs(30);
65
66pub struct BootnodeDiscoveryParams {
68 pub para_id: ParaId,
70 pub parachain_network: Arc<dyn NetworkService>,
72 pub parachain_genesis_hash: ParaHash,
74 pub parachain_fork_id: Option<String>,
76 pub relay_chain_interface: Arc<dyn RelayChainInterface>,
78 pub relay_chain_network: Arc<dyn NetworkService>,
80 pub paranode_protocol_name: ProtocolName,
82}
83
84pub struct BootnodeDiscovery {
86 para_id_scale_compact: Vec<u8>,
87 parachain_network: Arc<dyn NetworkService>,
88 parachain_genesis_hash: ParaHash,
89 parachain_fork_id: Option<String>,
90 relay_chain_interface: Arc<dyn RelayChainInterface>,
91 relay_chain_network: Arc<dyn NetworkService>,
92 latest_relay_chain_hash: Option<RelayHash>,
93 key_being_discovered: Option<KademliaKey>,
94 paranode_protocol_name: ProtocolName,
95 pending_responses: FuturesUnordered<
96 BoxFuture<
97 'static,
98 (PeerId, Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled>),
99 >,
100 >,
101 direct_requests: HashSet<PeerId>,
102 find_node_queries: HashSet<PeerId>,
103 pending_start_discovery: Pin<Box<Fuse<Sleep>>>,
104 succeeded: bool,
105}
106
107impl BootnodeDiscovery {
108 pub fn new(
110 BootnodeDiscoveryParams {
111 para_id,
112 parachain_network,
113 parachain_genesis_hash,
114 parachain_fork_id,
115 relay_chain_interface,
116 relay_chain_network,
117 paranode_protocol_name,
118 }: BootnodeDiscoveryParams,
119 ) -> Self {
120 Self {
121 para_id_scale_compact: CompactRef(¶_id).encode(),
122 parachain_network,
123 parachain_genesis_hash,
124 parachain_fork_id,
125 relay_chain_interface,
126 relay_chain_network,
127 latest_relay_chain_hash: None,
128 key_being_discovered: None,
129 paranode_protocol_name,
130 pending_responses: FuturesUnordered::default(),
131 direct_requests: HashSet::new(),
132 find_node_queries: HashSet::new(),
133 pending_start_discovery: Box::pin(sleep(Duration::ZERO).fuse()),
135 succeeded: false,
136 }
137 }
138
139 async fn current_epoch(&mut self, hash: RelayHash) -> RelayChainResult<Epoch> {
140 let res = self
141 .relay_chain_interface
142 .call_runtime_api("BabeApi_current_epoch", hash, &[])
143 .await?;
144 Decode::decode(&mut &*res).map_err(Into::into)
145 }
146
147 fn epoch_key(&self, randomness: Randomness) -> KademliaKey {
148 self.para_id_scale_compact
149 .clone()
150 .into_iter()
151 .chain(randomness.into_iter())
152 .collect::<Vec<_>>()
153 .into()
154 }
155
156 async fn start_discovery(&mut self) -> RelayChainResult<()> {
158 let Some(hash) = self.latest_relay_chain_hash else {
159 error!(
160 target: LOG_TARGET,
161 "Failed to start bootnode discovery: no relay chain hash available. This is a bug.",
162 );
163 return Err(RelayChainError::GenericError("no relay chain hash available".to_string()));
165 };
166
167 let current_epoch = self.current_epoch(hash).await?;
168 let current_epoch_key = self.epoch_key(current_epoch.randomness);
169 self.key_being_discovered = Some(current_epoch_key.clone());
170 self.relay_chain_network.get_providers(current_epoch_key.clone());
171
172 debug!(
173 target: LOG_TARGET,
174 "Started discovery of parachain bootnode providers for current epoch key {}",
175 hex::encode(current_epoch_key),
176 );
177
178 Ok(())
179 }
180
181 fn maybe_retry_discovery(&mut self) -> bool {
184 let discovery_in_progress = self.key_being_discovered.is_some() ||
185 !self.pending_responses.is_empty() ||
186 !self.find_node_queries.is_empty();
187 let discovery_scheduled = !self.pending_start_discovery.is_terminated();
188
189 if discovery_in_progress || discovery_scheduled {
190 true
192 } else {
193 if self.succeeded {
194 info!(
196 target: LOG_TARGET,
197 "Parachain bootnode discovery on the relay chain DHT succeeded",
198 );
199
200 false
201 } else {
202 debug!(
203 target: LOG_TARGET,
204 "Retrying parachain bootnode discovery on the relay chain DHT in {RETRY_DELAY:?}",
205 );
206 self.pending_start_discovery = Box::pin(sleep(RETRY_DELAY).fuse());
207
208 true
209 }
210 }
211 }
212
213 fn request_bootnode(&mut self, peer_id: PeerId) {
214 trace!(
215 target: LOG_TARGET,
216 "Requesting parachain bootnode from the relay chain {peer_id:?}",
217 );
218
219 let (tx, rx) = oneshot::channel();
220
221 self.relay_chain_network.start_request(
222 peer_id,
223 self.paranode_protocol_name.clone(),
224 self.para_id_scale_compact.clone(),
225 None,
226 tx,
227 IfDisconnected::TryConnect,
228 );
229
230 self.pending_responses.push(async move { (peer_id, rx.await) }.boxed());
231 }
232
233 fn handle_providers(&mut self, providers: Vec<PeerId>) {
234 debug!(
235 target: LOG_TARGET,
236 "Found parachain bootnode providers on the relay chain: {providers:?}",
237 );
238
239 for peer_id in providers {
240 if peer_id == self.relay_chain_network.local_peer_id() {
241 continue;
242 }
243
244 if self.direct_requests.contains(&peer_id) || self.find_node_queries.contains(&peer_id)
246 {
247 continue;
248 }
249
250 self.direct_requests.insert(peer_id);
257 self.request_bootnode(peer_id);
258 }
259 }
260
261 fn handle_response(
262 &mut self,
263 peer_id: PeerId,
264 res: Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled>,
265 ) {
266 let direct_request = self.direct_requests.remove(&peer_id);
267
268 let response = match res {
269 Ok(Ok((payload, _))) => match Response::decode(payload.as_slice()) {
270 Ok(response) => response,
271 Err(e) => {
272 warn!(
273 target: LOG_TARGET,
274 "Failed to decode parachain bootnode response from {peer_id:?}: {e}",
275 );
276 return;
277 },
278 },
279 Ok(Err(e)) => {
280 if direct_request {
281 debug!(
287 target: LOG_TARGET,
288 "Failed to directly query parachain bootnode from {peer_id:?}: {e}. \
289 Starting FIND_NODE query on the DHT",
290 );
291 self.find_node_queries.insert(peer_id);
292 self.relay_chain_network.find_closest_peers(peer_id);
293 } else {
294 debug!(
295 target: LOG_TARGET,
296 "Failed to query parachain bootnode from {peer_id:?} after finding
297 the node addresses on the DHT: {e}",
298 );
299 }
300 return;
301 },
302 Err(_) => {
303 debug!(
304 target: LOG_TARGET,
305 "Parachain bootnode request to {peer_id:?} canceled. \
306 The node is likely terminating.",
307 );
308 return;
309 },
310 };
311
312 match (response.genesis_hash, response.fork_id) {
313 (genesis_hash, fork_id)
314 if genesis_hash == self.parachain_genesis_hash.as_ref() &&
315 fork_id == self.parachain_fork_id => {},
316 (genesis_hash, fork_id) => {
317 warn!(
318 target: LOG_TARGET,
319 "Received invalid parachain bootnode response from {peer_id:?}: \
320 genesis hash {}, fork ID {:?} don't match expected genesis hash {}, fork ID {:?}",
321 hex::encode(genesis_hash),
322 fork_id,
323 hex::encode(self.parachain_genesis_hash),
324 self.parachain_fork_id,
325 );
326 return;
327 },
328 }
329
330 let paranode_peer_id = match PeerId::from_bytes(response.peer_id.as_slice()) {
331 Ok(peer_id) => peer_id,
332 Err(e) => {
333 warn!(
334 target: LOG_TARGET,
335 "Failed to decode parachain peer ID in response from {peer_id:?}: {e}",
336 );
337 return;
338 },
339 };
340
341 if paranode_peer_id == self.parachain_network.local_peer_id() {
342 warn!(
343 target: LOG_TARGET,
344 "Received own parachain node peer ID in bootnode response from {peer_id:?}. \
345 This should not happen as we don't request parachain bootnodes from self.",
346 );
347 return;
348 }
349
350 let paranode_addresses = response
351 .addrs
352 .into_iter()
353 .map(Multiaddr::try_from)
354 .take(MAX_ADDRESSES)
355 .collect::<Result<Vec<_>, _>>();
356 let paranode_addresses = match paranode_addresses {
357 Ok(paranode_addresses) => paranode_addresses,
358 Err(e) => {
359 warn!(
360 target: LOG_TARGET,
361 "Failed to decode parachain node addresses in response from {peer_id:?}: {e}",
362 );
363 return;
364 },
365 };
366
367 debug!(
368 target: LOG_TARGET,
369 "Discovered parachain bootnode {paranode_peer_id:?} with addresses {paranode_addresses:?}",
370 );
371
372 paranode_addresses.into_iter().for_each(|addr| {
373 self.parachain_network.add_known_address(paranode_peer_id, addr);
374 self.succeeded = true;
375 });
376 }
377
378 fn handle_dht_event(&mut self, event: DhtEvent) {
379 match event {
380 DhtEvent::ProvidersFound(key, providers)
381 if Some(key.clone()) == self.key_being_discovered && !providers.is_empty() =>
383 self.handle_providers(providers),
384 DhtEvent::NoMoreProviders(key) if Some(key.clone()) == self.key_being_discovered => {
385 debug!(
386 target: LOG_TARGET,
387 "Parachain bootnode providers discovery finished for key {}",
388 hex::encode(key),
389 );
390 self.key_being_discovered = None;
391 },
392 DhtEvent::ProvidersNotFound(key) if Some(key.clone()) == self.key_being_discovered => {
393 debug!(
394 target: LOG_TARGET,
395 "Parachain bootnode providers not found for key {}",
396 hex::encode(key),
397 );
398 self.key_being_discovered = None;
399 },
400 DhtEvent::ClosestPeersFound(peer_id, peers) if self.find_node_queries.remove(&peer_id) => {
401 if let Some((_, addrs)) = peers
402 .into_iter()
403 .find(|(peer, addrs)| peer == &peer_id && !addrs.is_empty())
404 {
405 trace!(
406 target: LOG_TARGET,
407 "Found addresses on the DHT for parachain bootnode provider {peer_id:?}: {addrs:?}",
408 );
409 for address in addrs {
410 self.relay_chain_network.add_known_address(peer_id, address);
411 }
412 self.request_bootnode(peer_id);
413 } else {
414 debug!(
415 target: LOG_TARGET,
416 "Failed to find addresses on the DHT for parachain bootnode provider {peer_id:?}",
417 );
418 }
419 },
420 DhtEvent::ClosestPeersNotFound(peer_id) if self.find_node_queries.remove(&peer_id) => {
421 debug!(
422 target: LOG_TARGET,
423 "Failed to find addresses on the DHT for parachain bootnode provider {peer_id:?}",
424 );
425 },
426 _ => {},
427 }
428 }
429
430 pub async fn run(mut self) -> RelayChainResult<()> {
432 let mut import_notification_stream =
433 self.relay_chain_interface.import_notification_stream().await?.fuse();
434 let dht_event_stream = self
435 .relay_chain_network
436 .event_stream("parachain-bootnode-discovery")
437 .filter_map(|e| async move {
438 match e {
439 Event::Dht(e) => Some(e),
440 _ => None,
441 }
442 })
443 .fuse();
444 pin_mut!(dht_event_stream);
445
446 let header = import_notification_stream.select_next_some().await;
448 self.latest_relay_chain_hash = Some(header.hash());
449
450 loop {
451 if !self.maybe_retry_discovery() {
452 return Ok(());
453 }
454
455 tokio::select! {
456 _ = &mut self.pending_start_discovery => {
457 self.start_discovery().await?;
458 },
459 header = import_notification_stream.select_next_some() => {
460 self.latest_relay_chain_hash = Some(header.hash());
461 },
462 event = dht_event_stream.select_next_some() => self.handle_dht_event(event),
463 (peer_id, res) = self.pending_responses.select_next_some(),
464 if !self.pending_responses.is_empty() =>
465 self.handle_response(peer_id, res),
466 }
467 }
468 }
469}