1use crate::config::MAX_ADDRESSES;
21use codec::{Compact, CompactRef, Decode, Encode};
22use cumulus_primitives_core::{
23 relay_chain::{Hash as RelayHash, Header as RelayHeader},
24 ParaId,
25};
26use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
27use futures::{future::Fuse, pin_mut, FutureExt, StreamExt};
28use ip_network::IpNetwork;
29use log::{debug, error, trace, warn};
30use parachains_common::Hash as ParaHash;
31use prost::Message;
32use sc_network::{
33 config::OutgoingResponse,
34 event::{DhtEvent, Event},
35 multiaddr::Protocol,
36 request_responses::IncomingRequest,
37 service::traits::NetworkService,
38 KademliaKey, Multiaddr,
39};
40use sp_consensus_babe::{digests::CompatibleDigestItem, Epoch, Randomness};
41use sp_runtime::traits::Header as _;
42use std::{collections::HashSet, pin::Pin, sync::Arc};
43use tokio::time::Sleep;
44
45const LOG_TARGET: &str = "bootnodes::advertisement";
47
48const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(30);
50
51pub struct BootnodeAdvertisementParams {
53 pub para_id: ParaId,
55 pub relay_chain_interface: Arc<dyn RelayChainInterface>,
57 pub relay_chain_network: Arc<dyn NetworkService>,
59 pub request_receiver: async_channel::Receiver<IncomingRequest>,
61 pub parachain_network: Arc<dyn NetworkService>,
63 pub advertise_non_global_ips: bool,
65 pub parachain_genesis_hash: ParaHash,
67 pub parachain_fork_id: Option<String>,
69 pub public_addresses: Vec<Multiaddr>,
71}
72
73pub struct BootnodeAdvertisement {
75 para_id: ParaId,
76 para_id_scale_compact: Vec<u8>,
77 relay_chain_interface: Arc<dyn RelayChainInterface>,
78 relay_chain_network: Arc<dyn NetworkService>,
79 current_epoch_key: Option<KademliaKey>,
80 next_epoch_key: Option<KademliaKey>,
81 current_epoch_publish_retry: Pin<Box<Fuse<Sleep>>>,
82 next_epoch_publish_retry: Pin<Box<Fuse<Sleep>>>,
83 request_receiver: async_channel::Receiver<IncomingRequest>,
84 parachain_network: Arc<dyn NetworkService>,
85 advertise_non_global_ips: bool,
86 parachain_genesis_hash: ParaHash,
87 parachain_fork_id: Option<String>,
88 public_addresses: Vec<Multiaddr>,
89}
90
91impl BootnodeAdvertisement {
92 pub fn new(
94 BootnodeAdvertisementParams {
95 para_id,
96 relay_chain_interface,
97 relay_chain_network,
98 request_receiver,
99 parachain_network,
100 advertise_non_global_ips,
101 parachain_genesis_hash,
102 parachain_fork_id,
103 public_addresses,
104 }: BootnodeAdvertisementParams,
105 ) -> Self {
106 let local_peer_id = parachain_network.local_peer_id();
109 let public_addresses = public_addresses
110 .into_iter()
111 .filter_map(|mut addr| match addr.iter().last() {
112 Some(Protocol::P2p(peer_id)) if &peer_id == local_peer_id.as_ref() => {
113 addr.pop();
114 Some(addr)
115 },
116 Some(Protocol::P2p(_)) => {
117 warn!(
118 target: LOG_TARGET,
119 "Discarding public address containing not our peer ID: {addr}",
120 );
121 None
122 },
123 _ => Some(addr),
124 })
125 .collect();
126
127 Self {
128 para_id,
129 para_id_scale_compact: CompactRef(¶_id).encode(),
130 relay_chain_interface,
131 relay_chain_network,
132 current_epoch_key: None,
133 next_epoch_key: None,
134 current_epoch_publish_retry: Box::pin(Fuse::terminated()),
135 next_epoch_publish_retry: Box::pin(Fuse::terminated()),
136 request_receiver,
137 parachain_network,
138 advertise_non_global_ips,
139 parachain_genesis_hash,
140 parachain_fork_id,
141 public_addresses,
142 }
143 }
144
145 async fn current_epoch(&self, hash: RelayHash) -> RelayChainResult<Epoch> {
146 let res = self
147 .relay_chain_interface
148 .call_runtime_api("BabeApi_current_epoch", hash, &[])
149 .await?;
150 Decode::decode(&mut &*res).map_err(Into::into)
151 }
152
153 async fn next_epoch(&self, hash: RelayHash) -> RelayChainResult<Epoch> {
154 let res = self
155 .relay_chain_interface
156 .call_runtime_api("BabeApi_next_epoch", hash, &[])
157 .await?;
158 Decode::decode(&mut &*res).map_err(Into::into)
159 }
160
161 fn epoch_key(&self, randomness: Randomness) -> KademliaKey {
162 self.para_id_scale_compact
163 .clone()
164 .into_iter()
165 .chain(randomness.into_iter())
166 .collect::<Vec<_>>()
167 .into()
168 }
169
170 async fn current_and_next_epoch_keys(
171 &self,
172 header: RelayHeader,
173 ) -> (Option<KademliaKey>, Option<KademliaKey>) {
174 let hash = header.hash();
175 let number = header.number();
176
177 let current_epoch = match self.current_epoch(hash).await {
178 Ok(epoch) => Some(epoch),
179 Err(e) => {
180 warn!(
181 target: LOG_TARGET,
182 "Failed to query current epoch for #{number} {hash:?}: {e}",
183 );
184
185 None
186 },
187 };
188
189 let next_epoch = match self.next_epoch(hash).await {
190 Ok(epoch) => Some(epoch),
191 Err(e) => {
192 warn!(
193 target: LOG_TARGET,
194 "Failed to query next epoch for #{number} {hash:?}: {e}",
195 );
196
197 None
198 },
199 };
200
201 (
202 current_epoch.map(|epoch| self.epoch_key(epoch.randomness)),
203 next_epoch.map(|epoch| self.epoch_key(epoch.randomness)),
204 )
205 }
206
207 async fn handle_import_notification(&mut self, header: RelayHeader) {
208 if let Some(ref old_current_epoch_key) = self.current_epoch_key {
209 let Some(next_epoch_descriptor) =
211 header.digest().convert_first(|v| v.as_next_epoch_descriptor())
212 else {
213 return;
214 };
215
216 let next_epoch_key = self.epoch_key(next_epoch_descriptor.randomness);
217
218 if Some(&next_epoch_key) == self.next_epoch_key.as_ref() {
219 trace!(
220 target: LOG_TARGET,
221 "Next epoch descriptor contains the same randomness as the previous one, \
222 not considering this as epoch change (switched fork?)",
223 );
224 return;
225 }
226
227 self.current_epoch_publish_retry = Box::pin(Fuse::terminated());
229 self.next_epoch_publish_retry = Box::pin(Fuse::terminated());
230
231 debug!(target: LOG_TARGET, "New epoch started, readvertising parachain bootnode.");
232
233 debug!(
235 target: LOG_TARGET,
236 "Stopping advertisement of bootnode for old current epoch key {}",
237 hex::encode(old_current_epoch_key.as_ref()),
238 );
239 self.relay_chain_network.stop_providing(old_current_epoch_key.clone());
240
241 self.current_epoch_key = self.next_epoch_key.clone();
243 self.next_epoch_key = Some(next_epoch_key);
244
245 if let Some(ref current_epoch_key) = self.current_epoch_key {
246 debug!(
247 target: LOG_TARGET,
248 "Advertising bootnode for current (old next) epoch key {}",
249 hex::encode(current_epoch_key.as_ref()),
250 );
251 self.relay_chain_network.start_providing(current_epoch_key.clone());
252 }
253
254 if let Some(ref next_epoch_key) = self.next_epoch_key {
255 debug!(
256 target: LOG_TARGET,
257 "Advertising bootnode for next epoch key {}",
258 hex::encode(next_epoch_key.as_ref()),
259 );
260 self.relay_chain_network.start_providing(next_epoch_key.clone());
261 }
262 } else {
263 let (current_epoch_key, next_epoch_key) =
265 self.current_and_next_epoch_keys(header).await;
266 self.current_epoch_key = current_epoch_key.clone();
267 self.next_epoch_key = next_epoch_key.clone();
268
269 if let Some(current_epoch_key) = current_epoch_key {
270 debug!(
271 target: LOG_TARGET,
272 "Initial advertisement of bootnode for current epoch key {}",
273 hex::encode(current_epoch_key.as_ref()),
274 );
275
276 self.relay_chain_network.start_providing(current_epoch_key);
277 } else {
278 warn!(
279 target: LOG_TARGET,
280 "Initial advertisement of bootnode for current epoch failed: no key."
281 );
282 }
283
284 if let Some(next_epoch_key) = next_epoch_key {
285 debug!(
286 target: LOG_TARGET,
287 "Initial advertisement of bootnode for next epoch key {}",
288 hex::encode(next_epoch_key.as_ref()),
289 );
290
291 self.relay_chain_network.start_providing(next_epoch_key);
292 } else {
293 warn!(
294 target: LOG_TARGET,
295 "Initial advertisement of bootnode for next epoch failed: no key."
296 );
297 }
298 }
299 }
300
301 fn paranode_addresses(&self) -> Vec<Multiaddr> {
310 let local_peer_id = self.parachain_network.local_peer_id();
311
312 let without_p2p = |mut addr: Multiaddr| match addr.iter().last() {
314 Some(Protocol::P2p(peer_id)) if &peer_id == local_peer_id.as_ref() => {
315 addr.pop();
316 Some(addr)
317 },
318 Some(Protocol::P2p(_)) => {
319 warn!(
320 target: LOG_TARGET,
321 "Ignoring parachain side address containing not our peer ID: {addr}",
322 );
323 None
324 },
325 _ => Some(addr),
326 };
327
328 let is_global = |address: &Multiaddr| {
330 address.iter().all(|protocol| match protocol {
331 Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
334 Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
335 _ => true,
336 })
337 };
338
339 let is_loopback = |address: &Multiaddr| {
341 address.iter().any(|protocol| match protocol {
342 Protocol::Ip4(ip) => IpNetwork::from(ip).is_loopback(),
343 Protocol::Ip6(ip) => IpNetwork::from(ip).is_loopback(),
344 _ => false,
345 })
346 };
347
348 let public_addresses = self.public_addresses.clone().into_iter();
350
351 let global_listen_addresses =
353 self.parachain_network.listen_addresses().into_iter().filter(is_global);
354
355 let global_external_addresses =
357 self.parachain_network.external_addresses().into_iter().filter(is_global);
358
359 let non_global_external_addresses = self
361 .parachain_network
362 .external_addresses()
363 .into_iter()
364 .filter(|addr| !is_global(addr));
365
366 let non_global_listen_addresses = self
368 .parachain_network
369 .listen_addresses()
370 .into_iter()
371 .filter(|addr| !is_global(addr) && !is_loopback(addr));
372
373 let loopback_listen_addresses =
375 self.parachain_network.listen_addresses().into_iter().filter(is_loopback);
376
377 let mut seen = HashSet::new();
378
379 public_addresses
380 .chain(global_listen_addresses)
381 .chain(global_external_addresses)
382 .chain(
383 self.advertise_non_global_ips
384 .then_some(
385 non_global_external_addresses
386 .chain(non_global_listen_addresses)
387 .chain(loopback_listen_addresses),
388 )
389 .into_iter()
390 .flatten(),
391 )
392 .filter_map(without_p2p)
393 .filter(|addr| seen.insert(addr.clone()))
395 .take(MAX_ADDRESSES)
396 .collect()
397 }
398
399 fn handle_request(&mut self, req: IncomingRequest) {
400 if req.payload == self.para_id_scale_compact {
401 trace!(
402 target: LOG_TARGET,
403 "Serving paranode addresses request from {:?} for parachain ID {}",
404 req.peer,
405 self.para_id,
406 );
407
408 let response = crate::schema::Response {
409 peer_id: self.parachain_network.local_peer_id().to_bytes(),
410 addrs: self.paranode_addresses().iter().map(|a| a.to_vec()).collect(),
411 genesis_hash: self.parachain_genesis_hash.clone().as_bytes().to_vec(),
412 fork_id: self.parachain_fork_id.clone(),
413 };
414
415 let _ = req.pending_response.send(OutgoingResponse {
416 result: Ok(response.encode_to_vec()),
417 reputation_changes: Vec::new(),
418 sent_feedback: None,
419 });
420 } else {
421 let payload = req.payload;
422 match Compact::<ParaId>::decode(&mut &payload[..]) {
423 Ok(para_id) => {
424 trace!(
425 target: LOG_TARGET,
426 "Ignoring request for parachain ID {} != self parachain ID {} from {:?}",
427 para_id.0,
428 self.para_id,
429 req.peer,
430 );
431 },
432 Err(e) => {
433 trace!(
434 target: LOG_TARGET,
435 "Cannot decode parachain ID in a request from {:?}: {e}",
436 req.peer,
437 );
438 },
439 }
440 }
441 }
442
443 fn handle_dht_event(&mut self, event: DhtEvent) {
444 match event {
445 DhtEvent::StartedProviding(key) =>
446 if Some(&key) == self.current_epoch_key.as_ref() {
447 debug!(
448 target: LOG_TARGET,
449 "Successfully published provider for current epoch key {}",
450 hex::encode(key.as_ref()),
451 );
452 } else if Some(&key) == self.next_epoch_key.as_ref() {
453 debug!(
454 target: LOG_TARGET,
455 "Successfully published provider for next epoch key {}",
456 hex::encode(key.as_ref()),
457 );
458 },
459 DhtEvent::StartProvidingFailed(key) =>
460 if Some(&key) == self.current_epoch_key.as_ref() {
461 debug!(
462 target: LOG_TARGET,
463 "Failed to publish provider for current epoch key {}. Retrying in {RETRY_DELAY:?}",
464 hex::encode(key.as_ref()),
465 );
466 self.current_epoch_publish_retry =
467 Box::pin(tokio::time::sleep(RETRY_DELAY).fuse());
468 } else if Some(&key) == self.next_epoch_key.as_ref() {
469 debug!(
470 target: LOG_TARGET,
471 "Failed to publish provider for next epoch key {}. Retrying in {RETRY_DELAY:?}",
472 hex::encode(key.as_ref()),
473 );
474 self.next_epoch_publish_retry =
475 Box::pin(tokio::time::sleep(RETRY_DELAY).fuse());
476 },
477 _ => {},
478 }
479 }
480
481 fn retry_for_current_epoch(&mut self) {
482 if let Some(current_epoch_key) = self.current_epoch_key.clone() {
483 debug!(
484 target: LOG_TARGET,
485 "Retrying advertising bootnode for current epoch key {}",
486 hex::encode(current_epoch_key.as_ref()),
487 );
488 self.relay_chain_network.start_providing(current_epoch_key);
489 } else {
490 error!(
491 target: LOG_TARGET,
492 "Retrying advertising bootnode for current epoch failed: no key. This is a bug."
493 );
494 }
495 }
496
497 fn retry_for_next_epoch(&mut self) {
498 if let Some(next_epoch_key) = self.next_epoch_key.clone() {
499 debug!(
500 target: LOG_TARGET,
501 "Retrying advertising bootnode for next epoch key {}",
502 hex::encode(next_epoch_key.as_ref()),
503 );
504 self.relay_chain_network.start_providing(next_epoch_key);
505 } else {
506 error!(
507 target: LOG_TARGET,
508 "Retrying advertising bootnode for next epoch failed: no key. This is a bug."
509 );
510 }
511 }
512
513 pub async fn run(mut self) -> RelayChainResult<()> {
515 let mut import_notification_stream =
516 self.relay_chain_interface.import_notification_stream().await?;
517 let dht_event_stream = self
518 .relay_chain_network
519 .event_stream("parachain-bootnode-discovery")
520 .filter_map(|e| async move {
521 match e {
522 Event::Dht(e) => Some(e),
523 _ => None,
524 }
525 })
526 .fuse();
527 pin_mut!(dht_event_stream);
528
529 loop {
530 tokio::select! {
531 header = import_notification_stream.next() => match header {
532 Some(header) => self.handle_import_notification(header).await,
533 None => {
534 debug!(
535 target: LOG_TARGET,
536 "Import notification stream terminated, terminating bootnode advertisement."
537 );
538 return Ok(());
539 }
540 },
541 req = self.request_receiver.recv() => match req {
542 Ok(req) => {
543 self.handle_request(req);
544 },
545 Err(_) => {
546 debug!(
547 target: LOG_TARGET,
548 "Paranode request receiver terminated, terminating bootnode advertisement."
549 );
550 return Ok(());
551 }
552 },
553 event = dht_event_stream.select_next_some() => self.handle_dht_event(event),
554 () = &mut self.current_epoch_publish_retry => self.retry_for_current_epoch(),
555 () = &mut self.next_epoch_publish_retry => self.retry_for_next_epoch(),
556 }
557 }
558 }
559}