1#![recursion_limit = "256"]
20#![warn(missing_docs)]
21
22use std::{
23 collections::{BTreeSet, HashMap, HashSet},
24 io,
25 sync::Arc,
26 time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
27};
28
29use codec::{Decode, Encode, Error as CodecError, Input};
30use futures::{
31 channel::{
32 mpsc::{channel, Receiver as MpscReceiver, Sender as MpscSender},
33 oneshot,
34 },
35 future, select, FutureExt, SinkExt, StreamExt,
36};
37use futures_timer::Delay;
38use polkadot_node_subsystem_util::database::{DBTransaction, Database};
39use sp_consensus::SyncOracle;
40
41use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
42use polkadot_node_primitives::{AvailableData, ErasureChunk};
43use polkadot_node_subsystem::{
44 errors::{ChainApiError, RuntimeApiError},
45 messages::{AvailabilityStoreMessage, ChainApiMessage, StoreAvailableDataError},
46 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
47};
48use polkadot_node_subsystem_util as util;
49use polkadot_primitives::{
50 BlockNumber, CandidateEvent, CandidateHash, CandidateReceiptV2 as CandidateReceipt, ChunkIndex,
51 CoreIndex, Hash, Header, NodeFeatures, ValidatorIndex,
52};
53use util::availability_chunks::availability_chunk_indices;
54
55mod metrics;
56pub use self::metrics::*;
57
58#[cfg(test)]
59mod tests;
60
61const LOG_TARGET: &str = "parachain::availability-store";
62
63const AVAILABLE_PREFIX: &[u8; 9] = b"available";
66const CHUNK_PREFIX: &[u8; 5] = b"chunk";
67const META_PREFIX: &[u8; 4] = b"meta";
68const UNFINALIZED_PREFIX: &[u8; 11] = b"unfinalized";
69const PRUNE_BY_TIME_PREFIX: &[u8; 13] = b"prune_by_time";
70
71const TOMBSTONE_VALUE: &[u8] = b" ";
74
75const KEEP_UNAVAILABLE_FOR: Duration = Duration::from_secs(60 * 60);
77
78const PRUNING_INTERVAL: Duration = Duration::from_secs(60 * 5);
80
81#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
83struct BETimestamp(u64);
84
85impl Encode for BETimestamp {
86 fn size_hint(&self) -> usize {
87 std::mem::size_of::<u64>()
88 }
89
90 fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
91 f(&self.0.to_be_bytes())
92 }
93}
94
95impl Decode for BETimestamp {
96 fn decode<I: Input>(value: &mut I) -> Result<Self, CodecError> {
97 <[u8; 8]>::decode(value).map(u64::from_be_bytes).map(Self)
98 }
99}
100
101impl From<Duration> for BETimestamp {
102 fn from(d: Duration) -> Self {
103 BETimestamp(d.as_secs())
104 }
105}
106
107impl Into<Duration> for BETimestamp {
108 fn into(self) -> Duration {
109 Duration::from_secs(self.0)
110 }
111}
112
113#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
115struct BEBlockNumber(BlockNumber);
116
117impl Encode for BEBlockNumber {
118 fn size_hint(&self) -> usize {
119 std::mem::size_of::<BlockNumber>()
120 }
121
122 fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
123 f(&self.0.to_be_bytes())
124 }
125}
126
127impl Decode for BEBlockNumber {
128 fn decode<I: Input>(value: &mut I) -> Result<Self, CodecError> {
129 <[u8; std::mem::size_of::<BlockNumber>()]>::decode(value)
130 .map(BlockNumber::from_be_bytes)
131 .map(Self)
132 }
133}
134
135#[derive(Debug, Encode, Decode)]
136enum State {
137 #[codec(index = 0)]
139 Unavailable(BETimestamp),
140 #[codec(index = 1)]
146 Unfinalized(BETimestamp, Vec<(BEBlockNumber, Hash)>),
147 #[codec(index = 2)]
149 Finalized(BETimestamp),
150}
151
152#[derive(Debug, Encode, Decode)]
154struct CandidateMeta {
155 state: State,
156 data_available: bool,
157 chunks_stored: BitVec<u8, BitOrderLsb0>,
158}
159
160fn query_inner<D: Decode>(
161 db: &Arc<dyn Database>,
162 column: u32,
163 key: &[u8],
164) -> Result<Option<D>, Error> {
165 match db.get(column, key) {
166 Ok(Some(raw)) => {
167 let res = D::decode(&mut &raw[..])?;
168 Ok(Some(res))
169 },
170 Ok(None) => Ok(None),
171 Err(err) => {
172 gum::warn!(target: LOG_TARGET, ?err, "Error reading from the availability store");
173 Err(err.into())
174 },
175 }
176}
177
178fn write_available_data(
179 tx: &mut DBTransaction,
180 config: &Config,
181 hash: &CandidateHash,
182 available_data: &AvailableData,
183) {
184 let key = (AVAILABLE_PREFIX, hash).encode();
185
186 tx.put_vec(config.col_data, &key[..], available_data.encode());
187}
188
189fn load_available_data(
190 db: &Arc<dyn Database>,
191 config: &Config,
192 hash: &CandidateHash,
193) -> Result<Option<AvailableData>, Error> {
194 let key = (AVAILABLE_PREFIX, hash).encode();
195
196 query_inner(db, config.col_data, &key)
197}
198
199fn delete_available_data(tx: &mut DBTransaction, config: &Config, hash: &CandidateHash) {
200 let key = (AVAILABLE_PREFIX, hash).encode();
201
202 tx.delete(config.col_data, &key[..])
203}
204
205fn load_chunk(
206 db: &Arc<dyn Database>,
207 config: &Config,
208 candidate_hash: &CandidateHash,
209 validator_index: ValidatorIndex,
210) -> Result<Option<ErasureChunk>, Error> {
211 let key = (CHUNK_PREFIX, candidate_hash, validator_index).encode();
212
213 query_inner(db, config.col_data, &key)
214}
215
216fn write_chunk(
217 tx: &mut DBTransaction,
218 config: &Config,
219 candidate_hash: &CandidateHash,
220 validator_index: ValidatorIndex,
221 erasure_chunk: &ErasureChunk,
222) {
223 let key = (CHUNK_PREFIX, candidate_hash, validator_index).encode();
224
225 tx.put_vec(config.col_data, &key, erasure_chunk.encode());
226}
227
228fn delete_chunk(
229 tx: &mut DBTransaction,
230 config: &Config,
231 candidate_hash: &CandidateHash,
232 validator_index: ValidatorIndex,
233) {
234 let key = (CHUNK_PREFIX, candidate_hash, validator_index).encode();
235
236 tx.delete(config.col_data, &key[..]);
237}
238
239fn load_meta(
240 db: &Arc<dyn Database>,
241 config: &Config,
242 hash: &CandidateHash,
243) -> Result<Option<CandidateMeta>, Error> {
244 let key = (META_PREFIX, hash).encode();
245
246 query_inner(db, config.col_meta, &key)
247}
248
249fn write_meta(tx: &mut DBTransaction, config: &Config, hash: &CandidateHash, meta: &CandidateMeta) {
250 let key = (META_PREFIX, hash).encode();
251
252 tx.put_vec(config.col_meta, &key, meta.encode());
253}
254
255fn delete_meta(tx: &mut DBTransaction, config: &Config, hash: &CandidateHash) {
256 let key = (META_PREFIX, hash).encode();
257 tx.delete(config.col_meta, &key[..])
258}
259
260fn delete_unfinalized_height(tx: &mut DBTransaction, config: &Config, block_number: BlockNumber) {
261 let prefix = (UNFINALIZED_PREFIX, BEBlockNumber(block_number)).encode();
262 tx.delete_prefix(config.col_meta, &prefix);
263}
264
265fn delete_unfinalized_inclusion(
266 tx: &mut DBTransaction,
267 config: &Config,
268 block_number: BlockNumber,
269 block_hash: &Hash,
270 candidate_hash: &CandidateHash,
271) {
272 let key =
273 (UNFINALIZED_PREFIX, BEBlockNumber(block_number), block_hash, candidate_hash).encode();
274
275 tx.delete(config.col_meta, &key[..]);
276}
277
278fn delete_pruning_key(
279 tx: &mut DBTransaction,
280 config: &Config,
281 t: impl Into<BETimestamp>,
282 h: &CandidateHash,
283) {
284 let key = (PRUNE_BY_TIME_PREFIX, t.into(), h).encode();
285 tx.delete(config.col_meta, &key);
286}
287
288fn write_pruning_key(
289 tx: &mut DBTransaction,
290 config: &Config,
291 t: impl Into<BETimestamp>,
292 h: &CandidateHash,
293) {
294 let t = t.into();
295 let key = (PRUNE_BY_TIME_PREFIX, t, h).encode();
296 tx.put(config.col_meta, &key, TOMBSTONE_VALUE);
297}
298
299fn finalized_block_range(finalized: BlockNumber) -> (Vec<u8>, Vec<u8>) {
300 let start = UNFINALIZED_PREFIX.encode();
302 let end = (UNFINALIZED_PREFIX, BEBlockNumber(finalized + 1)).encode();
303
304 (start, end)
305}
306
307fn write_unfinalized_block_contains(
308 tx: &mut DBTransaction,
309 config: &Config,
310 n: BlockNumber,
311 h: &Hash,
312 ch: &CandidateHash,
313) {
314 let key = (UNFINALIZED_PREFIX, BEBlockNumber(n), h, ch).encode();
315 tx.put(config.col_meta, &key, TOMBSTONE_VALUE);
316}
317
318fn pruning_range(now: impl Into<BETimestamp>) -> (Vec<u8>, Vec<u8>) {
319 let start = PRUNE_BY_TIME_PREFIX.encode();
320 let end = (PRUNE_BY_TIME_PREFIX, BETimestamp(now.into().0 + 1)).encode();
321
322 (start, end)
323}
324
325fn decode_unfinalized_key(s: &[u8]) -> Result<(BlockNumber, Hash, CandidateHash), CodecError> {
326 if !s.starts_with(UNFINALIZED_PREFIX) {
327 return Err("missing magic string".into())
328 }
329
330 <(BEBlockNumber, Hash, CandidateHash)>::decode(&mut &s[UNFINALIZED_PREFIX.len()..])
331 .map(|(b, h, ch)| (b.0, h, ch))
332}
333
334fn decode_pruning_key(s: &[u8]) -> Result<(Duration, CandidateHash), CodecError> {
335 if !s.starts_with(PRUNE_BY_TIME_PREFIX) {
336 return Err("missing magic string".into())
337 }
338
339 <(BETimestamp, CandidateHash)>::decode(&mut &s[PRUNE_BY_TIME_PREFIX.len()..])
340 .map(|(t, ch)| (t.into(), ch))
341}
342
343#[derive(Debug, thiserror::Error)]
344#[allow(missing_docs)]
345pub enum Error {
346 #[error(transparent)]
347 RuntimeApi(#[from] RuntimeApiError),
348
349 #[error(transparent)]
350 ChainApi(#[from] ChainApiError),
351
352 #[error(transparent)]
353 Erasure(#[from] polkadot_erasure_coding::Error),
354
355 #[error(transparent)]
356 Io(#[from] io::Error),
357
358 #[error(transparent)]
359 Oneshot(#[from] oneshot::Canceled),
360
361 #[error(transparent)]
362 Subsystem(#[from] SubsystemError),
363
364 #[error("Context signal channel closed")]
365 ContextChannelClosed,
366
367 #[error(transparent)]
368 Time(#[from] SystemTimeError),
369
370 #[error(transparent)]
371 Codec(#[from] CodecError),
372
373 #[error("Custom databases are not supported")]
374 CustomDatabase,
375
376 #[error("Erasure root does not match expected one")]
377 InvalidErasureRoot,
378}
379
380impl Error {
381 fn is_fatal(&self) -> bool {
385 match self {
386 Self::Io(_) => true,
387 Self::Oneshot(_) => true,
388 Self::CustomDatabase => true,
389 Self::ContextChannelClosed => true,
390 _ => false,
391 }
392 }
393}
394
395impl Error {
396 fn trace(&self) {
397 match self {
398 Self::RuntimeApi(_) | Self::Oneshot(_) => {
400 gum::debug!(target: LOG_TARGET, err = ?self)
401 },
402 _ => gum::warn!(target: LOG_TARGET, err = ?self),
404 }
405 }
406}
407
408#[derive(Clone)]
412struct PruningConfig {
413 keep_unavailable_for: Duration,
415
416 keep_finalized_for: Duration,
418
419 pruning_interval: Duration,
421}
422
423#[derive(Debug, Clone, Copy)]
425pub struct Config {
426 pub col_data: u32,
428 pub col_meta: u32,
430 pub keep_finalized_for: u32,
432}
433
434trait Clock: Send + Sync {
435 fn now(&self) -> Result<Duration, Error>;
437}
438
439struct SystemClock;
440
441impl Clock for SystemClock {
442 fn now(&self) -> Result<Duration, Error> {
443 SystemTime::now().duration_since(UNIX_EPOCH).map_err(Into::into)
444 }
445}
446
447pub struct AvailabilityStoreSubsystem {
449 pruning_config: PruningConfig,
450 config: Config,
451 db: Arc<dyn Database>,
452 known_blocks: KnownUnfinalizedBlocks,
453 finalized_number: Option<BlockNumber>,
454 metrics: Metrics,
455 clock: Box<dyn Clock>,
456 sync_oracle: Box<dyn SyncOracle + Send + Sync>,
457}
458
459impl AvailabilityStoreSubsystem {
460 pub fn new(
462 db: Arc<dyn Database>,
463 config: Config,
464 sync_oracle: Box<dyn SyncOracle + Send + Sync>,
465 metrics: Metrics,
466 ) -> Self {
467 let pruning_config = PruningConfig {
468 keep_unavailable_for: KEEP_UNAVAILABLE_FOR,
469 keep_finalized_for: Duration::from_secs(config.keep_finalized_for as u64 * 3600),
470 pruning_interval: PRUNING_INTERVAL,
471 };
472
473 Self::with_pruning_config_and_clock(
474 db,
475 config,
476 pruning_config,
477 Box::new(SystemClock),
478 sync_oracle,
479 metrics,
480 )
481 }
482
483 fn with_pruning_config_and_clock(
485 db: Arc<dyn Database>,
486 config: Config,
487 pruning_config: PruningConfig,
488 clock: Box<dyn Clock>,
489 sync_oracle: Box<dyn SyncOracle + Send + Sync>,
490 metrics: Metrics,
491 ) -> Self {
492 Self {
493 pruning_config,
494 config,
495 db,
496 metrics,
497 clock,
498 known_blocks: KnownUnfinalizedBlocks::default(),
499 sync_oracle,
500 finalized_number: None,
501 }
502 }
503}
504
505#[derive(Default, Debug)]
508struct KnownUnfinalizedBlocks {
509 by_hash: HashSet<Hash>,
510 by_number: BTreeSet<(BlockNumber, Hash)>,
511}
512
513impl KnownUnfinalizedBlocks {
514 fn is_known(&self, hash: &Hash) -> bool {
516 self.by_hash.contains(hash)
517 }
518
519 fn insert(&mut self, hash: Hash, number: BlockNumber) {
521 self.by_hash.insert(hash);
522 self.by_number.insert((number, hash));
523 }
524
525 fn prune_finalized(&mut self, finalized: BlockNumber) {
527 let split_point = finalized.saturating_add(1);
529 let mut finalized = self.by_number.split_off(&(split_point, Hash::zero()));
530 std::mem::swap(&mut self.by_number, &mut finalized);
532 for (_, block) in finalized {
533 self.by_hash.remove(&block);
534 }
535 }
536}
537
538#[overseer::subsystem(AvailabilityStore, error=SubsystemError, prefix=self::overseer)]
539impl<Context> AvailabilityStoreSubsystem {
540 fn start(self, ctx: Context) -> SpawnedSubsystem {
541 let future = run::<Context>(self, ctx).map(|_| Ok(())).boxed();
542
543 SpawnedSubsystem { name: "availability-store-subsystem", future }
544 }
545}
546
547#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
548async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) {
549 let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
550 let (mut pruning_result_tx, mut pruning_result_rx) = channel(10);
553 loop {
554 let res = run_iteration(
555 &mut ctx,
556 &mut subsystem,
557 &mut next_pruning,
558 (&mut pruning_result_tx, &mut pruning_result_rx),
559 )
560 .await;
561 match res {
562 Err(e) => {
563 e.trace();
564 if e.is_fatal() {
565 break
566 }
567 },
568 Ok(true) => {
569 gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
570 break
571 },
572 Ok(false) => continue,
573 }
574 }
575}
576
577#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
578async fn run_iteration<Context>(
579 ctx: &mut Context,
580 subsystem: &mut AvailabilityStoreSubsystem,
581 mut next_pruning: &mut future::Fuse<Delay>,
582 (pruning_result_tx, pruning_result_rx): (
583 &mut MpscSender<Result<(), Error>>,
584 &mut MpscReceiver<Result<(), Error>>,
585 ),
586) -> Result<bool, Error> {
587 select! {
588 incoming = ctx.recv().fuse() => {
589 match incoming.map_err(|_| Error::ContextChannelClosed)? {
590 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(true),
591 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
592 ActiveLeavesUpdate { activated, .. })
593 ) => {
594 for activated in activated.into_iter() {
595 let _timer = subsystem.metrics.time_block_activated();
596 process_block_activated(ctx, subsystem, activated.hash).await?;
597 }
598 }
599 FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
600 let _timer = subsystem.metrics.time_process_block_finalized();
601
602 if !subsystem.known_blocks.is_known(&hash) {
603 if !subsystem.sync_oracle.is_major_syncing() {
609 process_block_activated(ctx, subsystem, hash).await?;
613 }
614 }
615 subsystem.finalized_number = Some(number);
616 subsystem.known_blocks.prune_finalized(number);
617 process_block_finalized(
618 ctx,
619 &subsystem,
620 hash,
621 number,
622 ).await?;
623 }
624 FromOrchestra::Communication { msg } => {
625 let _timer = subsystem.metrics.time_process_message();
626 process_message(subsystem, msg)?;
627 }
628 }
629 }
630 _ = next_pruning => {
631 *next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
634 start_prune_all(ctx, subsystem, pruning_result_tx.clone()).await?;
635 },
636 result = pruning_result_rx.next() => {
639 if let Some(result) = result {
640 result?;
641 }
642 },
643 }
644
645 Ok(false)
646}
647
648#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
652async fn start_prune_all<Context>(
653 ctx: &mut Context,
654 subsystem: &mut AvailabilityStoreSubsystem,
655 mut pruning_result_tx: MpscSender<Result<(), Error>>,
656) -> Result<(), Error> {
657 let metrics = subsystem.metrics.clone();
658 let db = subsystem.db.clone();
659 let config = subsystem.config;
660 let time_now = subsystem.clock.now()?;
661
662 ctx.spawn_blocking(
663 "av-store-prunning",
664 Box::pin(async move {
665 let _timer = metrics.time_pruning();
666
667 gum::debug!(target: LOG_TARGET, "Prunning started");
668 let result = prune_all(&db, &config, time_now);
669
670 if let Err(err) = pruning_result_tx.send(result).await {
671 gum::debug!(target: LOG_TARGET, ?err, "Failed to send prune_all result",);
673 }
674 }),
675 )?;
676 Ok(())
677}
678
679#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
680async fn process_block_activated<Context>(
681 ctx: &mut Context,
682 subsystem: &mut AvailabilityStoreSubsystem,
683 activated: Hash,
684) -> Result<(), Error> {
685 let now = subsystem.clock.now()?;
686
687 let block_header = {
688 let (tx, rx) = oneshot::channel();
689
690 ctx.send_message(ChainApiMessage::BlockHeader(activated, tx)).await;
691
692 match rx.await?? {
693 None => return Ok(()),
694 Some(n) => n,
695 }
696 };
697 let block_number = block_header.number;
698
699 let new_blocks = util::determine_new_blocks(
700 ctx.sender(),
701 |hash| -> Result<bool, Error> { Ok(subsystem.known_blocks.is_known(hash)) },
702 activated,
703 &block_header,
704 subsystem.finalized_number.unwrap_or(block_number.saturating_sub(1)),
705 )
706 .await?;
707
708 for (hash, header) in new_blocks.into_iter().rev() {
710 let mut tx = DBTransaction::new();
713 process_new_head(
714 ctx,
715 &subsystem.db,
716 &mut tx,
717 &subsystem.config,
718 &subsystem.pruning_config,
719 now,
720 hash,
721 header,
722 )
723 .await?;
724 subsystem.known_blocks.insert(hash, block_number);
725 subsystem.db.write(tx)?;
726 }
727
728 Ok(())
729}
730
731#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
732async fn process_new_head<Context>(
733 ctx: &mut Context,
734 db: &Arc<dyn Database>,
735 db_transaction: &mut DBTransaction,
736 config: &Config,
737 pruning_config: &PruningConfig,
738 now: Duration,
739 hash: Hash,
740 header: Header,
741) -> Result<(), Error> {
742 let candidate_events = util::request_candidate_events(hash, ctx.sender()).await.await??;
743
744 let n_validators =
747 util::request_validators(header.parent_hash, ctx.sender()).await.await??.len();
748
749 for event in candidate_events {
750 match event {
751 CandidateEvent::CandidateBacked(receipt, _head, _core_index, _group_index) => {
752 note_block_backed(
753 db,
754 db_transaction,
755 config,
756 pruning_config,
757 now,
758 n_validators,
759 receipt,
760 )?;
761 },
762 CandidateEvent::CandidateIncluded(receipt, _head, _core_index, _group_index) => {
763 note_block_included(
764 db,
765 db_transaction,
766 config,
767 pruning_config,
768 (header.number, hash),
769 receipt,
770 )?;
771 },
772 _ => {},
773 }
774 }
775
776 Ok(())
777}
778
779fn note_block_backed(
780 db: &Arc<dyn Database>,
781 db_transaction: &mut DBTransaction,
782 config: &Config,
783 pruning_config: &PruningConfig,
784 now: Duration,
785 n_validators: usize,
786 candidate: CandidateReceipt,
787) -> Result<(), Error> {
788 let candidate_hash = candidate.hash();
789
790 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate backed");
791
792 if load_meta(db, config, &candidate_hash)?.is_none() {
793 let meta = CandidateMeta {
794 state: State::Unavailable(now.into()),
795 data_available: false,
796 chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators],
797 };
798
799 let prune_at = now + pruning_config.keep_unavailable_for;
800
801 write_pruning_key(db_transaction, config, prune_at, &candidate_hash);
802 write_meta(db_transaction, config, &candidate_hash, &meta);
803 }
804
805 Ok(())
806}
807
808fn note_block_included(
809 db: &Arc<dyn Database>,
810 db_transaction: &mut DBTransaction,
811 config: &Config,
812 pruning_config: &PruningConfig,
813 block: (BlockNumber, Hash),
814 candidate: CandidateReceipt,
815) -> Result<(), Error> {
816 let candidate_hash = candidate.hash();
817
818 match load_meta(db, config, &candidate_hash)? {
819 None => {
820 gum::warn!(
823 target: LOG_TARGET,
824 ?candidate_hash,
825 "Candidate included without being backed?",
826 );
827 },
828 Some(mut meta) => {
829 let be_block = (BEBlockNumber(block.0), block.1);
830
831 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate included");
832
833 meta.state = match meta.state {
834 State::Unavailable(at) => {
835 let at_d: Duration = at.into();
836 let prune_at = at_d + pruning_config.keep_unavailable_for;
837 delete_pruning_key(db_transaction, config, prune_at, &candidate_hash);
838
839 State::Unfinalized(at, vec![be_block])
840 },
841 State::Unfinalized(at, mut within) => {
842 if let Err(i) = within.binary_search(&be_block) {
843 within.insert(i, be_block);
844 State::Unfinalized(at, within)
845 } else {
846 return Ok(())
847 }
848 },
849 State::Finalized(_at) => {
850 return Ok(())
853 },
854 };
855
856 write_unfinalized_block_contains(
857 db_transaction,
858 config,
859 block.0,
860 &block.1,
861 &candidate_hash,
862 );
863 write_meta(db_transaction, config, &candidate_hash, &meta);
864 },
865 }
866
867 Ok(())
868}
869
870macro_rules! peek_num {
871 ($iter:ident) => {
872 match $iter.peek() {
873 Some(Ok((k, _))) => Ok(decode_unfinalized_key(&k[..]).ok().map(|(b, _, _)| b)),
874 Some(Err(_)) => Err($iter.next().expect("peek returned Some(Err); qed").unwrap_err()),
875 None => Ok(None),
876 }
877 };
878}
879
880#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
881async fn process_block_finalized<Context>(
882 ctx: &mut Context,
883 subsystem: &AvailabilityStoreSubsystem,
884 finalized_hash: Hash,
885 finalized_number: BlockNumber,
886) -> Result<(), Error> {
887 let now = subsystem.clock.now()?;
888
889 let mut next_possible_batch = 0;
890 loop {
891 let mut db_transaction = DBTransaction::new();
892 let (start_prefix, end_prefix) = finalized_block_range(finalized_number);
893
894 let batch_num = {
898 let mut iter = subsystem
899 .db
900 .iter_with_prefix(subsystem.config.col_meta, &start_prefix)
901 .take_while(|r| r.as_ref().map_or(true, |(k, _v)| &k[..] < &end_prefix[..]))
902 .peekable();
903
904 match peek_num!(iter)? {
905 None => break, Some(n) => n,
907 }
908 };
909
910 if batch_num < next_possible_batch {
911 continue
912 } next_possible_batch = batch_num + 1;
914
915 let batch_finalized_hash = if batch_num == finalized_number {
916 finalized_hash
917 } else {
918 let (tx, rx) = oneshot::channel();
919 ctx.send_message(ChainApiMessage::FinalizedBlockHash(batch_num, tx)).await;
920
921 match rx.await? {
922 Err(err) => {
923 gum::warn!(
924 target: LOG_TARGET,
925 batch_num,
926 ?err,
927 "Failed to retrieve finalized block number.",
928 );
929
930 break
931 },
932 Ok(None) => {
933 gum::warn!(
934 target: LOG_TARGET,
935 "Availability store was informed that block #{} is finalized, \
936 but chain API has no finalized hash.",
937 batch_num,
938 );
939
940 break
941 },
942 Ok(Some(h)) => h,
943 }
944 };
945
946 let iter = subsystem
947 .db
948 .iter_with_prefix(subsystem.config.col_meta, &start_prefix)
949 .take_while(|r| r.as_ref().map_or(true, |(k, _v)| &k[..] < &end_prefix[..]))
950 .peekable();
951
952 let batch = load_all_at_finalized_height(iter, batch_num, batch_finalized_hash)?;
953
954 delete_unfinalized_height(&mut db_transaction, &subsystem.config, batch_num);
958
959 update_blocks_at_finalized_height(&subsystem, &mut db_transaction, batch, batch_num, now)?;
960
961 subsystem.db.write(db_transaction)?;
965 }
966
967 Ok(())
968}
969
970fn load_all_at_finalized_height(
973 mut iter: std::iter::Peekable<impl Iterator<Item = io::Result<util::database::DBKeyValue>>>,
974 block_number: BlockNumber,
975 finalized_hash: Hash,
976) -> io::Result<impl IntoIterator<Item = (CandidateHash, bool)>> {
977 let mut candidates = HashMap::new();
979
980 loop {
982 match peek_num!(iter)? {
983 None => break, Some(n) if n != block_number => break, _ => {},
986 }
987
988 let (k, _v) = iter.next().expect("`peek` used to check non-empty; qed")?;
989 let (_, block_hash, candidate_hash) =
990 decode_unfinalized_key(&k[..]).expect("`peek_num` checks validity of key; qed");
991
992 if block_hash == finalized_hash {
993 candidates.insert(candidate_hash, true);
994 } else {
995 candidates.entry(candidate_hash).or_insert(false);
996 }
997 }
998
999 Ok(candidates)
1000}
1001
1002fn update_blocks_at_finalized_height(
1003 subsystem: &AvailabilityStoreSubsystem,
1004 db_transaction: &mut DBTransaction,
1005 candidates: impl IntoIterator<Item = (CandidateHash, bool)>,
1006 block_number: BlockNumber,
1007 now: Duration,
1008) -> Result<(), Error> {
1009 for (candidate_hash, is_finalized) in candidates {
1010 let mut meta = match load_meta(&subsystem.db, &subsystem.config, &candidate_hash)? {
1011 None => {
1012 gum::warn!(
1013 target: LOG_TARGET,
1014 "Dangling candidate metadata for {}",
1015 candidate_hash,
1016 );
1017
1018 continue
1019 },
1020 Some(c) => c,
1021 };
1022
1023 if is_finalized {
1024 match meta.state {
1026 State::Finalized(_) => continue, State::Unavailable(at) => {
1028 delete_pruning_key(db_transaction, &subsystem.config, at, &candidate_hash);
1032 },
1033 State::Unfinalized(_, blocks) => {
1034 for (block_num, block_hash) in blocks.iter().cloned() {
1035 if block_num.0 != block_number {
1037 delete_unfinalized_inclusion(
1038 db_transaction,
1039 &subsystem.config,
1040 block_num.0,
1041 &block_hash,
1042 &candidate_hash,
1043 );
1044 }
1045 }
1046 },
1047 }
1048
1049 meta.state = State::Finalized(now.into());
1050
1051 write_meta(db_transaction, &subsystem.config, &candidate_hash, &meta);
1053 write_pruning_key(
1054 db_transaction,
1055 &subsystem.config,
1056 now + subsystem.pruning_config.keep_finalized_for,
1057 &candidate_hash,
1058 );
1059 } else {
1060 meta.state = match meta.state {
1061 State::Finalized(_) => continue, State::Unavailable(_) => continue, State::Unfinalized(at, mut blocks) => {
1064 blocks.retain(|(n, _)| n.0 != block_number);
1066
1067 if blocks.is_empty() {
1070 let at_d: Duration = at.into();
1071 let prune_at = at_d + subsystem.pruning_config.keep_unavailable_for;
1072 write_pruning_key(
1073 db_transaction,
1074 &subsystem.config,
1075 prune_at,
1076 &candidate_hash,
1077 );
1078 State::Unavailable(at)
1079 } else {
1080 State::Unfinalized(at, blocks)
1081 }
1082 },
1083 };
1084
1085 write_meta(db_transaction, &subsystem.config, &candidate_hash, &meta)
1087 }
1088 }
1089
1090 Ok(())
1091}
1092
1093fn process_message(
1094 subsystem: &mut AvailabilityStoreSubsystem,
1095 msg: AvailabilityStoreMessage,
1096) -> Result<(), Error> {
1097 match msg {
1098 AvailabilityStoreMessage::QueryAvailableData(candidate, tx) => {
1099 let _ = tx.send(load_available_data(&subsystem.db, &subsystem.config, &candidate)?);
1100 },
1101 AvailabilityStoreMessage::QueryDataAvailability(candidate, tx) => {
1102 let a = load_meta(&subsystem.db, &subsystem.config, &candidate)?
1103 .map_or(false, |m| m.data_available);
1104 let _ = tx.send(a);
1105 },
1106 AvailabilityStoreMessage::QueryChunk(candidate, validator_index, tx) => {
1107 let _timer = subsystem.metrics.time_get_chunk();
1108 let _ =
1109 tx.send(load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?);
1110 },
1111 AvailabilityStoreMessage::QueryChunkSize(candidate, tx) => {
1112 let meta = load_meta(&subsystem.db, &subsystem.config, &candidate)?;
1113
1114 let validator_index = meta.map_or(None, |meta| meta.chunks_stored.first_one());
1115
1116 let maybe_chunk_size = if let Some(validator_index) = validator_index {
1117 load_chunk(
1118 &subsystem.db,
1119 &subsystem.config,
1120 &candidate,
1121 ValidatorIndex(validator_index as u32),
1122 )?
1123 .map(|erasure_chunk| erasure_chunk.chunk.len())
1124 } else {
1125 None
1126 };
1127
1128 let _ = tx.send(maybe_chunk_size);
1129 },
1130 AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => {
1131 match load_meta(&subsystem.db, &subsystem.config, &candidate)? {
1132 None => {
1133 let _ = tx.send(Vec::new());
1134 },
1135 Some(meta) => {
1136 let mut chunks = Vec::new();
1137
1138 for (validator_index, _) in
1139 meta.chunks_stored.iter().enumerate().filter(|(_, b)| **b)
1140 {
1141 let validator_index = ValidatorIndex(validator_index as _);
1142 let _timer = subsystem.metrics.time_get_chunk();
1143 match load_chunk(
1144 &subsystem.db,
1145 &subsystem.config,
1146 &candidate,
1147 validator_index,
1148 )? {
1149 Some(c) => chunks.push((validator_index, c)),
1150 None => {
1151 gum::warn!(
1152 target: LOG_TARGET,
1153 ?candidate,
1154 ?validator_index,
1155 "No chunk found for set bit in meta"
1156 );
1157 },
1158 }
1159 }
1160
1161 let _ = tx.send(chunks);
1162 },
1163 }
1164 },
1165 AvailabilityStoreMessage::QueryChunkAvailability(candidate, validator_index, tx) => {
1166 let a = load_meta(&subsystem.db, &subsystem.config, &candidate)?.map_or(false, |m| {
1167 *m.chunks_stored.get(validator_index.0 as usize).as_deref().unwrap_or(&false)
1168 });
1169 let _ = tx.send(a);
1170 },
1171 AvailabilityStoreMessage::StoreChunk { candidate_hash, validator_index, chunk, tx } => {
1172 subsystem.metrics.on_chunks_received(1);
1173 let _timer = subsystem.metrics.time_store_chunk();
1174
1175 match store_chunk(
1176 &subsystem.db,
1177 &subsystem.config,
1178 candidate_hash,
1179 validator_index,
1180 chunk,
1181 ) {
1182 Ok(true) => {
1183 let _ = tx.send(Ok(()));
1184 },
1185 Ok(false) => {
1186 let _ = tx.send(Err(()));
1187 },
1188 Err(e) => {
1189 let _ = tx.send(Err(()));
1190 return Err(e)
1191 },
1192 }
1193 },
1194 AvailabilityStoreMessage::StoreAvailableData {
1195 candidate_hash,
1196 n_validators,
1197 available_data,
1198 expected_erasure_root,
1199 core_index,
1200 node_features,
1201 tx,
1202 } => {
1203 subsystem.metrics.on_chunks_received(n_validators as _);
1204
1205 let _timer = subsystem.metrics.time_store_available_data();
1206
1207 let res = store_available_data(
1208 &subsystem,
1209 candidate_hash,
1210 n_validators as _,
1211 available_data,
1212 expected_erasure_root,
1213 core_index,
1214 node_features,
1215 );
1216
1217 match res {
1218 Ok(()) => {
1219 let _ = tx.send(Ok(()));
1220 },
1221 Err(Error::InvalidErasureRoot) => {
1222 let _ = tx.send(Err(StoreAvailableDataError::InvalidErasureRoot));
1223 return Err(Error::InvalidErasureRoot)
1224 },
1225 Err(e) => {
1226 return Err(e.into())
1232 },
1233 }
1234 },
1235 }
1236
1237 Ok(())
1238}
1239
1240fn store_chunk(
1242 db: &Arc<dyn Database>,
1243 config: &Config,
1244 candidate_hash: CandidateHash,
1245 validator_index: ValidatorIndex,
1246 chunk: ErasureChunk,
1247) -> Result<bool, Error> {
1248 let mut tx = DBTransaction::new();
1249
1250 let mut meta = match load_meta(db, config, &candidate_hash)? {
1251 Some(m) => m,
1252 None => return Ok(false), };
1254
1255 match meta.chunks_stored.get(validator_index.0 as usize).map(|b| *b) {
1256 Some(true) => return Ok(true), Some(false) => {
1258 meta.chunks_stored.set(validator_index.0 as usize, true);
1259
1260 write_chunk(&mut tx, config, &candidate_hash, validator_index, &chunk);
1261 write_meta(&mut tx, config, &candidate_hash, &meta);
1262 },
1263 None => return Ok(false), }
1265
1266 gum::debug!(
1267 target: LOG_TARGET,
1268 ?candidate_hash,
1269 chunk_index = %chunk.index.0,
1270 validator_index = %validator_index.0,
1271 "Stored chunk index for candidate.",
1272 );
1273
1274 db.write(tx)?;
1275 Ok(true)
1276}
1277
1278fn store_available_data(
1279 subsystem: &AvailabilityStoreSubsystem,
1280 candidate_hash: CandidateHash,
1281 n_validators: usize,
1282 available_data: AvailableData,
1283 expected_erasure_root: Hash,
1284 core_index: CoreIndex,
1285 node_features: NodeFeatures,
1286) -> Result<(), Error> {
1287 let mut tx = DBTransaction::new();
1288
1289 let mut meta = match load_meta(&subsystem.db, &subsystem.config, &candidate_hash)? {
1290 Some(m) => {
1291 if m.data_available {
1292 return Ok(()) }
1294
1295 m
1296 },
1297 None => {
1298 let now = subsystem.clock.now()?;
1299
1300 let prune_at = now + subsystem.pruning_config.keep_unavailable_for;
1302 write_pruning_key(&mut tx, &subsystem.config, prune_at, &candidate_hash);
1303
1304 CandidateMeta {
1305 state: State::Unavailable(now.into()),
1306 data_available: false,
1307 chunks_stored: BitVec::new(),
1308 }
1309 },
1310 };
1311
1312 let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
1315 let branches = polkadot_erasure_coding::branches(chunks.as_ref());
1316
1317 if branches.root() != expected_erasure_root {
1318 return Err(Error::InvalidErasureRoot)
1319 }
1320
1321 let erasure_chunks: Vec<_> = chunks
1322 .iter()
1323 .zip(branches.map(|(proof, _)| proof))
1324 .enumerate()
1325 .map(|(index, (chunk, proof))| ErasureChunk {
1326 chunk: chunk.clone(),
1327 proof,
1328 index: ChunkIndex(index as u32),
1329 })
1330 .collect();
1331
1332 let chunk_indices = availability_chunk_indices(&node_features, n_validators, core_index)?;
1333 for (validator_index, chunk_index) in chunk_indices.into_iter().enumerate() {
1334 write_chunk(
1335 &mut tx,
1336 &subsystem.config,
1337 &candidate_hash,
1338 ValidatorIndex(validator_index as u32),
1339 &erasure_chunks[chunk_index.0 as usize],
1340 );
1341 }
1342
1343 meta.data_available = true;
1344 meta.chunks_stored = bitvec::bitvec![u8, BitOrderLsb0; 1; n_validators];
1345
1346 write_meta(&mut tx, &subsystem.config, &candidate_hash, &meta);
1347 write_available_data(&mut tx, &subsystem.config, &candidate_hash, &available_data);
1348
1349 subsystem.db.write(tx)?;
1350
1351 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Stored data and chunks");
1352
1353 Ok(())
1354}
1355
1356fn prune_all(db: &Arc<dyn Database>, config: &Config, now: Duration) -> Result<(), Error> {
1357 let (range_start, range_end) = pruning_range(now);
1358
1359 let mut tx = DBTransaction::new();
1360 let iter = db
1361 .iter_with_prefix(config.col_meta, &range_start[..])
1362 .take_while(|r| r.as_ref().map_or(true, |(k, _v)| &k[..] < &range_end[..]));
1363
1364 for r in iter {
1365 let (k, _v) = r?;
1366 tx.delete(config.col_meta, &k[..]);
1367
1368 let (_, candidate_hash) = match decode_pruning_key(&k[..]) {
1369 Ok(m) => m,
1370 Err(_) => continue, };
1372
1373 delete_meta(&mut tx, config, &candidate_hash);
1374
1375 if let Some(meta) = load_meta(db, config, &candidate_hash)? {
1377 if meta.data_available {
1379 delete_available_data(&mut tx, config, &candidate_hash)
1380 }
1381
1382 for (i, b) in meta.chunks_stored.iter().enumerate() {
1384 if *b {
1385 delete_chunk(&mut tx, config, &candidate_hash, ValidatorIndex(i as _));
1386 }
1387 }
1388
1389 if let State::Unfinalized(_, blocks) = meta.state {
1392 for (block_number, block_hash) in blocks {
1393 delete_unfinalized_inclusion(
1394 &mut tx,
1395 config,
1396 block_number.0,
1397 &block_hash,
1398 &candidate_hash,
1399 );
1400 }
1401 }
1402 }
1403 }
1404
1405 db.write(tx)?;
1406 Ok(())
1407}