1use polkadot_node_primitives::BlockWeight;
20use polkadot_node_subsystem::{
21 errors::ChainApiError,
22 messages::{ChainApiMessage, ChainSelectionMessage},
23 overseer::{self, SubsystemSender},
24 FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
25};
26use polkadot_node_subsystem_util::database::Database;
27use polkadot_primitives::{BlockNumber, ConsensusLog, Hash, Header};
28
29use codec::Error as CodecError;
30use futures::{channel::oneshot, future::Either, prelude::*};
31
32use std::{
33 sync::Arc,
34 time::{Duration, SystemTime, UNIX_EPOCH},
35};
36
37use crate::backend::{Backend, BackendWriteOp, OverlayedBackend};
38
39mod backend;
40mod db_backend;
41mod tree;
42
43#[cfg(test)]
44mod tests;
45
/// Target under which every log line of this subsystem is emitted.
const LOG_TARGET: &str = "parachain::chain-selection";

/// A point in time, counted in seconds. `SystemClock` measures it from the UNIX epoch.
type Timestamp = u64;

/// Offset, in seconds, added to the current time when an activated leaf is imported; the sum is
/// handed to the import logic as the moment of stagnation.
const STAGNANT_TIMEOUT: Timestamp = 2 * 60;

/// Offset, in seconds (25 hours), subtracted from the current time to obtain the cut-off used
/// in prune-only mode.
const STAGNANT_PRUNE_DELAY: Timestamp = 25 * 3_600;

/// Element limit passed to each stagnant detection or pruning pass.
const MAX_STAGNANT_ENTRIES: usize = 1_000;
59
/// Approval state tracked for a single block.
#[derive(Debug, Clone)]
enum Approval {
    /// The block is approved.
    Approved,
    /// The block is not approved, and is not marked stagnant either.
    Unapproved,
    /// The block is marked stagnant.
    Stagnant,
}

impl Approval {
    /// Returns `true` for [`Approval::Stagnant`] and `false` for every other state.
    fn is_stagnant(&self) -> bool {
        match self {
            Self::Stagnant => true,
            Self::Approved | Self::Unapproved => false,
        }
    }
}
75
76#[derive(Debug, Clone)]
77struct ViabilityCriteria {
78 explicitly_reverted: bool,
80 approval: Approval,
82 earliest_unviable_ancestor: Option<Hash>,
85}
86
87impl ViabilityCriteria {
88 fn is_viable(&self) -> bool {
89 self.is_parent_viable() && self.is_explicitly_viable()
90 }
91
92 fn is_explicitly_viable(&self) -> bool {
95 !self.explicitly_reverted && !self.approval.is_stagnant()
96 }
97
98 fn is_parent_viable(&self) -> bool {
101 self.earliest_unviable_ancestor.is_none()
102 }
103}
104
105#[derive(Debug, Clone, PartialEq)]
109struct LeafEntry {
110 weight: BlockWeight,
111 block_number: BlockNumber,
112 block_hash: Hash,
113}
114
115impl PartialOrd for LeafEntry {
116 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
117 let ord = self.weight.cmp(&other.weight).then(self.block_number.cmp(&other.block_number));
118
119 if !matches!(ord, std::cmp::Ordering::Equal) {
120 Some(ord)
121 } else {
122 None
123 }
124 }
125}
126
127#[derive(Debug, Default, Clone)]
128struct LeafEntrySet {
129 inner: Vec<LeafEntry>,
130}
131
132impl LeafEntrySet {
133 fn remove(&mut self, hash: &Hash) -> bool {
134 match self.inner.iter().position(|e| &e.block_hash == hash) {
135 None => false,
136 Some(i) => {
137 self.inner.remove(i);
138 true
139 },
140 }
141 }
142
143 fn insert(&mut self, new: LeafEntry) {
144 let mut pos = None;
145 for (i, e) in self.inner.iter().enumerate() {
146 if e == &new {
147 return
148 }
149 if e < &new {
150 pos = Some(i);
151 break
152 }
153 }
154
155 match pos {
156 None => self.inner.push(new),
157 Some(i) => self.inner.insert(i, new),
158 }
159 }
160
161 fn into_hashes_descending(self) -> impl Iterator<Item = Hash> {
162 self.inner.into_iter().map(|e| e.block_hash)
163 }
164}
165
166#[derive(Debug, Clone)]
167struct BlockEntry {
168 block_hash: Hash,
169 block_number: BlockNumber,
170 parent_hash: Hash,
171 children: Vec<Hash>,
172 viability: ViabilityCriteria,
173 weight: BlockWeight,
174}
175
176impl BlockEntry {
177 fn leaf_entry(&self) -> LeafEntry {
178 LeafEntry {
179 block_hash: self.block_hash,
180 block_number: self.block_number,
181 weight: self.weight,
182 }
183 }
184
185 fn non_viable_ancestor_for_child(&self) -> Option<Hash> {
186 if self.viability.is_viable() {
187 None
188 } else {
189 self.viability.earliest_unviable_ancestor.or(Some(self.block_hash))
190 }
191 }
192}
193
194#[derive(Debug, thiserror::Error)]
195#[allow(missing_docs)]
196pub enum Error {
197 #[error(transparent)]
198 ChainApi(#[from] ChainApiError),
199
200 #[error(transparent)]
201 Io(#[from] std::io::Error),
202
203 #[error(transparent)]
204 Oneshot(#[from] oneshot::Canceled),
205
206 #[error(transparent)]
207 Subsystem(#[from] SubsystemError),
208
209 #[error(transparent)]
210 Codec(#[from] CodecError),
211}
212
213impl Error {
214 fn trace(&self) {
215 match self {
216 Self::Oneshot(_) => gum::debug!(target: LOG_TARGET, err = ?self),
218 _ => gum::warn!(target: LOG_TARGET, err = ?self),
220 }
221 }
222}
223
224pub trait Clock {
226 fn timestamp_now(&self) -> Timestamp;
228}
229
230struct SystemClock;
231
232impl Clock for SystemClock {
233 fn timestamp_now(&self) -> Timestamp {
234 match SystemTime::now().duration_since(UNIX_EPOCH) {
243 Ok(d) => d.as_secs(),
244 Err(e) => {
245 gum::warn!(
246 target: LOG_TARGET,
247 err = ?e,
248 "Current time is before unix epoch. Validation will not work correctly."
249 );
250
251 0
252 },
253 }
254 }
255}
256
257#[derive(Debug, Clone)]
259pub struct StagnantCheckInterval(Option<Duration>);
260
261impl Default for StagnantCheckInterval {
262 fn default() -> Self {
263 const DEFAULT_STAGNANT_CHECK_INTERVAL: Duration = Duration::from_secs(5);
269
270 StagnantCheckInterval(Some(DEFAULT_STAGNANT_CHECK_INTERVAL))
271 }
272}
273
274impl StagnantCheckInterval {
275 pub fn new(interval: Duration) -> Self {
277 StagnantCheckInterval(Some(interval))
278 }
279
280 pub fn never() -> Self {
282 StagnantCheckInterval(None)
283 }
284
285 fn timeout_stream(&self) -> impl Stream<Item = ()> {
286 match self.0 {
287 Some(interval) => Either::Left({
288 let mut delay = futures_timer::Delay::new(interval);
289
290 futures::stream::poll_fn(move |cx| {
291 let poll = delay.poll_unpin(cx);
292 if poll.is_ready() {
293 delay.reset(interval)
294 }
295
296 poll.map(Some)
297 })
298 }),
299 None => Either::Right(futures::stream::pending()),
300 }
301 }
302}
303
/// What the periodic stagnant check does when it fires.
///
/// The default is [`StagnantCheckMode::PruneOnly`].
#[derive(Debug, Clone, Default)]
pub enum StagnantCheckMode {
    /// Run stagnant detection using the current time.
    CheckAndPrune,
    /// Only prune stagnant entries older than the configured prune delay.
    #[default]
    PruneOnly,
}
316
317#[derive(Debug, Clone)]
319pub struct Config {
320 pub col_data: u32,
322 pub stagnant_check_interval: StagnantCheckInterval,
324 pub stagnant_check_mode: StagnantCheckMode,
326}
327
328pub struct ChainSelectionSubsystem {
330 config: Config,
331 db: Arc<dyn Database>,
332}
333
334impl ChainSelectionSubsystem {
335 pub fn new(config: Config, db: Arc<dyn Database>) -> Self {
338 ChainSelectionSubsystem { config, db }
339 }
340
341 pub fn revert_to(&self, hash: Hash) -> Result<(), Error> {
344 let config = db_backend::v1::Config { col_data: self.config.col_data };
345 let mut backend = db_backend::v1::DbBackend::new(self.db.clone(), config);
346
347 let ops = tree::revert_to(&backend, hash)?.into_write_ops();
348
349 backend.write(ops)
350 }
351}
352
353#[overseer::subsystem(ChainSelection, error = SubsystemError, prefix = self::overseer)]
354impl<Context> ChainSelectionSubsystem {
355 fn start(self, ctx: Context) -> SpawnedSubsystem {
356 let backend = db_backend::v1::DbBackend::new(
357 self.db,
358 db_backend::v1::Config { col_data: self.config.col_data },
359 );
360
361 SpawnedSubsystem {
362 future: run(
363 ctx,
364 backend,
365 self.config.stagnant_check_interval,
366 self.config.stagnant_check_mode,
367 Box::new(SystemClock),
368 )
369 .map(Ok)
370 .boxed(),
371 name: "chain-selection-subsystem",
372 }
373 }
374}
375
376#[overseer::contextbounds(ChainSelection, prefix = self::overseer)]
377async fn run<Context, B>(
378 mut ctx: Context,
379 mut backend: B,
380 stagnant_check_interval: StagnantCheckInterval,
381 stagnant_check_mode: StagnantCheckMode,
382 clock: Box<dyn Clock + Send + Sync>,
383) where
384 B: Backend,
385{
386 #![allow(clippy::all)]
387 loop {
388 let res = run_until_error(
389 &mut ctx,
390 &mut backend,
391 &stagnant_check_interval,
392 &stagnant_check_mode,
393 &*clock,
394 )
395 .await;
396 match res {
397 Err(e) => {
398 e.trace();
399 break
401 },
402 Ok(()) => {
403 gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
404 break
405 },
406 }
407 }
408}
409
410#[overseer::contextbounds(ChainSelection, prefix = self::overseer)]
416async fn run_until_error<Context, B>(
417 ctx: &mut Context,
418 backend: &mut B,
419 stagnant_check_interval: &StagnantCheckInterval,
420 stagnant_check_mode: &StagnantCheckMode,
421 clock: &(dyn Clock + Sync),
422) -> Result<(), Error>
423where
424 B: Backend,
425{
426 let mut stagnant_check_stream = stagnant_check_interval.timeout_stream();
427 loop {
428 futures::select! {
429 msg = ctx.recv().fuse() => {
430 let msg = msg?;
431 match msg {
432 FromOrchestra::Signal(OverseerSignal::Conclude) => {
433 return Ok(())
434 }
435 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
436 if let Some(leaf) = update.activated {
437 let write_ops = handle_active_leaf(
438 ctx.sender(),
439 &*backend,
440 clock.timestamp_now() + STAGNANT_TIMEOUT,
441 leaf.hash,
442 ).await?;
443
444 backend.write(write_ops)?;
445 }
446 }
447 FromOrchestra::Signal(OverseerSignal::BlockFinalized(h, n)) => {
448 handle_finalized_block(backend, h, n)?
449 }
450 FromOrchestra::Communication { msg } => match msg {
451 ChainSelectionMessage::Approved(hash) => {
452 handle_approved_block(backend, hash)?
453 }
454 ChainSelectionMessage::Leaves(tx) => {
455 let leaves = load_leaves(ctx.sender(), &*backend).await?;
456 let _ = tx.send(leaves);
457 }
458 ChainSelectionMessage::BestLeafContaining(required, tx) => {
459 let best_containing = backend::find_best_leaf_containing(
460 &*backend,
461 required,
462 )?;
463
464 let _ = tx.send(best_containing);
470 }
471 ChainSelectionMessage::RevertBlocks(blocks_to_revert) => {
472 let write_ops = handle_revert_blocks(backend, blocks_to_revert)?;
473 backend.write(write_ops)?;
474 }
475 }
476 }
477 }
478 _ = stagnant_check_stream.next().fuse() => {
479 match stagnant_check_mode {
480 StagnantCheckMode::CheckAndPrune => detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES),
481 StagnantCheckMode::PruneOnly => {
482 let now_timestamp = clock.timestamp_now();
483 prune_only_stagnant(backend, now_timestamp - STAGNANT_PRUNE_DELAY, MAX_STAGNANT_ENTRIES)
484 },
485 }?;
486 }
487 }
488 }
489}
490
491async fn fetch_finalized(
492 sender: &mut impl SubsystemSender<ChainApiMessage>,
493) -> Result<Option<(Hash, BlockNumber)>, Error> {
494 let (number_tx, number_rx) = oneshot::channel();
495
496 sender.send_message(ChainApiMessage::FinalizedBlockNumber(number_tx)).await;
497
498 let number = match number_rx.await? {
499 Ok(number) => number,
500 Err(err) => {
501 gum::warn!(target: LOG_TARGET, ?err, "Fetching finalized number failed");
502 return Ok(None)
503 },
504 };
505
506 let (hash_tx, hash_rx) = oneshot::channel();
507
508 sender.send_message(ChainApiMessage::FinalizedBlockHash(number, hash_tx)).await;
509
510 match hash_rx.await? {
511 Err(err) => {
512 gum::warn!(target: LOG_TARGET, number, ?err, "Fetching finalized block number failed");
513 Ok(None)
514 },
515 Ok(None) => {
516 gum::warn!(target: LOG_TARGET, number, "Missing hash for finalized block number");
517 Ok(None)
518 },
519 Ok(Some(h)) => Ok(Some((h, number))),
520 }
521}
522
523async fn fetch_header(
524 sender: &mut impl SubsystemSender<ChainApiMessage>,
525 hash: Hash,
526) -> Result<Option<Header>, Error> {
527 let (tx, rx) = oneshot::channel();
528 sender.send_message(ChainApiMessage::BlockHeader(hash, tx)).await;
529
530 Ok(rx.await?.unwrap_or_else(|err| {
531 gum::warn!(target: LOG_TARGET, ?hash, ?err, "Missing hash for finalized block number");
532 None
533 }))
534}
535
536async fn fetch_block_weight(
537 sender: &mut impl overseer::SubsystemSender<ChainApiMessage>,
538 hash: Hash,
539) -> Result<Option<BlockWeight>, Error> {
540 let (tx, rx) = oneshot::channel();
541 sender.send_message(ChainApiMessage::BlockWeight(hash, tx)).await;
542
543 let res = rx.await?;
544
545 Ok(res.unwrap_or_else(|err| {
546 gum::warn!(target: LOG_TARGET, ?hash, ?err, "Missing hash for finalized block number");
547 None
548 }))
549}
550
551async fn handle_active_leaf(
553 sender: &mut impl overseer::ChainSelectionSenderTrait,
554 backend: &impl Backend,
555 stagnant_at: Timestamp,
556 hash: Hash,
557) -> Result<Vec<BackendWriteOp>, Error> {
558 let lower_bound = match backend.load_first_block_number()? {
559 Some(l) => {
560 l.saturating_sub(1)
564 },
565 None => fetch_finalized(sender).await?.map_or(1, |(_, n)| n),
566 };
567
568 let header = match fetch_header(sender, hash).await? {
569 None => {
570 gum::warn!(target: LOG_TARGET, ?hash, "Missing header for new head");
571 return Ok(Vec::new())
572 },
573 Some(h) => h,
574 };
575
576 let new_blocks = polkadot_node_subsystem_util::determine_new_blocks(
577 sender,
578 |h| backend.load_block_entry(h).map(|b| b.is_some()),
579 hash,
580 &header,
581 lower_bound,
582 )
583 .await?;
584
585 let mut overlay = OverlayedBackend::new(backend);
586
587 for (hash, header) in new_blocks.into_iter().rev() {
590 let weight = match fetch_block_weight(sender, hash).await? {
591 None => {
592 gum::warn!(
593 target: LOG_TARGET,
594 ?hash,
595 "Missing block weight for new head. Skipping chain.",
596 );
597
598 break
601 },
602 Some(w) => w,
603 };
604
605 let reversion_logs = extract_reversion_logs(&header);
606 tree::import_block(
607 &mut overlay,
608 hash,
609 header.number,
610 header.parent_hash,
611 reversion_logs,
612 weight,
613 stagnant_at,
614 )?;
615 }
616
617 Ok(overlay.into_write_ops().collect())
618}
619
620fn extract_reversion_logs(header: &Header) -> Vec<BlockNumber> {
624 let number = header.number;
625 let mut logs = header
626 .digest
627 .logs()
628 .iter()
629 .enumerate()
630 .filter_map(|(i, d)| match ConsensusLog::from_digest_item(d) {
631 Err(e) => {
632 gum::warn!(
633 target: LOG_TARGET,
634 err = ?e,
635 index = i,
636 block_hash = ?header.hash(),
637 "Digest item failed to encode"
638 );
639
640 None
641 },
642 Ok(Some(ConsensusLog::Revert(b))) if b <= number => Some(b),
643 Ok(Some(ConsensusLog::Revert(b))) => {
644 gum::warn!(
645 target: LOG_TARGET,
646 revert_target = b,
647 block_number = number,
648 block_hash = ?header.hash(),
649 "Block issued invalid revert digest targeting future"
650 );
651
652 None
653 },
654 Ok(_) => None,
655 })
656 .collect::<Vec<_>>();
657
658 logs.sort();
659
660 logs
661}
662
663fn handle_finalized_block(
665 backend: &mut impl Backend,
666 finalized_hash: Hash,
667 finalized_number: BlockNumber,
668) -> Result<(), Error> {
669 let ops = tree::finalize_block(&*backend, finalized_hash, finalized_number)?.into_write_ops();
670
671 backend.write(ops)
672}
673
674fn handle_approved_block(backend: &mut impl Backend, approved_block: Hash) -> Result<(), Error> {
676 let ops = {
677 let mut overlay = OverlayedBackend::new(&*backend);
678
679 tree::approve_block(&mut overlay, approved_block)?;
680
681 overlay.into_write_ops()
682 };
683
684 backend.write(ops)
685}
686
687fn handle_revert_blocks(
691 backend: &impl Backend,
692 blocks_to_revert: Vec<(BlockNumber, Hash)>,
693) -> Result<Vec<BackendWriteOp>, Error> {
694 let mut overlay = OverlayedBackend::new(backend);
695 for (block_number, block_hash) in blocks_to_revert {
696 tree::apply_single_reversion(&mut overlay, block_hash, block_number)?;
697 }
698
699 Ok(overlay.into_write_ops().collect())
700}
701
702fn detect_stagnant(
703 backend: &mut impl Backend,
704 now: Timestamp,
705 max_elements: usize,
706) -> Result<(), Error> {
707 let ops = {
708 let overlay = tree::detect_stagnant(&*backend, now, max_elements)?;
709
710 overlay.into_write_ops()
711 };
712
713 backend.write(ops)
714}
715
716fn prune_only_stagnant(
717 backend: &mut impl Backend,
718 up_to: Timestamp,
719 max_elements: usize,
720) -> Result<(), Error> {
721 let ops = {
722 let overlay = tree::prune_only_stagnant(&*backend, up_to, max_elements)?;
723
724 overlay.into_write_ops()
725 };
726
727 backend.write(ops)
728}
729
730async fn load_leaves(
733 sender: &mut impl overseer::SubsystemSender<ChainApiMessage>,
734 backend: &impl Backend,
735) -> Result<Vec<Hash>, Error> {
736 let leaves: Vec<_> = backend.load_leaves()?.into_hashes_descending().collect();
737
738 if leaves.is_empty() {
739 Ok(fetch_finalized(sender).await?.map_or(Vec::new(), |(h, _)| vec![h]))
740 } else {
741 Ok(leaves)
742 }
743}