
polkadot_node_core_av_store/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements an `AvailabilityStoreSubsystem`.

#![recursion_limit = "256"]
#![warn(missing_docs)]

use std::{
	collections::{BTreeSet, HashMap, HashSet},
	io,
	sync::Arc,
	time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
};

use codec::{Decode, Encode, Error as CodecError, Input};
use futures::{
	channel::{
		mpsc::{channel, Receiver as MpscReceiver, Sender as MpscSender},
		oneshot,
	},
	future, select, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
use sp_consensus::SyncOracle;

use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem::{
	errors::{ChainApiError, RuntimeApiError},
	messages::{AvailabilityStoreMessage, ChainApiMessage, StoreAvailableDataError},
	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util as util;
use polkadot_primitives::{
	BlockNumber, CandidateEvent, CandidateHash, CandidateReceiptV2 as CandidateReceipt, ChunkIndex,
	CoreIndex, Hash, Header, NodeFeatures, ValidatorIndex,
};
use util::availability_chunks::availability_chunk_indices;

mod metrics;
pub use self::metrics::*;

#[cfg(test)]
mod tests;

const LOG_TARGET: &str = "parachain::availability-store";

/// The following constants are used under normal conditions:

const AVAILABLE_PREFIX: &[u8; 9] = b"available";
const CHUNK_PREFIX: &[u8; 5] = b"chunk";
const META_PREFIX: &[u8; 4] = b"meta";
const UNFINALIZED_PREFIX: &[u8; 11] = b"unfinalized";
const PRUNE_BY_TIME_PREFIX: &[u8; 13] = b"prune_by_time";
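
// Database key layout (all keys are SCALE-encoded tuples):
//   (AVAILABLE_PREFIX, candidate_hash)                              -> `AvailableData` (col_data)
//   (CHUNK_PREFIX, candidate_hash, validator_index)                 -> `ErasureChunk`  (col_data)
//   (META_PREFIX, candidate_hash)                                   -> `CandidateMeta` (col_meta)
//   (UNFINALIZED_PREFIX, block_number, block_hash, candidate_hash)  -> tombstone       (col_meta)
//   (PRUNE_BY_TIME_PREFIX, timestamp, candidate_hash)               -> tombstone       (col_meta)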

// We have some keys we want to map to empty values because existence of the key is enough. We use
// this because rocksdb doesn't support empty values.
const TOMBSTONE_VALUE: &[u8] = b" ";

/// Unavailable blocks are kept for 1 hour.
const KEEP_UNAVAILABLE_FOR: Duration = Duration::from_secs(60 * 60);

/// The pruning interval.
const PRUNING_INTERVAL: Duration = Duration::from_secs(60 * 5);

/// Unix time wrapper with big-endian encoding.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
struct BETimestamp(u64);

impl Encode for BETimestamp {
	fn size_hint(&self) -> usize {
		std::mem::size_of::<u64>()
	}

	fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
		f(&self.0.to_be_bytes())
	}
}

impl Decode for BETimestamp {
	fn decode<I: Input>(value: &mut I) -> Result<Self, CodecError> {
		<[u8; 8]>::decode(value).map(u64::from_be_bytes).map(Self)
	}
}

impl From<Duration> for BETimestamp {
	fn from(d: Duration) -> Self {
		BETimestamp(d.as_secs())
	}
}

impl Into<Duration> for BETimestamp {
	fn into(self) -> Duration {
		Duration::from_secs(self.0)
	}
}

/// [`BlockNumber`] wrapper with big-endian encoding.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
struct BEBlockNumber(BlockNumber);

impl Encode for BEBlockNumber {
	fn size_hint(&self) -> usize {
		std::mem::size_of::<BlockNumber>()
	}

	fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
		f(&self.0.to_be_bytes())
	}
}

impl Decode for BEBlockNumber {
	fn decode<I: Input>(value: &mut I) -> Result<Self, CodecError> {
		<[u8; std::mem::size_of::<BlockNumber>()]>::decode(value)
			.map(BlockNumber::from_be_bytes)
			.map(Self)
	}
}
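
// The big-endian wrappers above make the database's lexicographic key ordering coincide with
// numeric ordering, so prefix iteration (see `finalized_block_range` and `pruning_range`) visits
// entries in ascending block-number / timestamp order.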

#[derive(Debug, Encode, Decode)]
enum State {
	/// Candidate data was first observed at the given time but is not available in any block.
	#[codec(index = 0)]
	Unavailable(BETimestamp),
	/// The candidate was first observed at the given time and was included in the given list of
	/// unfinalized blocks, which may be empty. The timestamp here is not used for pruning. Either
	/// one of these blocks will be finalized or the state will regress to `State::Unavailable`, in
	/// which case the same timestamp will be reused. Blocks are sorted ascending first by block
	/// number and then hash.
	#[codec(index = 1)]
	Unfinalized(BETimestamp, Vec<(BEBlockNumber, Hash)>),
	/// Candidate data has appeared in a finalized block and did so at the given time.
	#[codec(index = 2)]
	Finalized(BETimestamp),
}

// Meta information about a candidate.
#[derive(Debug, Encode, Decode)]
struct CandidateMeta {
	state: State,
	data_available: bool,
	chunks_stored: BitVec<u8, BitOrderLsb0>,
}

fn query_inner<D: Decode>(
	db: &Arc<dyn Database>,
	column: u32,
	key: &[u8],
) -> Result<Option<D>, Error> {
	match db.get(column, key) {
		Ok(Some(raw)) => {
			let res = D::decode(&mut &raw[..])?;
			Ok(Some(res))
		},
		Ok(None) => Ok(None),
		Err(err) => {
			gum::warn!(target: LOG_TARGET, ?err, "Error reading from the availability store");
			Err(err.into())
		},
	}
}

fn write_available_data(
	tx: &mut DBTransaction,
	config: &Config,
	hash: &CandidateHash,
	available_data: &AvailableData,
) {
	let key = (AVAILABLE_PREFIX, hash).encode();

	tx.put_vec(config.col_data, &key[..], available_data.encode());
}

fn load_available_data(
	db: &Arc<dyn Database>,
	config: &Config,
	hash: &CandidateHash,
) -> Result<Option<AvailableData>, Error> {
	let key = (AVAILABLE_PREFIX, hash).encode();

	query_inner(db, config.col_data, &key)
}

fn delete_available_data(tx: &mut DBTransaction, config: &Config, hash: &CandidateHash) {
	let key = (AVAILABLE_PREFIX, hash).encode();

	tx.delete(config.col_data, &key[..])
}

fn load_chunk(
	db: &Arc<dyn Database>,
	config: &Config,
	candidate_hash: &CandidateHash,
	validator_index: ValidatorIndex,
) -> Result<Option<ErasureChunk>, Error> {
	let key = (CHUNK_PREFIX, candidate_hash, validator_index).encode();

	query_inner(db, config.col_data, &key)
}

fn write_chunk(
	tx: &mut DBTransaction,
	config: &Config,
	candidate_hash: &CandidateHash,
	validator_index: ValidatorIndex,
	erasure_chunk: &ErasureChunk,
) {
	let key = (CHUNK_PREFIX, candidate_hash, validator_index).encode();

	tx.put_vec(config.col_data, &key, erasure_chunk.encode());
}

fn delete_chunk(
	tx: &mut DBTransaction,
	config: &Config,
	candidate_hash: &CandidateHash,
	validator_index: ValidatorIndex,
) {
	let key = (CHUNK_PREFIX, candidate_hash, validator_index).encode();

	tx.delete(config.col_data, &key[..]);
}

fn load_meta(
	db: &Arc<dyn Database>,
	config: &Config,
	hash: &CandidateHash,
) -> Result<Option<CandidateMeta>, Error> {
	let key = (META_PREFIX, hash).encode();

	query_inner(db, config.col_meta, &key)
}

fn write_meta(tx: &mut DBTransaction, config: &Config, hash: &CandidateHash, meta: &CandidateMeta) {
	let key = (META_PREFIX, hash).encode();

	tx.put_vec(config.col_meta, &key, meta.encode());
}

fn delete_meta(tx: &mut DBTransaction, config: &Config, hash: &CandidateHash) {
	let key = (META_PREFIX, hash).encode();
	tx.delete(config.col_meta, &key[..])
}

fn delete_unfinalized_height(tx: &mut DBTransaction, config: &Config, block_number: BlockNumber) {
	let prefix = (UNFINALIZED_PREFIX, BEBlockNumber(block_number)).encode();
	tx.delete_prefix(config.col_meta, &prefix);
}

fn delete_unfinalized_inclusion(
	tx: &mut DBTransaction,
	config: &Config,
	block_number: BlockNumber,
	block_hash: &Hash,
	candidate_hash: &CandidateHash,
) {
	let key =
		(UNFINALIZED_PREFIX, BEBlockNumber(block_number), block_hash, candidate_hash).encode();

	tx.delete(config.col_meta, &key[..]);
}

fn delete_pruning_key(
	tx: &mut DBTransaction,
	config: &Config,
	t: impl Into<BETimestamp>,
	h: &CandidateHash,
) {
	let key = (PRUNE_BY_TIME_PREFIX, t.into(), h).encode();
	tx.delete(config.col_meta, &key);
}

fn write_pruning_key(
	tx: &mut DBTransaction,
	config: &Config,
	t: impl Into<BETimestamp>,
	h: &CandidateHash,
) {
	let t = t.into();
	let key = (PRUNE_BY_TIME_PREFIX, t, h).encode();
	tx.put(config.col_meta, &key, TOMBSTONE_VALUE);
}

fn finalized_block_range(finalized: BlockNumber) -> (Vec<u8>, Vec<u8>) {
	// We use big-endian encoding to iterate in ascending order.
	let start = UNFINALIZED_PREFIX.encode();
	let end = (UNFINALIZED_PREFIX, BEBlockNumber(finalized + 1)).encode();

	(start, end)
}

fn write_unfinalized_block_contains(
	tx: &mut DBTransaction,
	config: &Config,
	n: BlockNumber,
	h: &Hash,
	ch: &CandidateHash,
) {
	let key = (UNFINALIZED_PREFIX, BEBlockNumber(n), h, ch).encode();
	tx.put(config.col_meta, &key, TOMBSTONE_VALUE);
}

fn pruning_range(now: impl Into<BETimestamp>) -> (Vec<u8>, Vec<u8>) {
	let start = PRUNE_BY_TIME_PREFIX.encode();
	let end = (PRUNE_BY_TIME_PREFIX, BETimestamp(now.into().0 + 1)).encode();

	(start, end)
}

fn decode_unfinalized_key(s: &[u8]) -> Result<(BlockNumber, Hash, CandidateHash), CodecError> {
	if !s.starts_with(UNFINALIZED_PREFIX) {
		return Err("missing magic string".into())
	}

	<(BEBlockNumber, Hash, CandidateHash)>::decode(&mut &s[UNFINALIZED_PREFIX.len()..])
		.map(|(b, h, ch)| (b.0, h, ch))
}

fn decode_pruning_key(s: &[u8]) -> Result<(Duration, CandidateHash), CodecError> {
	if !s.starts_with(PRUNE_BY_TIME_PREFIX) {
		return Err("missing magic string".into())
	}

	<(BETimestamp, CandidateHash)>::decode(&mut &s[PRUNE_BY_TIME_PREFIX.len()..])
		.map(|(t, ch)| (t.into(), ch))
}

#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
	#[error(transparent)]
	RuntimeApi(#[from] RuntimeApiError),

	#[error(transparent)]
	ChainApi(#[from] ChainApiError),

	#[error(transparent)]
	Erasure(#[from] polkadot_erasure_coding::Error),

	#[error(transparent)]
	Io(#[from] io::Error),

	#[error(transparent)]
	Oneshot(#[from] oneshot::Canceled),

	#[error(transparent)]
	Subsystem(#[from] SubsystemError),

	#[error("Context signal channel closed")]
	ContextChannelClosed,

	#[error(transparent)]
	Time(#[from] SystemTimeError),

	#[error(transparent)]
	Codec(#[from] CodecError),

	#[error("Custom databases are not supported")]
	CustomDatabase,

	#[error("Erasure root does not match expected one")]
	InvalidErasureRoot,
}

impl Error {
	/// Determine whether the error is irrecoverable,
	/// or whether notifying the user via logging is sufficient.
	fn is_fatal(&self) -> bool {
		match self {
			Self::Io(_) => true,
			Self::Oneshot(_) => true,
			Self::CustomDatabase => true,
			Self::ContextChannelClosed => true,
			_ => false,
		}
	}
}

impl Error {
	fn trace(&self) {
		match self {
			// don't spam the log with spurious errors
			Self::RuntimeApi(_) | Self::Oneshot(_) => {
				gum::debug!(target: LOG_TARGET, err = ?self)
			},
			// it's worth reporting otherwise
			_ => gum::warn!(target: LOG_TARGET, err = ?self),
		}
	}
}

/// Struct holding pruning timing configuration.
/// The only purpose of this structure is to use different timing
/// configurations in production and in testing.
#[derive(Clone)]
struct PruningConfig {
	/// How long unavailable data should be kept.
	keep_unavailable_for: Duration,

	/// How long finalized data should be kept.
	keep_finalized_for: Duration,

	/// How often to perform data pruning.
	pruning_interval: Duration,
}

/// Configuration for the availability store.
#[derive(Debug, Clone, Copy)]
pub struct Config {
	/// The column family for availability data and chunks.
	pub col_data: u32,
	/// The column family for availability store meta information.
	pub col_meta: u32,
	/// How long finalized data should be kept (in hours).
	pub keep_finalized_for: u32,
}

trait Clock: Send + Sync {
	// Returns time since unix epoch.
	fn now(&self) -> Result<Duration, Error>;
}

struct SystemClock;

impl Clock for SystemClock {
	fn now(&self) -> Result<Duration, Error> {
		SystemTime::now().duration_since(UNIX_EPOCH).map_err(Into::into)
	}
}

/// An implementation of the Availability Store subsystem.
pub struct AvailabilityStoreSubsystem {
	pruning_config: PruningConfig,
	config: Config,
	db: Arc<dyn Database>,
	known_blocks: KnownUnfinalizedBlocks,
	finalized_number: Option<BlockNumber>,
	metrics: Metrics,
	clock: Box<dyn Clock>,
	sync_oracle: Box<dyn SyncOracle + Send + Sync>,
}

impl AvailabilityStoreSubsystem {
	/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
	pub fn new(
		db: Arc<dyn Database>,
		config: Config,
		sync_oracle: Box<dyn SyncOracle + Send + Sync>,
		metrics: Metrics,
	) -> Self {
		let pruning_config = PruningConfig {
			keep_unavailable_for: KEEP_UNAVAILABLE_FOR,
			keep_finalized_for: Duration::from_secs(config.keep_finalized_for as u64 * 3600),
			pruning_interval: PRUNING_INTERVAL,
		};

		Self::with_pruning_config_and_clock(
			db,
			config,
			pruning_config,
			Box::new(SystemClock),
			sync_oracle,
			metrics,
		)
	}

	/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
	fn with_pruning_config_and_clock(
		db: Arc<dyn Database>,
		config: Config,
		pruning_config: PruningConfig,
		clock: Box<dyn Clock>,
		sync_oracle: Box<dyn SyncOracle + Send + Sync>,
		metrics: Metrics,
	) -> Self {
		Self {
			pruning_config,
			config,
			db,
			metrics,
			clock,
			known_blocks: KnownUnfinalizedBlocks::default(),
			sync_oracle,
			finalized_number: None,
		}
	}
}

/// We keep the hashes and numbers of all unfinalized
/// processed blocks in memory.
#[derive(Default, Debug)]
struct KnownUnfinalizedBlocks {
	by_hash: HashSet<Hash>,
	by_number: BTreeSet<(BlockNumber, Hash)>,
}

impl KnownUnfinalizedBlocks {
	/// Check whether the block has been already processed.
	fn is_known(&self, hash: &Hash) -> bool {
		self.by_hash.contains(hash)
	}

	/// Insert a new block into the known set.
	fn insert(&mut self, hash: Hash, number: BlockNumber) {
		self.by_hash.insert(hash);
		self.by_number.insert((number, hash));
	}

	/// Prune all finalized blocks.
	fn prune_finalized(&mut self, finalized: BlockNumber) {
		// split_off returns everything after the given key, including the key
		let split_point = finalized.saturating_add(1);
		let mut finalized = self.by_number.split_off(&(split_point, Hash::zero()));
		// after split_off `finalized` actually contains unfinalized blocks, we need to swap
		std::mem::swap(&mut self.by_number, &mut finalized);
		for (_, block) in finalized {
			self.by_hash.remove(&block);
		}
	}
}

#[overseer::subsystem(AvailabilityStore, error=SubsystemError, prefix=self::overseer)]
impl<Context> AvailabilityStoreSubsystem {
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = run::<Context>(self, ctx).map(|_| Ok(())).boxed();

		SpawnedSubsystem { name: "availability-store-subsystem", future }
	}
}

#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) {
	let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
	// Pruning interval is in the order of minutes so we shouldn't have more than one task running
	// at one moment in time, so 10 should be more than enough.
	let (mut pruning_result_tx, mut pruning_result_rx) = channel(10);
	loop {
		let res = run_iteration(
			&mut ctx,
			&mut subsystem,
			&mut next_pruning,
			(&mut pruning_result_tx, &mut pruning_result_rx),
		)
		.await;
		match res {
			Err(e) => {
				e.trace();
				if e.is_fatal() {
					break
				}
			},
			Ok(true) => {
				gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
				break
			},
			Ok(false) => continue,
		}
	}
}

#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn run_iteration<Context>(
	ctx: &mut Context,
	subsystem: &mut AvailabilityStoreSubsystem,
	mut next_pruning: &mut future::Fuse<Delay>,
	(pruning_result_tx, pruning_result_rx): (
		&mut MpscSender<Result<(), Error>>,
		&mut MpscReceiver<Result<(), Error>>,
	),
) -> Result<bool, Error> {
	select! {
		incoming = ctx.recv().fuse() => {
			match incoming.map_err(|_| Error::ContextChannelClosed)? {
				FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(true),
				FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
					ActiveLeavesUpdate { activated, .. })
				) => {
					for activated in activated.into_iter() {
						let _timer = subsystem.metrics.time_block_activated();
						process_block_activated(ctx, subsystem, activated.hash).await?;
					}
				}
				FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
					let _timer = subsystem.metrics.time_process_block_finalized();

					if !subsystem.known_blocks.is_known(&hash) {
						// If we haven't processed this block yet,
						// make sure we write the metadata about the
						// candidates backed in this finalized block.
						// Otherwise, we won't be able to store our chunk
						// for these candidates.
						if !subsystem.sync_oracle.is_major_syncing() {
							// If we're major syncing, processing finalized
							// blocks might take quite a long time
							// and make the subsystem unresponsive.
							process_block_activated(ctx, subsystem, hash).await?;
						}
					}
					subsystem.finalized_number = Some(number);
					subsystem.known_blocks.prune_finalized(number);
					process_block_finalized(
						ctx,
						&subsystem,
						hash,
						number,
					).await?;
				}
				FromOrchestra::Communication { msg } => {
					let _timer = subsystem.metrics.time_process_message();
					process_message(subsystem, msg)?;
				}
			}
		}
		_ = next_pruning => {
			// It's important to set the delay before calling `prune_all` because an error in `prune_all`
			// could lead to the delay not being set again. Then we would never prune anything anymore.
			*next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
			start_prune_all(ctx, subsystem, pruning_result_tx.clone()).await?;
		},
		// Receive the prune result and propagate any errors, so that in case of a fatal error
		// the main loop of the subsystem can exit gracefully.
		result = pruning_result_rx.next() => {
			if let Some(result) = result {
				result?;
			}
		},
	}

	Ok(false)
}

// Start prune-all on a separate thread, so that in the case when the operation takes
// longer than expected we don't keep the whole subsystem blocked.
// See: https://github.com/paritytech/polkadot/issues/7237 for more details.
#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn start_prune_all<Context>(
	ctx: &mut Context,
	subsystem: &mut AvailabilityStoreSubsystem,
	mut pruning_result_tx: MpscSender<Result<(), Error>>,
) -> Result<(), Error> {
	let metrics = subsystem.metrics.clone();
	let db = subsystem.db.clone();
	let config = subsystem.config;
	let time_now = subsystem.clock.now()?;

	ctx.spawn_blocking(
		"av-store-prunning",
		Box::pin(async move {
			let _timer = metrics.time_pruning();

			gum::debug!(target: LOG_TARGET, "Pruning started");
			let result = prune_all(&db, &config, time_now);

			if let Err(err) = pruning_result_tx.send(result).await {
				// This usually means that the node is closing down, log it just in case
				gum::debug!(target: LOG_TARGET, ?err, "Failed to send prune_all result",);
			}
		}),
	)?;
	Ok(())
}

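// On a new active leaf, walk back over any not-yet-processed ancestors (down to the last
// finalized block) and record the candidates backed or included in each of them.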
#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn process_block_activated<Context>(
	ctx: &mut Context,
	subsystem: &mut AvailabilityStoreSubsystem,
	activated: Hash,
) -> Result<(), Error> {
	let now = subsystem.clock.now()?;

	let block_header = {
		let (tx, rx) = oneshot::channel();

		ctx.send_message(ChainApiMessage::BlockHeader(activated, tx)).await;

		match rx.await?? {
			None => return Ok(()),
			Some(n) => n,
		}
	};
	let block_number = block_header.number;

	let new_blocks = util::determine_new_blocks(
		ctx.sender(),
		|hash| -> Result<bool, Error> { Ok(subsystem.known_blocks.is_known(hash)) },
		activated,
		&block_header,
		subsystem.finalized_number.unwrap_or(block_number.saturating_sub(1)),
	)
	.await?;

	// determine_new_blocks is descending in block height
	for (hash, header) in new_blocks.into_iter().rev() {
		// it's important to commit the db transactions for a head before the next one is processed
		// alternatively, we could utilize the OverlayBackend from approval-voting
		let mut tx = DBTransaction::new();
		process_new_head(
			ctx,
			&subsystem.db,
			&mut tx,
			&subsystem.config,
			&subsystem.pruning_config,
			now,
			hash,
			header,
		)
		.await?;
		subsystem.known_blocks.insert(hash, block_number);
		subsystem.db.write(tx)?;
	}

	Ok(())
}

#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn process_new_head<Context>(
	ctx: &mut Context,
	db: &Arc<dyn Database>,
	db_transaction: &mut DBTransaction,
	config: &Config,
	pruning_config: &PruningConfig,
	now: Duration,
	hash: Hash,
	header: Header,
) -> Result<(), Error> {
	let candidate_events = util::request_candidate_events(hash, ctx.sender()).await.await??;

	// We need to request the number of validators based on the parent state,
	// as that is the number of validators used to create this block.
	let n_validators =
		util::request_validators(header.parent_hash, ctx.sender()).await.await??.len();

	for event in candidate_events {
		match event {
			CandidateEvent::CandidateBacked(receipt, _head, _core_index, _group_index) => {
				note_block_backed(
					db,
					db_transaction,
					config,
					pruning_config,
					now,
					n_validators,
					receipt,
				)?;
			},
			CandidateEvent::CandidateIncluded(receipt, _head, _core_index, _group_index) => {
				note_block_included(
					db,
					db_transaction,
					config,
					pruning_config,
					(header.number, hash),
					receipt,
				)?;
			},
			_ => {},
		}
	}

	Ok(())
}

fn note_block_backed(
	db: &Arc<dyn Database>,
	db_transaction: &mut DBTransaction,
	config: &Config,
	pruning_config: &PruningConfig,
	now: Duration,
	n_validators: usize,
	candidate: CandidateReceipt,
) -> Result<(), Error> {
	let candidate_hash = candidate.hash();

	gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate backed");

	if load_meta(db, config, &candidate_hash)?.is_none() {
		let meta = CandidateMeta {
			state: State::Unavailable(now.into()),
			data_available: false,
			chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators],
		};

		let prune_at = now + pruning_config.keep_unavailable_for;

		write_pruning_key(db_transaction, config, prune_at, &candidate_hash);
		write_meta(db_transaction, config, &candidate_hash, &meta);
	}

	Ok(())
}

fn note_block_included(
	db: &Arc<dyn Database>,
	db_transaction: &mut DBTransaction,
	config: &Config,
	pruning_config: &PruningConfig,
	block: (BlockNumber, Hash),
	candidate: CandidateReceipt,
) -> Result<(), Error> {
	let candidate_hash = candidate.hash();

	match load_meta(db, config, &candidate_hash)? {
		None => {
			// This is alarming. We've observed a block being included without ever seeing it
			// backed. Warn and ignore.
			gum::warn!(
				target: LOG_TARGET,
				?candidate_hash,
				"Candidate included without being backed?",
			);
		},
		Some(mut meta) => {
			let be_block = (BEBlockNumber(block.0), block.1);

			gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate included");

			meta.state = match meta.state {
				State::Unavailable(at) => {
					let at_d: Duration = at.into();
					let prune_at = at_d + pruning_config.keep_unavailable_for;
					delete_pruning_key(db_transaction, config, prune_at, &candidate_hash);

					State::Unfinalized(at, vec![be_block])
				},
				State::Unfinalized(at, mut within) => {
					if let Err(i) = within.binary_search(&be_block) {
						within.insert(i, be_block);
						State::Unfinalized(at, within)
					} else {
						return Ok(())
					}
				},
				State::Finalized(_at) => {
					// This should never happen as a candidate would have to be included after
					// finality.
					return Ok(())
				},
			};

			write_unfinalized_block_contains(
				db_transaction,
				config,
				block.0,
				&block.1,
				&candidate_hash,
			);
			write_meta(db_transaction, config, &candidate_hash, &meta);
		},
	}

	Ok(())
}

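// Peek at the next entry of an `unfinalized` prefix iterator and decode the block number from its
// key without consuming the entry; database iteration errors are propagated as `Err`.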
macro_rules! peek_num {
	($iter:ident) => {
		match $iter.peek() {
			Some(Ok((k, _))) => Ok(decode_unfinalized_key(&k[..]).ok().map(|(b, _, _)| b)),
			Some(Err(_)) => Err($iter.next().expect("peek returned Some(Err); qed").unwrap_err()),
			None => Ok(None),
		}
	};
}

881async fn process_block_finalized<Context>(
882	ctx: &mut Context,
883	subsystem: &AvailabilityStoreSubsystem,
884	finalized_hash: Hash,
885	finalized_number: BlockNumber,
886) -> Result<(), Error> {
887	let now = subsystem.clock.now()?;
888
889	let mut next_possible_batch = 0;
890	loop {
891		let mut db_transaction = DBTransaction::new();
892		let (start_prefix, end_prefix) = finalized_block_range(finalized_number);
893
894		// We have to do some juggling here of the `iter` to make sure it doesn't cross the `.await`
895		// boundary as it is not `Send`. That is why we create the iterator once within this loop,
896		// drop it, do an asynchronous request, and then instantiate the exact same iterator again.
897		let batch_num = {
898			let mut iter = subsystem
899				.db
900				.iter_with_prefix(subsystem.config.col_meta, &start_prefix)
901				.take_while(|r| r.as_ref().map_or(true, |(k, _v)| &k[..] < &end_prefix[..]))
902				.peekable();
903
904			match peek_num!(iter)? {
905				None => break, // end of iterator.
906				Some(n) => n,
907			}
908		};
909
910		if batch_num < next_possible_batch {
911			continue
912		} // sanity.
913		next_possible_batch = batch_num + 1;
914
915		let batch_finalized_hash = if batch_num == finalized_number {
916			finalized_hash
917		} else {
918			let (tx, rx) = oneshot::channel();
919			ctx.send_message(ChainApiMessage::FinalizedBlockHash(batch_num, tx)).await;
920
921			match rx.await? {
922				Err(err) => {
923					gum::warn!(
924						target: LOG_TARGET,
925						batch_num,
926						?err,
927						"Failed to retrieve finalized block number.",
928					);
929
930					break
931				},
932				Ok(None) => {
933					gum::warn!(
934						target: LOG_TARGET,
935						"Availability store was informed that block #{} is finalized, \
936						but chain API has no finalized hash.",
937						batch_num,
938					);
939
940					break
941				},
942				Ok(Some(h)) => h,
943			}
944		};
945
946		let iter = subsystem
947			.db
948			.iter_with_prefix(subsystem.config.col_meta, &start_prefix)
949			.take_while(|r| r.as_ref().map_or(true, |(k, _v)| &k[..] < &end_prefix[..]))
950			.peekable();
951
952		let batch = load_all_at_finalized_height(iter, batch_num, batch_finalized_hash)?;
953
954		// Now that we've iterated over the entire batch at this finalized height,
955		// update the meta.
956
957		delete_unfinalized_height(&mut db_transaction, &subsystem.config, batch_num);
958
959		update_blocks_at_finalized_height(&subsystem, &mut db_transaction, batch, batch_num, now)?;
960
961		// We need to write at the end of the loop so the prefix iterator doesn't pick up the same
962		// values again in the next iteration. Another unfortunate effect of having to re-initialize
963		// the iterator.
964		subsystem.db.write(db_transaction)?;
965	}
966
967	Ok(())
968}
969
970// loads all candidates at the finalized height and maps them to `true` if finalized
971// and `false` if unfinalized.
972fn load_all_at_finalized_height(
973	mut iter: std::iter::Peekable<impl Iterator<Item = io::Result<util::database::DBKeyValue>>>,
974	block_number: BlockNumber,
975	finalized_hash: Hash,
976) -> io::Result<impl IntoIterator<Item = (CandidateHash, bool)>> {
977	// maps candidate hashes to true if finalized, false otherwise.
978	let mut candidates = HashMap::new();
979
980	// Load all candidates that were included at this height.
981	loop {
982		match peek_num!(iter)? {
983			None => break,                         // end of iterator.
984			Some(n) if n != block_number => break, // end of batch.
985			_ => {},
986		}
987
988		let (k, _v) = iter.next().expect("`peek` used to check non-empty; qed")?;
989		let (_, block_hash, candidate_hash) =
990			decode_unfinalized_key(&k[..]).expect("`peek_num` checks validity of key; qed");
991
992		if block_hash == finalized_hash {
993			candidates.insert(candidate_hash, true);
994		} else {
995			candidates.entry(candidate_hash).or_insert(false);
996		}
997	}
998
999	Ok(candidates)
1000}
1001
1002fn update_blocks_at_finalized_height(
1003	subsystem: &AvailabilityStoreSubsystem,
1004	db_transaction: &mut DBTransaction,
1005	candidates: impl IntoIterator<Item = (CandidateHash, bool)>,
1006	block_number: BlockNumber,
1007	now: Duration,
1008) -> Result<(), Error> {
1009	for (candidate_hash, is_finalized) in candidates {
1010		let mut meta = match load_meta(&subsystem.db, &subsystem.config, &candidate_hash)? {
1011			None => {
1012				gum::warn!(
1013					target: LOG_TARGET,
1014					"Dangling candidate metadata for {}",
1015					candidate_hash,
1016				);
1017
1018				continue
1019			},
1020			Some(c) => c,
1021		};
1022
1023		if is_finalized {
1024			// Clear everything else related to this block. We're finalized now!
1025			match meta.state {
1026				State::Finalized(_) => continue, // sanity
1027				State::Unavailable(at) => {
1028					// This is also not going to happen; the very fact that we are
1029					// iterating over the candidate here indicates that `State` should
1030					// be `Unfinalized`.
1031					delete_pruning_key(db_transaction, &subsystem.config, at, &candidate_hash);
1032				},
1033				State::Unfinalized(_, blocks) => {
1034					for (block_num, block_hash) in blocks.iter().cloned() {
1035						// this exact height is all getting cleared out anyway.
1036						if block_num.0 != block_number {
1037							delete_unfinalized_inclusion(
1038								db_transaction,
1039								&subsystem.config,
1040								block_num.0,
1041								&block_hash,
1042								&candidate_hash,
1043							);
1044						}
1045					}
1046				},
1047			}
1048
1049			meta.state = State::Finalized(now.into());
1050
1051			// Write the meta and a pruning record.
1052			write_meta(db_transaction, &subsystem.config, &candidate_hash, &meta);
1053			write_pruning_key(
1054				db_transaction,
1055				&subsystem.config,
1056				now + subsystem.pruning_config.keep_finalized_for,
1057				&candidate_hash,
1058			);
1059		} else {
1060			meta.state = match meta.state {
1061				State::Finalized(_) => continue,   // sanity.
1062				State::Unavailable(_) => continue, // sanity.
1063				State::Unfinalized(at, mut blocks) => {
1064					// Clear out everything at this height.
1065					blocks.retain(|(n, _)| n.0 != block_number);
1066
1067					// If empty, we need to go back to being unavailable as we aren't
1068					// aware of any blocks this is included in.
1069					if blocks.is_empty() {
1070						let at_d: Duration = at.into();
1071						let prune_at = at_d + subsystem.pruning_config.keep_unavailable_for;
1072						write_pruning_key(
1073							db_transaction,
1074							&subsystem.config,
1075							prune_at,
1076							&candidate_hash,
1077						);
1078						State::Unavailable(at)
1079					} else {
1080						State::Unfinalized(at, blocks)
1081					}
1082				},
1083			};
1084
1085			// Update the meta entry.
1086			write_meta(db_transaction, &subsystem.config, &candidate_hash, &meta)
1087		}
1088	}
1089
1090	Ok(())
1091}
1092
1093fn process_message(
1094	subsystem: &mut AvailabilityStoreSubsystem,
1095	msg: AvailabilityStoreMessage,
1096) -> Result<(), Error> {
1097	match msg {
1098		AvailabilityStoreMessage::QueryAvailableData(candidate, tx) => {
1099			let _ = tx.send(load_available_data(&subsystem.db, &subsystem.config, &candidate)?);
1100		},
1101		AvailabilityStoreMessage::QueryDataAvailability(candidate, tx) => {
1102			let a = load_meta(&subsystem.db, &subsystem.config, &candidate)?
1103				.map_or(false, |m| m.data_available);
1104			let _ = tx.send(a);
1105		},
1106		AvailabilityStoreMessage::QueryChunk(candidate, validator_index, tx) => {
1107			let _timer = subsystem.metrics.time_get_chunk();
1108			let _ =
1109				tx.send(load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?);
1110		},
1111		AvailabilityStoreMessage::QueryChunkSize(candidate, tx) => {
1112			let meta = load_meta(&subsystem.db, &subsystem.config, &candidate)?;
1113
1114			let validator_index = meta.map_or(None, |meta| meta.chunks_stored.first_one());
1115
1116			let maybe_chunk_size = if let Some(validator_index) = validator_index {
1117				load_chunk(
1118					&subsystem.db,
1119					&subsystem.config,
1120					&candidate,
1121					ValidatorIndex(validator_index as u32),
1122				)?
1123				.map(|erasure_chunk| erasure_chunk.chunk.len())
1124			} else {
1125				None
1126			};
1127
1128			let _ = tx.send(maybe_chunk_size);
1129		},
1130		AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => {
1131			match load_meta(&subsystem.db, &subsystem.config, &candidate)? {
1132				None => {
1133					let _ = tx.send(Vec::new());
1134				},
1135				Some(meta) => {
1136					let mut chunks = Vec::new();
1137
1138					for (validator_index, _) in
1139						meta.chunks_stored.iter().enumerate().filter(|(_, b)| **b)
1140					{
1141						let validator_index = ValidatorIndex(validator_index as _);
1142						let _timer = subsystem.metrics.time_get_chunk();
1143						match load_chunk(
1144							&subsystem.db,
1145							&subsystem.config,
1146							&candidate,
1147							validator_index,
1148						)? {
1149							Some(c) => chunks.push((validator_index, c)),
1150							None => {
1151								gum::warn!(
1152									target: LOG_TARGET,
1153									?candidate,
1154									?validator_index,
1155									"No chunk found for set bit in meta"
1156								);
1157							},
1158						}
1159					}
1160
1161					let _ = tx.send(chunks);
1162				},
1163			}
1164		},
1165		AvailabilityStoreMessage::QueryChunkAvailability(candidate, validator_index, tx) => {
1166			let a = load_meta(&subsystem.db, &subsystem.config, &candidate)?.map_or(false, |m| {
1167				*m.chunks_stored.get(validator_index.0 as usize).as_deref().unwrap_or(&false)
1168			});
1169			let _ = tx.send(a);
1170		},
1171		AvailabilityStoreMessage::StoreChunk { candidate_hash, validator_index, chunk, tx } => {
1172			subsystem.metrics.on_chunks_received(1);
1173			let _timer = subsystem.metrics.time_store_chunk();
1174
1175			match store_chunk(
1176				&subsystem.db,
1177				&subsystem.config,
1178				candidate_hash,
1179				validator_index,
1180				chunk,
1181			) {
1182				Ok(true) => {
1183					let _ = tx.send(Ok(()));
1184				},
1185				Ok(false) => {
1186					let _ = tx.send(Err(()));
1187				},
1188				Err(e) => {
1189					let _ = tx.send(Err(()));
1190					return Err(e)
1191				},
1192			}
1193		},
1194		AvailabilityStoreMessage::StoreAvailableData {
1195			candidate_hash,
1196			n_validators,
1197			available_data,
1198			expected_erasure_root,
1199			core_index,
1200			node_features,
1201			tx,
1202		} => {
1203			subsystem.metrics.on_chunks_received(n_validators as _);
1204
1205			let _timer = subsystem.metrics.time_store_available_data();
1206
1207			let res = store_available_data(
1208				&subsystem,
1209				candidate_hash,
1210				n_validators as _,
1211				available_data,
1212				expected_erasure_root,
1213				core_index,
1214				node_features,
1215			);
1216
1217			match res {
1218				Ok(()) => {
1219					let _ = tx.send(Ok(()));
1220				},
1221				Err(Error::InvalidErasureRoot) => {
1222					let _ = tx.send(Err(StoreAvailableDataError::InvalidErasureRoot));
1223					return Err(Error::InvalidErasureRoot)
1224				},
1225				Err(e) => {
1226					// We do not bubble up internal errors to caller subsystems, instead the
1227					// tx channel is dropped and that error is caught by the caller subsystem.
1228					//
1229					// We bubble up the specific error here so `av-store` logs still tell what
1230					// happened.
1231					return Err(e.into())
1232				},
1233			}
1234		},
1235	}
1236
1237	Ok(())
1238}
1239
1240// Ok(true) on success, Ok(false) on failure, and Err on internal error.
1241fn store_chunk(
1242	db: &Arc<dyn Database>,
1243	config: &Config,
1244	candidate_hash: CandidateHash,
1245	validator_index: ValidatorIndex,
1246	chunk: ErasureChunk,
1247) -> Result<bool, Error> {
1248	let mut tx = DBTransaction::new();
1249
1250	let mut meta = match load_meta(db, config, &candidate_hash)? {
1251		Some(m) => m,
1252		None => return Ok(false), // we weren't informed of this candidate by import events.
1253	};
1254
1255	match meta.chunks_stored.get(validator_index.0 as usize).map(|b| *b) {
1256		Some(true) => return Ok(true), // already stored.
1257		Some(false) => {
1258			meta.chunks_stored.set(validator_index.0 as usize, true);
1259
1260			write_chunk(&mut tx, config, &candidate_hash, validator_index, &chunk);
1261			write_meta(&mut tx, config, &candidate_hash, &meta);
1262		},
1263		None => return Ok(false), // out of bounds.
1264	}
1265
1266	gum::debug!(
1267		target: LOG_TARGET,
1268		?candidate_hash,
1269		chunk_index = %chunk.index.0,
1270		validator_index = %validator_index.0,
1271		"Stored chunk index for candidate.",
1272	);
1273
1274	db.write(tx)?;
1275	Ok(true)
1276}
1277
1278fn store_available_data(
1279	subsystem: &AvailabilityStoreSubsystem,
1280	candidate_hash: CandidateHash,
1281	n_validators: usize,
1282	available_data: AvailableData,
1283	expected_erasure_root: Hash,
1284	core_index: CoreIndex,
1285	node_features: NodeFeatures,
1286) -> Result<(), Error> {
1287	let mut tx = DBTransaction::new();
1288
1289	let mut meta = match load_meta(&subsystem.db, &subsystem.config, &candidate_hash)? {
1290		Some(m) => {
1291			if m.data_available {
1292				return Ok(()) // already stored.
1293			}
1294
1295			m
1296		},
1297		None => {
1298			let now = subsystem.clock.now()?;
1299
1300			// Write a pruning record.
1301			let prune_at = now + subsystem.pruning_config.keep_unavailable_for;
1302			write_pruning_key(&mut tx, &subsystem.config, prune_at, &candidate_hash);
1303
1304			CandidateMeta {
1305				state: State::Unavailable(now.into()),
1306				data_available: false,
1307				chunks_stored: BitVec::new(),
1308			}
1309		},
1310	};
1311
1312	// Important note: This check below is critical for consensus and the `backing` subsystem relies
1313	// on it to ensure candidate validity.
1314	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
1315	let branches = polkadot_erasure_coding::branches(chunks.as_ref());
1316
1317	if branches.root() != expected_erasure_root {
1318		return Err(Error::InvalidErasureRoot)
1319	}
1320
1321	let erasure_chunks: Vec<_> = chunks
1322		.iter()
1323		.zip(branches.map(|(proof, _)| proof))
1324		.enumerate()
1325		.map(|(index, (chunk, proof))| ErasureChunk {
1326			chunk: chunk.clone(),
1327			proof,
1328			index: ChunkIndex(index as u32),
1329		})
1330		.collect();
1331
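	// `chunk_indices[validator_index]` is the index of the erasure chunk assigned to that
	// validator; the mapping depends on the enabled node features and the candidate's core index.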
	let chunk_indices = availability_chunk_indices(&node_features, n_validators, core_index)?;
	for (validator_index, chunk_index) in chunk_indices.into_iter().enumerate() {
		write_chunk(
			&mut tx,
			&subsystem.config,
			&candidate_hash,
			ValidatorIndex(validator_index as u32),
			&erasure_chunks[chunk_index.0 as usize],
		);
	}

	meta.data_available = true;
	meta.chunks_stored = bitvec::bitvec![u8, BitOrderLsb0; 1; n_validators];

	write_meta(&mut tx, &subsystem.config, &candidate_hash, &meta);
	write_available_data(&mut tx, &subsystem.config, &candidate_hash, &available_data);

	subsystem.db.write(tx)?;

	gum::debug!(target: LOG_TARGET, ?candidate_hash, "Stored data and chunks");

	Ok(())
}

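// Remove every candidate whose `prune_by_time` record has a timestamp at or before `now`,
// together with its meta entry, available data, stored chunks and any remaining unfinalized
// inclusion references.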
fn prune_all(db: &Arc<dyn Database>, config: &Config, now: Duration) -> Result<(), Error> {
	let (range_start, range_end) = pruning_range(now);

	let mut tx = DBTransaction::new();
	let iter = db
		.iter_with_prefix(config.col_meta, &range_start[..])
		.take_while(|r| r.as_ref().map_or(true, |(k, _v)| &k[..] < &range_end[..]));

	for r in iter {
		let (k, _v) = r?;
		tx.delete(config.col_meta, &k[..]);

		let (_, candidate_hash) = match decode_pruning_key(&k[..]) {
			Ok(m) => m,
			Err(_) => continue, // sanity
		};

		delete_meta(&mut tx, config, &candidate_hash);

		// Clean up all attached data of the candidate.
		if let Some(meta) = load_meta(db, config, &candidate_hash)? {
			// delete available data.
			if meta.data_available {
				delete_available_data(&mut tx, config, &candidate_hash)
			}

			// delete chunks.
			for (i, b) in meta.chunks_stored.iter().enumerate() {
				if *b {
					delete_chunk(&mut tx, config, &candidate_hash, ValidatorIndex(i as _));
				}
			}

			// delete unfinalized block references. Pruning references don't need to be
			// manually taken care of as we are deleting them as we go in the outer loop.
			if let State::Unfinalized(_, blocks) = meta.state {
				for (block_number, block_hash) in blocks {
					delete_unfinalized_inclusion(
						&mut tx,
						config,
						block_number.0,
						&block_hash,
						&candidate_hash,
					);
				}
			}
		}
	}

	db.write(tx)?;
	Ok(())
}