referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_chain_selection/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Implements the Chain Selection Subsystem.
18
19use polkadot_node_primitives::BlockWeight;
20use polkadot_node_subsystem::{
21	errors::ChainApiError,
22	messages::{ChainApiMessage, ChainSelectionMessage},
23	overseer::{self, SubsystemSender},
24	FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
25};
26use polkadot_node_subsystem_util::database::Database;
27use polkadot_primitives::{BlockNumber, ConsensusLog, Hash, Header};
28
29use codec::Error as CodecError;
30use futures::{channel::oneshot, future::Either, prelude::*};
31
32use std::{
33	sync::Arc,
34	time::{Duration, SystemTime, UNIX_EPOCH},
35};
36
37use crate::backend::{Backend, BackendWriteOp, OverlayedBackend};
38
39mod backend;
40mod db_backend;
41mod tree;
42
43#[cfg(test)]
44mod tests;
45
/// Log target attached to every message emitted by this subsystem.
const LOG_TARGET: &str = "parachain::chain-selection";

/// Seconds since the UNIX epoch (1 Jan 1970). Being wall-clock based, the value stays
/// meaningful across node restarts and OS reboots.
type Timestamp = u64;

/// Number of seconds a block may stay unapproved before nodes abandon it
/// and begin building on another chain.
const STAGNANT_TIMEOUT: Timestamp = 2 * 60;

/// In prune-only mode, pruning of stagnant keys lags behind the current time by this many
/// seconds (25 hours) so that it does not interfere with finality.
const STAGNANT_PRUNE_DELAY: Timestamp = 25 * 3_600;

/// Entry limit handed to the stagnant detection / pruning routines on each invocation.
const MAX_STAGNANT_ENTRIES: usize = 1_000;
59
/// Approval status of a single block.
#[derive(Debug, Clone)]
enum Approval {
	/// The block has been approved.
	Approved,
	/// The block is not approved, but has not gone stagnant either.
	Unapproved,
	/// The block is not approved and has gone stagnant.
	Stagnant,
}

impl Approval {
	/// Returns `true` only for [`Approval::Stagnant`].
	fn is_stagnant(&self) -> bool {
		match self {
			Approval::Stagnant => true,
			Approval::Approved | Approval::Unapproved => false,
		}
	}
}
75
76#[derive(Debug, Clone)]
77struct ViabilityCriteria {
78	// Whether this block has been explicitly reverted by one of its descendants.
79	explicitly_reverted: bool,
80	// The approval state of this block specifically.
81	approval: Approval,
82	// The earliest unviable ancestor - the hash of the earliest unfinalized
83	// block in the ancestry which is explicitly reverted or stagnant.
84	earliest_unviable_ancestor: Option<Hash>,
85}
86
87impl ViabilityCriteria {
88	fn is_viable(&self) -> bool {
89		self.is_parent_viable() && self.is_explicitly_viable()
90	}
91
92	// Whether the current block is explicitly viable.
93	// That is, whether the current block is neither reverted nor stagnant.
94	fn is_explicitly_viable(&self) -> bool {
95		!self.explicitly_reverted && !self.approval.is_stagnant()
96	}
97
98	// Whether the parent is viable. This assumes that the parent
99	// descends from the finalized chain.
100	fn is_parent_viable(&self) -> bool {
101		self.earliest_unviable_ancestor.is_none()
102	}
103}
104
105// Light entries describing leaves of the chain.
106//
107// These are ordered first by weight and then by block number.
108#[derive(Debug, Clone, PartialEq)]
109struct LeafEntry {
110	weight: BlockWeight,
111	block_number: BlockNumber,
112	block_hash: Hash,
113}
114
115impl PartialOrd for LeafEntry {
116	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
117		let ord = self.weight.cmp(&other.weight).then(self.block_number.cmp(&other.block_number));
118
119		if !matches!(ord, std::cmp::Ordering::Equal) {
120			Some(ord)
121		} else {
122			None
123		}
124	}
125}
126
127#[derive(Debug, Default, Clone)]
128struct LeafEntrySet {
129	inner: Vec<LeafEntry>,
130}
131
132impl LeafEntrySet {
133	fn remove(&mut self, hash: &Hash) -> bool {
134		match self.inner.iter().position(|e| &e.block_hash == hash) {
135			None => false,
136			Some(i) => {
137				self.inner.remove(i);
138				true
139			},
140		}
141	}
142
143	fn insert(&mut self, new: LeafEntry) {
144		let mut pos = None;
145		for (i, e) in self.inner.iter().enumerate() {
146			if e == &new {
147				return
148			}
149			if e < &new {
150				pos = Some(i);
151				break
152			}
153		}
154
155		match pos {
156			None => self.inner.push(new),
157			Some(i) => self.inner.insert(i, new),
158		}
159	}
160
161	fn into_hashes_descending(self) -> impl Iterator<Item = Hash> {
162		self.inner.into_iter().map(|e| e.block_hash)
163	}
164}
165
166#[derive(Debug, Clone)]
167struct BlockEntry {
168	block_hash: Hash,
169	block_number: BlockNumber,
170	parent_hash: Hash,
171	children: Vec<Hash>,
172	viability: ViabilityCriteria,
173	weight: BlockWeight,
174}
175
176impl BlockEntry {
177	fn leaf_entry(&self) -> LeafEntry {
178		LeafEntry {
179			block_hash: self.block_hash,
180			block_number: self.block_number,
181			weight: self.weight,
182		}
183	}
184
185	fn non_viable_ancestor_for_child(&self) -> Option<Hash> {
186		if self.viability.is_viable() {
187			None
188		} else {
189			self.viability.earliest_unviable_ancestor.or(Some(self.block_hash))
190		}
191	}
192}
193
194#[derive(Debug, thiserror::Error)]
195#[allow(missing_docs)]
196pub enum Error {
197	#[error(transparent)]
198	ChainApi(#[from] ChainApiError),
199
200	#[error(transparent)]
201	Io(#[from] std::io::Error),
202
203	#[error(transparent)]
204	Oneshot(#[from] oneshot::Canceled),
205
206	#[error(transparent)]
207	Subsystem(#[from] SubsystemError),
208
209	#[error(transparent)]
210	Codec(#[from] CodecError),
211}
212
213impl Error {
214	fn trace(&self) {
215		match self {
216			// don't spam the log with spurious errors
217			Self::Oneshot(_) => gum::debug!(target: LOG_TARGET, err = ?self),
218			// it's worth reporting otherwise
219			_ => gum::warn!(target: LOG_TARGET, err = ?self),
220		}
221	}
222}
223
224/// A clock used for fetching the current timestamp.
225pub trait Clock {
226	/// Get the current timestamp.
227	fn timestamp_now(&self) -> Timestamp;
228}
229
230struct SystemClock;
231
232impl Clock for SystemClock {
233	fn timestamp_now(&self) -> Timestamp {
234		// `SystemTime` is notoriously non-monotonic, so our timers might not work
235		// exactly as expected. Regardless, stagnation is detected on the order of minutes,
236		// and slippage of a few seconds in either direction won't cause any major harm.
237		//
238		// The exact time that a block becomes stagnant in the local node is always expected
239		// to differ from other nodes due to network asynchrony and delays in block propagation.
240		// Non-monotonicity exacerbates that somewhat, but not meaningfully.
241
242		match SystemTime::now().duration_since(UNIX_EPOCH) {
243			Ok(d) => d.as_secs(),
244			Err(e) => {
245				gum::warn!(
246					target: LOG_TARGET,
247					err = ?e,
248					"Current time is before unix epoch. Validation will not work correctly."
249				);
250
251				0
252			},
253		}
254	}
255}
256
/// How often to check for stagnant blocks. Holds `None` when the check never triggers.
#[derive(Debug, Clone)]
pub struct StagnantCheckInterval(Option<Duration>);

impl Default for StagnantCheckInterval {
	/// Check every 5 seconds.
	fn default() -> Self {
		// 5 seconds is a reasonable balance between avoiding DB reads and
		// ensuring validators are generally in agreement on stagnant blocks.
		//
		// Assuming a network delay of D, the longest difference in view possible
		// between 2 validators is D + 5s.
		Self(Some(Duration::from_secs(5)))
	}
}
273
274impl StagnantCheckInterval {
275	/// Create a new stagnant-check interval wrapping the given duration.
276	pub fn new(interval: Duration) -> Self {
277		StagnantCheckInterval(Some(interval))
278	}
279
280	/// Create a `StagnantCheckInterval` which never triggers.
281	pub fn never() -> Self {
282		StagnantCheckInterval(None)
283	}
284
285	fn timeout_stream(&self) -> impl Stream<Item = ()> {
286		match self.0 {
287			Some(interval) => Either::Left({
288				let mut delay = futures_timer::Delay::new(interval);
289
290				futures::stream::poll_fn(move |cx| {
291					let poll = delay.poll_unpin(cx);
292					if poll.is_ready() {
293						delay.reset(interval)
294					}
295
296					poll.map(Some)
297				})
298			}),
299			None => Either::Right(futures::stream::pending()),
300		}
301	}
302}
303
/// What the periodic stagnant check does: check and prune, or prune only.
#[derive(Debug, Clone)]
pub enum StagnantCheckMode {
	/// Detect stagnant blocks and prune.
	CheckAndPrune,
	/// Only prune stagnant entries.
	PruneOnly,
}

impl Default for StagnantCheckMode {
	/// The default mode is [`StagnantCheckMode::PruneOnly`].
	fn default() -> Self {
		Self::PruneOnly
	}
}
316
317/// Configuration for the chain selection subsystem.
318#[derive(Debug, Clone)]
319pub struct Config {
320	/// The column in the database that the storage should use.
321	pub col_data: u32,
322	/// How often to check for stagnant blocks.
323	pub stagnant_check_interval: StagnantCheckInterval,
324	/// Mode of stagnant checks
325	pub stagnant_check_mode: StagnantCheckMode,
326}
327
328/// The chain selection subsystem.
329pub struct ChainSelectionSubsystem {
330	config: Config,
331	db: Arc<dyn Database>,
332}
333
334impl ChainSelectionSubsystem {
335	/// Create a new instance of the subsystem with the given config
336	/// and key-value store.
337	pub fn new(config: Config, db: Arc<dyn Database>) -> Self {
338		ChainSelectionSubsystem { config, db }
339	}
340
341	/// Revert to the block corresponding to the specified `hash`.
342	/// The operation is not allowed for blocks older than the last finalized one.
343	pub fn revert_to(&self, hash: Hash) -> Result<(), Error> {
344		let config = db_backend::v1::Config { col_data: self.config.col_data };
345		let mut backend = db_backend::v1::DbBackend::new(self.db.clone(), config);
346
347		let ops = tree::revert_to(&backend, hash)?.into_write_ops();
348
349		backend.write(ops)
350	}
351}
352
353#[overseer::subsystem(ChainSelection, error = SubsystemError, prefix = self::overseer)]
354impl<Context> ChainSelectionSubsystem {
355	fn start(self, ctx: Context) -> SpawnedSubsystem {
356		let backend = db_backend::v1::DbBackend::new(
357			self.db,
358			db_backend::v1::Config { col_data: self.config.col_data },
359		);
360
361		SpawnedSubsystem {
362			future: run(
363				ctx,
364				backend,
365				self.config.stagnant_check_interval,
366				self.config.stagnant_check_mode,
367				Box::new(SystemClock),
368			)
369			.map(Ok)
370			.boxed(),
371			name: "chain-selection-subsystem",
372		}
373	}
374}
375
376#[overseer::contextbounds(ChainSelection, prefix = self::overseer)]
377async fn run<Context, B>(
378	mut ctx: Context,
379	mut backend: B,
380	stagnant_check_interval: StagnantCheckInterval,
381	stagnant_check_mode: StagnantCheckMode,
382	clock: Box<dyn Clock + Send + Sync>,
383) where
384	B: Backend,
385{
386	#![allow(clippy::all)]
387	loop {
388		let res = run_until_error(
389			&mut ctx,
390			&mut backend,
391			&stagnant_check_interval,
392			&stagnant_check_mode,
393			&*clock,
394		)
395		.await;
396		match res {
397			Err(e) => {
398				e.trace();
399				// All errors are considered fatal right now:
400				break
401			},
402			Ok(()) => {
403				gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
404				break
405			},
406		}
407	}
408}
409
410// Run the subsystem until an error is encountered or a `conclude` signal is received.
411// Most errors are non-fatal and should lead to another call to this function.
412//
413// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
414// lead to another call to this function.
415#[overseer::contextbounds(ChainSelection, prefix = self::overseer)]
416async fn run_until_error<Context, B>(
417	ctx: &mut Context,
418	backend: &mut B,
419	stagnant_check_interval: &StagnantCheckInterval,
420	stagnant_check_mode: &StagnantCheckMode,
421	clock: &(dyn Clock + Sync),
422) -> Result<(), Error>
423where
424	B: Backend,
425{
426	let mut stagnant_check_stream = stagnant_check_interval.timeout_stream();
427	loop {
428		futures::select! {
429			msg = ctx.recv().fuse() => {
430				let msg = msg?;
431				match msg {
432					FromOrchestra::Signal(OverseerSignal::Conclude) => {
433						return Ok(())
434					}
435					FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
436						if let Some(leaf) = update.activated {
437							let write_ops = handle_active_leaf(
438								ctx.sender(),
439								&*backend,
440								clock.timestamp_now() + STAGNANT_TIMEOUT,
441								leaf.hash,
442							).await?;
443
444							backend.write(write_ops)?;
445						}
446					}
447					FromOrchestra::Signal(OverseerSignal::BlockFinalized(h, n)) => {
448						handle_finalized_block(backend, h, n)?
449					}
450					FromOrchestra::Communication { msg } => match msg {
451						ChainSelectionMessage::Approved(hash) => {
452							handle_approved_block(backend, hash)?
453						}
454						ChainSelectionMessage::Leaves(tx) => {
455							let leaves = load_leaves(ctx.sender(), &*backend).await?;
456							let _ = tx.send(leaves);
457						}
458						ChainSelectionMessage::BestLeafContaining(required, tx) => {
459							let best_containing = backend::find_best_leaf_containing(
460								&*backend,
461								required,
462							)?;
463
464							// note - this may be none if the finalized block is
465							// a leaf. this is fine according to the expected usage of the
466							// function. `None` responses should just `unwrap_or(required)`,
467							// so if the required block is the finalized block, then voilá.
468
469							let _ = tx.send(best_containing);
470						}
471						ChainSelectionMessage::RevertBlocks(blocks_to_revert) => {
472							let write_ops = handle_revert_blocks(backend, blocks_to_revert)?;
473							backend.write(write_ops)?;
474						}
475					}
476				}
477			}
478			_ = stagnant_check_stream.next().fuse() => {
479				match stagnant_check_mode {
480					StagnantCheckMode::CheckAndPrune => detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES),
481					StagnantCheckMode::PruneOnly => {
482						let now_timestamp = clock.timestamp_now();
483						prune_only_stagnant(backend, now_timestamp - STAGNANT_PRUNE_DELAY, MAX_STAGNANT_ENTRIES)
484					},
485				}?;
486			}
487		}
488	}
489}
490
491async fn fetch_finalized(
492	sender: &mut impl SubsystemSender<ChainApiMessage>,
493) -> Result<Option<(Hash, BlockNumber)>, Error> {
494	let (number_tx, number_rx) = oneshot::channel();
495
496	sender.send_message(ChainApiMessage::FinalizedBlockNumber(number_tx)).await;
497
498	let number = match number_rx.await? {
499		Ok(number) => number,
500		Err(err) => {
501			gum::warn!(target: LOG_TARGET, ?err, "Fetching finalized number failed");
502			return Ok(None)
503		},
504	};
505
506	let (hash_tx, hash_rx) = oneshot::channel();
507
508	sender.send_message(ChainApiMessage::FinalizedBlockHash(number, hash_tx)).await;
509
510	match hash_rx.await? {
511		Err(err) => {
512			gum::warn!(target: LOG_TARGET, number, ?err, "Fetching finalized block number failed");
513			Ok(None)
514		},
515		Ok(None) => {
516			gum::warn!(target: LOG_TARGET, number, "Missing hash for finalized block number");
517			Ok(None)
518		},
519		Ok(Some(h)) => Ok(Some((h, number))),
520	}
521}
522
523async fn fetch_header(
524	sender: &mut impl SubsystemSender<ChainApiMessage>,
525	hash: Hash,
526) -> Result<Option<Header>, Error> {
527	let (tx, rx) = oneshot::channel();
528	sender.send_message(ChainApiMessage::BlockHeader(hash, tx)).await;
529
530	Ok(rx.await?.unwrap_or_else(|err| {
531		gum::warn!(target: LOG_TARGET, ?hash, ?err, "Missing hash for finalized block number");
532		None
533	}))
534}
535
536async fn fetch_block_weight(
537	sender: &mut impl overseer::SubsystemSender<ChainApiMessage>,
538	hash: Hash,
539) -> Result<Option<BlockWeight>, Error> {
540	let (tx, rx) = oneshot::channel();
541	sender.send_message(ChainApiMessage::BlockWeight(hash, tx)).await;
542
543	let res = rx.await?;
544
545	Ok(res.unwrap_or_else(|err| {
546		gum::warn!(target: LOG_TARGET, ?hash, ?err, "Missing hash for finalized block number");
547		None
548	}))
549}
550
551// Handle a new active leaf.
552async fn handle_active_leaf(
553	sender: &mut impl overseer::ChainSelectionSenderTrait,
554	backend: &impl Backend,
555	stagnant_at: Timestamp,
556	hash: Hash,
557) -> Result<Vec<BackendWriteOp>, Error> {
558	let lower_bound = match backend.load_first_block_number()? {
559		Some(l) => {
560			// We want to iterate back to finalized, and first block number
561			// is assumed to be 1 above finalized - the implicit root of the
562			// tree.
563			l.saturating_sub(1)
564		},
565		None => fetch_finalized(sender).await?.map_or(1, |(_, n)| n),
566	};
567
568	let header = match fetch_header(sender, hash).await? {
569		None => {
570			gum::warn!(target: LOG_TARGET, ?hash, "Missing header for new head");
571			return Ok(Vec::new())
572		},
573		Some(h) => h,
574	};
575
576	let new_blocks = polkadot_node_subsystem_util::determine_new_blocks(
577		sender,
578		|h| backend.load_block_entry(h).map(|b| b.is_some()),
579		hash,
580		&header,
581		lower_bound,
582	)
583	.await?;
584
585	let mut overlay = OverlayedBackend::new(backend);
586
587	// determine_new_blocks gives blocks in descending order.
588	// for this, we want ascending order.
589	for (hash, header) in new_blocks.into_iter().rev() {
590		let weight = match fetch_block_weight(sender, hash).await? {
591			None => {
592				gum::warn!(
593					target: LOG_TARGET,
594					?hash,
595					"Missing block weight for new head. Skipping chain.",
596				);
597
598				// If we don't know the weight, we can't import the block.
599				// And none of its descendants either.
600				break
601			},
602			Some(w) => w,
603		};
604
605		let reversion_logs = extract_reversion_logs(&header);
606		tree::import_block(
607			&mut overlay,
608			hash,
609			header.number,
610			header.parent_hash,
611			reversion_logs,
612			weight,
613			stagnant_at,
614		)?;
615	}
616
617	Ok(overlay.into_write_ops().collect())
618}
619
620// Extract all reversion logs from a header in ascending order.
621//
622// Ignores logs with number > the block header number.
623fn extract_reversion_logs(header: &Header) -> Vec<BlockNumber> {
624	let number = header.number;
625	let mut logs = header
626		.digest
627		.logs()
628		.iter()
629		.enumerate()
630		.filter_map(|(i, d)| match ConsensusLog::from_digest_item(d) {
631			Err(e) => {
632				gum::warn!(
633					target: LOG_TARGET,
634					err = ?e,
635					index = i,
636					block_hash = ?header.hash(),
637					"Digest item failed to encode"
638				);
639
640				None
641			},
642			Ok(Some(ConsensusLog::Revert(b))) if b <= number => Some(b),
643			Ok(Some(ConsensusLog::Revert(b))) => {
644				gum::warn!(
645					target: LOG_TARGET,
646					revert_target = b,
647					block_number = number,
648					block_hash = ?header.hash(),
649					"Block issued invalid revert digest targeting future"
650				);
651
652				None
653			},
654			Ok(_) => None,
655		})
656		.collect::<Vec<_>>();
657
658	logs.sort();
659
660	logs
661}
662
663/// Handle a finalized block event.
664fn handle_finalized_block(
665	backend: &mut impl Backend,
666	finalized_hash: Hash,
667	finalized_number: BlockNumber,
668) -> Result<(), Error> {
669	let ops = tree::finalize_block(&*backend, finalized_hash, finalized_number)?.into_write_ops();
670
671	backend.write(ops)
672}
673
674// Handle an approved block event.
675fn handle_approved_block(backend: &mut impl Backend, approved_block: Hash) -> Result<(), Error> {
676	let ops = {
677		let mut overlay = OverlayedBackend::new(&*backend);
678
679		tree::approve_block(&mut overlay, approved_block)?;
680
681		overlay.into_write_ops()
682	};
683
684	backend.write(ops)
685}
686
687// Here we revert a provided group of blocks. The most common cause for this is that
688// the dispute coordinator has notified chain selection of a dispute which concluded
689// against a candidate.
690fn handle_revert_blocks(
691	backend: &impl Backend,
692	blocks_to_revert: Vec<(BlockNumber, Hash)>,
693) -> Result<Vec<BackendWriteOp>, Error> {
694	let mut overlay = OverlayedBackend::new(backend);
695	for (block_number, block_hash) in blocks_to_revert {
696		tree::apply_single_reversion(&mut overlay, block_hash, block_number)?;
697	}
698
699	Ok(overlay.into_write_ops().collect())
700}
701
702fn detect_stagnant(
703	backend: &mut impl Backend,
704	now: Timestamp,
705	max_elements: usize,
706) -> Result<(), Error> {
707	let ops = {
708		let overlay = tree::detect_stagnant(&*backend, now, max_elements)?;
709
710		overlay.into_write_ops()
711	};
712
713	backend.write(ops)
714}
715
716fn prune_only_stagnant(
717	backend: &mut impl Backend,
718	up_to: Timestamp,
719	max_elements: usize,
720) -> Result<(), Error> {
721	let ops = {
722		let overlay = tree::prune_only_stagnant(&*backend, up_to, max_elements)?;
723
724		overlay.into_write_ops()
725	};
726
727	backend.write(ops)
728}
729
730// Load the leaves from the backend. If there are no leaves, then return
731// the finalized block.
732async fn load_leaves(
733	sender: &mut impl overseer::SubsystemSender<ChainApiMessage>,
734	backend: &impl Backend,
735) -> Result<Vec<Hash>, Error> {
736	let leaves: Vec<_> = backend.load_leaves()?.into_hashes_descending().collect();
737
738	if leaves.is_empty() {
739		Ok(fetch_finalized(sender).await?.map_or(Vec::new(), |(h, _)| vec![h]))
740	} else {
741		Ok(leaves)
742	}
743}