1use crate::{
28 noncanonical::LAST_CANONICAL, to_meta_key, CommitSet, Error, Hash, MetaDb, StateDbError,
29 DEFAULT_MAX_BLOCK_CONSTRAINT, LOG_TARGET,
30};
31use codec::{Decode, Encode};
32use log::trace;
33use std::collections::{HashMap, HashSet, VecDeque};
34
/// Meta key under which the number of the most recently pruned block is stored.
pub(crate) const LAST_PRUNED: &[u8] = b"last_pruned";
/// Meta key prefix combined with a block number to address that block's journal record.
const PRUNING_JOURNAL: &[u8] = b"pruning_journal";
37
/// Pruning window: an ordered queue of canonicalized blocks awaiting pruning, together with
/// the keys each of them scheduled for deletion.
pub struct RefWindow<BlockHash: Hash, Key: Hash, D: MetaDb> {
	/// Per-block death rows, ordered by block number, starting at `base`.
	queue: DeathRowQueue<BlockHash, Key, D>,
	/// Number of the block at the front of the queue, i.e. the next block to be pruned.
	base: u64,
}
46
/// Queue of death rows for the blocks inside the pruning window. Two flavours exist:
/// - `Mem` keeps every row in memory and indexes deleted keys so that a key re-inserted by a
///   later block can be taken off the earlier row that scheduled it for deletion.
/// - `DbBacked` keeps only a bounded cache of the first rows in memory and reloads further
///   rows from the journal stored in the database on demand.
enum DeathRowQueue<BlockHash: Hash, Key: Hash, D: MetaDb> {
	Mem {
		/// One row per block in the window, front is the oldest block.
		death_rows: VecDeque<DeathRow<BlockHash, Key>>,
		/// Maps a key scheduled for deletion to the number of the block that deleted it.
		death_index: HashMap<Key, u64>,
	},
	DbBacked {
		/// Database the journal records are loaded from.
		db: D,
		/// Rows for the first consecutive blocks of the window; later blocks may exist only
		/// in the database journal.
		cache: VecDeque<DeathRow<BlockHash, Key>>,
		/// Upper bound on the number of rows `import` appends to `cache`.
		cache_capacity: usize,
		/// Number of the last block added to the window, `None` if nothing was added yet.
		last: Option<u64>,
	},
}
72
73impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
74 fn new_mem(db: &D, base: u64) -> Result<DeathRowQueue<BlockHash, Key, D>, Error<D::Error>> {
76 let mut block = base;
77 let mut queue = DeathRowQueue::<BlockHash, Key, D>::Mem {
78 death_rows: VecDeque::new(),
79 death_index: HashMap::new(),
80 };
81 trace!(
83 target: LOG_TARGET,
84 "Reading pruning journal for the memory queue. Pending #{}",
85 base,
86 );
87 loop {
88 let journal_key = to_journal_key(block);
89 match db.get_meta(&journal_key).map_err(Error::Db)? {
90 Some(record) => {
91 let record: JournalRecord<BlockHash, Key> =
92 Decode::decode(&mut record.as_slice())?;
93 trace!(
94 target: LOG_TARGET,
95 "Pruning journal entry {} ({} inserted, {} deleted)",
96 block,
97 record.inserted.len(),
98 record.deleted.len(),
99 );
100 queue.import(base, block, record);
101 },
102 None => break,
103 }
104 block += 1;
105 }
106 Ok(queue)
107 }
108
109 fn new_db_backed(
112 db: D,
113 base: u64,
114 last: Option<u64>,
115 window_size: u32,
116 ) -> Result<DeathRowQueue<BlockHash, Key, D>, Error<D::Error>> {
117 let cache_capacity = window_size.clamp(1, DEFAULT_MAX_BLOCK_CONSTRAINT) as usize;
119 let mut cache = VecDeque::with_capacity(cache_capacity);
120 trace!(
121 target: LOG_TARGET,
122 "Reading pruning journal for the database-backed queue. Pending #{}",
123 base
124 );
125 DeathRowQueue::load_batch_from_db(&db, &mut cache, base, cache_capacity)?;
126 Ok(DeathRowQueue::DbBacked { db, cache, cache_capacity, last })
127 }
128
129 fn import(&mut self, base: u64, num: u64, journal_record: JournalRecord<BlockHash, Key>) {
131 let JournalRecord { hash, inserted, deleted } = journal_record;
132 trace!(target: LOG_TARGET, "Importing {}, base={}", num, base);
133 match self {
134 DeathRowQueue::DbBacked { cache, cache_capacity, last, .. } => {
135 if num == base + cache.len() as u64 && cache.len() < *cache_capacity {
138 trace!(target: LOG_TARGET, "Adding to DB backed cache {:?} (#{})", hash, num);
139 cache.push_back(DeathRow { hash, deleted: deleted.into_iter().collect() });
140 }
141 *last = Some(num);
142 },
143 DeathRowQueue::Mem { death_rows, death_index } => {
144 for k in inserted {
146 if let Some(block) = death_index.remove(&k) {
147 death_rows[(block - base) as usize].deleted.remove(&k);
148 }
149 }
150 let imported_block = base + death_rows.len() as u64;
152 for k in deleted.iter() {
153 death_index.insert(k.clone(), imported_block);
154 }
155 death_rows.push_back(DeathRow { hash, deleted: deleted.into_iter().collect() });
156 },
157 }
158 }
159
160 fn pop_front(
163 &mut self,
164 base: u64,
165 ) -> Result<Option<DeathRow<BlockHash, Key>>, Error<D::Error>> {
166 match self {
167 DeathRowQueue::DbBacked { db, cache, cache_capacity, .. } => {
168 if cache.is_empty() {
169 DeathRowQueue::load_batch_from_db(db, cache, base, *cache_capacity)?;
170 }
171 Ok(cache.pop_front())
172 },
173 DeathRowQueue::Mem { death_rows, death_index } => match death_rows.pop_front() {
174 Some(row) => {
175 for k in row.deleted.iter() {
176 death_index.remove(k);
177 }
178 Ok(Some(row))
179 },
180 None => Ok(None),
181 },
182 }
183 }
184
185 fn load_batch_from_db(
188 db: &D,
189 cache: &mut VecDeque<DeathRow<BlockHash, Key>>,
190 base: u64,
191 cache_capacity: usize,
192 ) -> Result<(), Error<D::Error>> {
193 let start = base + cache.len() as u64;
194 let batch_size = cache_capacity;
195 for i in 0..batch_size as u64 {
196 match load_death_row_from_db::<BlockHash, Key, D>(db, start + i)? {
197 Some(row) => {
198 cache.push_back(row);
199 },
200 None => break,
201 }
202 }
203 Ok(())
204 }
205
206 fn have_block(&self, hash: &BlockHash, index: usize) -> HaveBlock {
209 match self {
210 DeathRowQueue::DbBacked { cache, .. } => {
211 if cache.len() > index {
212 (cache[index].hash == *hash).into()
213 } else {
214 HaveBlock::Maybe
216 }
217 },
218 DeathRowQueue::Mem { death_rows, .. } => (death_rows[index].hash == *hash).into(),
219 }
220 }
221
222 fn len(&self, base: u64) -> u64 {
224 match self {
225 DeathRowQueue::DbBacked { last, .. } => last.map_or(0, |l| l + 1 - base),
226 DeathRowQueue::Mem { death_rows, .. } => death_rows.len() as u64,
227 }
228 }
229
230 #[cfg(test)]
231 fn get_mem_queue_state(
232 &self,
233 ) -> Option<(&VecDeque<DeathRow<BlockHash, Key>>, &HashMap<Key, u64>)> {
234 match self {
235 DeathRowQueue::DbBacked { .. } => None,
236 DeathRowQueue::Mem { death_rows, death_index } => Some((death_rows, death_index)),
237 }
238 }
239
240 #[cfg(test)]
241 fn get_db_backed_queue_state(
242 &self,
243 ) -> Option<(&VecDeque<DeathRow<BlockHash, Key>>, Option<u64>)> {
244 match self {
245 DeathRowQueue::DbBacked { cache, last, .. } => Some((cache, *last)),
246 DeathRowQueue::Mem { .. } => None,
247 }
248 }
249}
250
251fn load_death_row_from_db<BlockHash: Hash, Key: Hash, D: MetaDb>(
252 db: &D,
253 block: u64,
254) -> Result<Option<DeathRow<BlockHash, Key>>, Error<D::Error>> {
255 let journal_key = to_journal_key(block);
256 match db.get_meta(&journal_key).map_err(Error::Db)? {
257 Some(record) => {
258 let JournalRecord { hash, deleted, .. } = Decode::decode(&mut record.as_slice())?;
259 Ok(Some(DeathRow { hash, deleted: deleted.into_iter().collect() }))
260 },
261 None => Ok(None),
262 }
263}
264
/// In-memory entry of the pruning window for a single block.
#[derive(Clone, Debug, PartialEq, Eq)]
struct DeathRow<BlockHash: Hash, Key: Hash> {
	/// Hash of the block this row belongs to.
	hash: BlockHash,
	/// Keys handed to the commit set for deletion when this block is pruned.
	deleted: HashSet<Key>,
}
270
/// Encoded form of a block's pruning information, stored in the database under the key
/// produced by `to_journal_key`.
#[derive(Encode, Decode, Default)]
struct JournalRecord<BlockHash: Hash, Key: Hash> {
	/// Hash of the block the record was written for.
	hash: BlockHash,
	/// Keys inserted by the block; left empty when the window uses the database-backed queue.
	inserted: Vec<Key>,
	/// Keys deleted by the block.
	deleted: Vec<Key>,
}
277
/// Meta key of the journal record for `block`, built from the `PRUNING_JOURNAL` prefix and
/// the block number.
fn to_journal_key(block: u64) -> Vec<u8> {
	to_meta_key(PRUNING_JOURNAL, &block)
}
281
/// Answer to the question whether a given block is part of the pruning window.
#[derive(Debug, PartialEq, Eq)]
pub enum HaveBlock {
	/// The block is not in the window.
	No,
	/// The block could not be checked against an in-memory row; further checking is needed.
	Maybe,
	/// The block is in the window.
	Yes,
}

impl From<bool> for HaveBlock {
	/// Map a definite answer onto `Yes` / `No`; `Maybe` is never produced by this conversion.
	fn from(have: bool) -> Self {
		match have {
			true => HaveBlock::Yes,
			false => HaveBlock::No,
		}
	}
}
302
303impl<BlockHash: Hash, Key: Hash, D: MetaDb> RefWindow<BlockHash, Key, D> {
304 pub fn new(
305 db: D,
306 window_size: u32,
307 count_insertions: bool,
308 ) -> Result<RefWindow<BlockHash, Key, D>, Error<D::Error>> {
309 let base = match db.get_meta(&to_meta_key(LAST_PRUNED, &())).map_err(Error::Db)? {
312 Some(buffer) => u64::decode(&mut buffer.as_slice())? + 1,
313 None => 0,
314 };
315 let last_canonicalized_number =
317 match db.get_meta(&to_meta_key(LAST_CANONICAL, &())).map_err(Error::Db)? {
318 Some(buffer) => Some(<(BlockHash, u64)>::decode(&mut buffer.as_slice())?.1),
319 None => None,
320 };
321
322 let queue = if count_insertions {
323 if window_size > 1000 {
328 log::warn!(
329 target: LOG_TARGET,
330 "Large pruning window of {window_size} detected! THIS CAN LEAD TO HIGH MEMORY USAGE AND CRASHES. \
331 Reduce the pruning window or switch your database to paritydb."
332 );
333 }
334
335 DeathRowQueue::new_mem(&db, base)?
336 } else {
337 let last = match last_canonicalized_number {
338 Some(last_canonicalized_number) => {
339 debug_assert!(last_canonicalized_number + 1 >= base);
340 Some(last_canonicalized_number)
341 },
342 None => None,
346 };
347 DeathRowQueue::new_db_backed(db, base, last, window_size)?
348 };
349
350 Ok(RefWindow { queue, base })
351 }
352
353 pub fn window_size(&self) -> u64 {
354 self.queue.len(self.base) as u64
355 }
356
357 pub fn next_hash(&mut self) -> Result<Option<BlockHash>, Error<D::Error>> {
359 let res = match &mut self.queue {
360 DeathRowQueue::DbBacked { db, cache, cache_capacity, .. } => {
361 if cache.is_empty() {
362 DeathRowQueue::load_batch_from_db(db, cache, self.base, *cache_capacity)?;
363 }
364 cache.front().map(|r| r.hash.clone())
365 },
366 DeathRowQueue::Mem { death_rows, .. } => death_rows.front().map(|r| r.hash.clone()),
367 };
368 Ok(res)
369 }
370
371 fn is_empty(&self) -> bool {
372 self.window_size() == 0
373 }
374
375 pub fn have_block(&self, hash: &BlockHash, number: u64) -> HaveBlock {
377 if self.is_empty() || number < self.base || number >= self.base + self.window_size() {
380 return HaveBlock::No
381 }
382 self.queue.have_block(hash, (number - self.base) as usize)
383 }
384
385 pub fn prune_one(&mut self, commit: &mut CommitSet<Key>) -> Result<(), Error<D::Error>> {
387 if let Some(pruned) = self.queue.pop_front(self.base)? {
388 trace!(target: LOG_TARGET, "Pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
389 let index = self.base;
390 commit.data.deleted.extend(pruned.deleted.into_iter());
391 commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), index.encode()));
392 commit.meta.deleted.push(to_journal_key(self.base));
393 self.base += 1;
394 Ok(())
395 } else {
396 trace!(target: LOG_TARGET, "Trying to prune when there's nothing to prune");
397 Err(Error::StateDb(StateDbError::BlockUnavailable))
398 }
399 }
400
401 pub fn note_canonical(
403 &mut self,
404 hash: &BlockHash,
405 number: u64,
406 commit: &mut CommitSet<Key>,
407 ) -> Result<(), Error<D::Error>> {
408 if self.base == 0 && self.is_empty() && number > 0 {
409 self.base = number;
412 commit
414 .meta
415 .inserted
416 .push((to_meta_key(LAST_PRUNED, &()), (number - 1).encode()));
417 } else if (self.base + self.window_size()) != number {
418 return Err(Error::StateDb(StateDbError::InvalidBlockNumber))
419 }
420 trace!(
421 target: LOG_TARGET,
422 "Adding to pruning window: {:?} ({} inserted, {} deleted)",
423 hash,
424 commit.data.inserted.len(),
425 commit.data.deleted.len(),
426 );
427 let inserted = if matches!(self.queue, DeathRowQueue::Mem { .. }) {
428 commit.data.inserted.iter().map(|(k, _)| k.clone()).collect()
429 } else {
430 Default::default()
431 };
432 let deleted = std::mem::take(&mut commit.data.deleted);
433 let journal_record = JournalRecord { hash: hash.clone(), inserted, deleted };
434 commit.meta.inserted.push((to_journal_key(number), journal_record.encode()));
435 self.queue.import(self.base, number, journal_record);
436 Ok(())
437 }
438}
439
// Unit tests for the pruning window, covering both the in-memory and the database-backed queue.
#[cfg(test)]
mod tests {
	use super::{to_journal_key, DeathRowQueue, HaveBlock, JournalRecord, RefWindow, LAST_PRUNED};
	use crate::{
		noncanonical::LAST_CANONICAL,
		test::{make_commit, make_db, TestDb},
		to_meta_key, CommitSet, Error, Hash, StateDbError, DEFAULT_MAX_BLOCK_CONSTRAINT,
	};
	use codec::Encode;
	use sp_core::H256;

	/// Restore a window from `db` with the same queue flavour as `pruning` and check that it
	/// matches. Only `base` and the in-memory queue state are compared; for a database-backed
	/// queue both sides yield `None` for that state.
	fn check_journal(pruning: &RefWindow<H256, H256, TestDb>, db: &TestDb) {
		let count_insertions = matches!(pruning.queue, DeathRowQueue::Mem { .. });
		let restored: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, count_insertions).unwrap();
		assert_eq!(pruning.base, restored.base);
		assert_eq!(pruning.queue.get_mem_queue_state(), restored.queue.get_mem_queue_state());
	}

	// A window created from an empty database starts at base 0 with an empty queue.
	#[test]
	fn created_from_empty_db() {
		let db = make_db(&[]);
		let pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		assert_eq!(pruning.base, 0);
		let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap();
		assert!(death_rows.is_empty());
		assert!(death_index.is_empty());
	}

	// Pruning an empty window reports `BlockUnavailable` and leaves the state untouched.
	#[test]
	fn prune_empty() {
		let db = make_db(&[]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = CommitSet::default();
		assert_eq!(
			Err(Error::StateDb(StateDbError::BlockUnavailable)),
			pruning.prune_one(&mut commit)
		);
		assert_eq!(pruning.base, 0);
		let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap();
		assert!(death_rows.is_empty());
		assert!(death_index.is_empty());
	}

	// Canonicalize a single block, then prune it.
	#[test]
	fn prune_one() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[4, 5], &[1, 3]);
		let hash = H256::random();
		pruning.note_canonical(&hash, 0, &mut commit).unwrap();
		db.commit(&commit);
		assert_eq!(pruning.have_block(&hash, 0), HaveBlock::Yes);
		assert_eq!(pruning.have_block(&hash, 0), HaveBlock::Yes);
		// The deletions were moved into the journal record, so nothing is deleted yet.
		assert!(commit.data.deleted.is_empty());
		let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap();
		assert_eq!(death_rows.len(), 1);
		assert_eq!(death_index.len(), 2);
		assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));
		check_journal(&pruning, &db);

		// Pruning the block applies its deferred deletions.
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		assert_eq!(pruning.have_block(&hash, 0), HaveBlock::No);
		db.commit(&commit);
		assert_eq!(pruning.have_block(&hash, 0), HaveBlock::No);
		assert!(db.data_eq(&make_db(&[2, 4, 5])));
		let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap();
		assert!(death_rows.is_empty());
		assert!(death_index.is_empty());
		assert_eq!(pruning.base, 1);
	}

	// Canonicalize two blocks, verify the journal, then prune them one after the other.
	#[test]
	fn prune_two() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[4], &[1]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[5], &[2]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));

		check_journal(&pruning, &db);

		// First prune removes key 1, second prune removes key 2.
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[2, 3, 4, 5])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[3, 4, 5])));
		assert_eq!(pruning.base, 2);
	}

	// Same scenario as `prune_two`, without the intermediate journal check.
	#[test]
	fn prune_two_pending() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[4], &[1]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[5], &[2]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[2, 3, 4, 5])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[3, 4, 5])));
		assert_eq!(pruning.base, 2);
	}

	// With the in-memory queue, a key that is deleted, re-inserted and deleted again is only
	// removed when the block of the last deletion is pruned.
	#[test]
	fn reinserted_survives() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[2], &[]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 2, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));

		check_journal(&pruning, &db);

		// Pruning blocks 0 and 1 must keep key 2; pruning block 2 finally removes it.
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 3])));
		assert_eq!(pruning.base, 3);
	}

	// Same scenario as `reinserted_survives`, without the intermediate journal check.
	#[test]
	fn reinserted_survive_pending() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[2], &[]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 2, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));

		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 3])));
		assert_eq!(pruning.base, 3);
	}

	// The database-backed queue does not track insertions, so the first prune already
	// removes key 2 even though a later block re-inserted it.
	#[test]
	fn reinserted_ignores() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[2], &[]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 2, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));

		check_journal(&pruning, &db);

		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 3])));
	}

	/// Queue a write of the `LAST_CANONICAL` marker; `block` is used for both tuple fields.
	fn push_last_canonicalized<H: Hash>(block: u64, commit: &mut CommitSet<H>) {
		commit
			.meta
			.inserted
			.push((to_meta_key(LAST_CANONICAL, &()), (block, block).encode()));
	}

	/// Queue a write of the `LAST_PRUNED` marker with the given block number.
	fn push_last_pruned<H: Hash>(block: u64, commit: &mut CommitSet<H>) {
		commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), block.encode()));
	}

	// Restoring a database-backed window from hand-written metadata yields the expected base
	// and number of cached rows.
	#[test]
	fn init_db_backed_queue() {
		let mut db = make_db(&[]);
		let mut commit = CommitSet::default();

		// Restore a window from `db` and report (cached rows, base).
		fn load_pruning_from_db(db: TestDb) -> (usize, u64) {
			let pruning: RefWindow<u64, H256, TestDb> =
				RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
			let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
			(cache.len(), pruning.base)
		}

		// Queue a write of a default (empty) journal record for `block`.
		fn push_record(block: u64, commit: &mut CommitSet<H256>) {
			commit
				.meta
				.inserted
				.push((to_journal_key(block), JournalRecord::<u64, H256>::default().encode()));
		}

		// Empty database.
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 0);
		assert_eq!(base, 0);

		// Block 0 canonicalized, nothing pruned yet.
		push_last_canonicalized(0, &mut commit);
		push_record(0, &mut commit);
		db.commit(&commit);
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 1);
		assert_eq!(base, 0);

		// Block 0 marked as pruned: the window now starts at 1 and finds no record there.
		push_last_pruned(0, &mut commit);
		db.commit(&commit);
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 0);
		assert_eq!(base, 1);

		// Blocks 1..=10 canonicalized.
		push_last_canonicalized(10, &mut commit);
		for i in 1..=10 {
			push_record(i, &mut commit);
		}
		db.commit(&commit);
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 10);
		assert_eq!(base, 1);

		// Everything marked as pruned.
		push_last_pruned(10, &mut commit);
		db.commit(&commit);
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 0);
		assert_eq!(base, 11);
	}

	// Importing, pruning and restoring with a database-backed queue larger than its cache.
	#[test]
	fn db_backed_queue() {
		let mut db = make_db(&[]);
		let mut pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as usize;

		// The queue starts out empty.
		let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), 0);
		assert_eq!(last, None);

		// Import more blocks than the cache can hold; the cache stops growing at capacity
		// while `last` keeps following the imported block.
		for i in 0..(cache_capacity + 10) {
			let mut commit = make_commit(&[], &[]);
			pruning.note_canonical(&(i as u64), i as u64, &mut commit).unwrap();
			push_last_canonicalized(i as u64, &mut commit);
			db.commit(&commit);
			let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap();
			if i < cache_capacity {
				assert_eq!(cache.len(), i + 1);
			} else {
				assert_eq!(cache.len(), cache_capacity);
			}
			assert_eq!(last, Some(i as u64));
		}
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 10);
		let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);
		assert_eq!(last, Some(cache_capacity as u64 + 10 - 1));
		for i in 0..cache_capacity {
			assert_eq!(cache[i].hash, i as u64);
		}

		// Import one more block: the window grows but the full cache does not.
		let mut commit = CommitSet::default();
		pruning
			.note_canonical(&(cache_capacity as u64 + 10), cache_capacity as u64 + 10, &mut commit)
			.unwrap();
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 11);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);

		// The previous commit was never written to `db`, so a window restored from `db`
		// does not contain that last block.
		pruning = RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as usize;
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 10);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);

		// Pruning removes the block at the front of the cache.
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 9);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity - 1);
		for i in 0..(cache_capacity - 1) {
			assert_eq!(cache[i].hash, (i + 1) as u64);
		}

		// A freshly restored window has a full cache again, starting at block 1.
		let pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 9);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);
		for i in 0..cache_capacity {
			assert_eq!(cache[i].hash, (i + 1) as u64);
		}
	}

	// Rows that did not fit into the cache are loaded from the database on demand.
	#[test]
	fn load_block_from_db() {
		let mut db = make_db(&[]);
		let mut pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as usize;

		// Import two full caches worth of blocks plus ten.
		for i in 0..(cache_capacity as u64 * 2 + 10) {
			let mut commit = make_commit(&[], &[]);
			pruning.note_canonical(&i, i, &mut commit).unwrap();
			push_last_canonicalized(i as u64, &mut commit);
			db.commit(&commit);
		}

		// The cache is still populated, so `next_hash` does not need to load anything.
		assert_eq!(pruning.next_hash().unwrap().unwrap(), 0);
		let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);
		assert_eq!(last, Some(cache_capacity as u64 * 2 + 10 - 1));

		// Prune two full caches worth of blocks; the second half comes from the database.
		for _ in 0..cache_capacity * 2 {
			let mut commit = CommitSet::default();
			pruning.prune_one(&mut commit).unwrap();
			db.commit(&commit);
		}
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert!(cache.is_empty());

		// `next_hash` refills the empty cache with the ten remaining blocks.
		assert_eq!(pruning.next_hash().unwrap().unwrap(), (cache_capacity * 2) as u64);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), 10);

		// A restored window sees the same ten blocks.
		let pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		assert_eq!(pruning.window_size(), 10);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), 10);
		for i in 0..10 {
			assert_eq!(cache[i].hash, (cache_capacity * 2 + i) as u64);
		}
	}

	// A block whose journal record has not been written to the database yet cannot be pruned
	// once it lies beyond the cached rows.
	#[test]
	fn get_block_from_queue() {
		let mut db = make_db(&[]);
		let mut pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as u64;

		// Import blocks into one commit and write them to the database.
		let mut commit = make_commit(&[], &[]);
		for i in 0..(cache_capacity + 10) {
			pruning.note_canonical(&i, i, &mut commit).unwrap();
		}
		db.commit(&commit);

		// Import one more block but hold its commit back.
		let mut pending_commit = make_commit(&[], &[]);
		let index = cache_capacity + 10;
		pruning.note_canonical(&index, index, &mut pending_commit).unwrap();

		// All blocks written to the database can be pruned in order.
		let mut commit = make_commit(&[], &[]);
		for i in 0..(cache_capacity + 10) {
			assert_eq!(pruning.next_hash().unwrap(), Some(i));
			pruning.prune_one(&mut commit).unwrap();
		}
		// The held-back block is neither cached nor in the database.
		assert_eq!(pruning.next_hash().unwrap(), None);
		assert_eq!(
			pruning.prune_one(&mut commit).unwrap_err(),
			Error::StateDb(StateDbError::BlockUnavailable)
		);
		// Once its record is written, the block becomes available.
		db.commit(&pending_commit);
		assert_eq!(pruning.next_hash().unwrap(), Some(index));
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
	}

	// When the first canonicalized block has a number other than 0 (an empty window at
	// base 0), the window must be restorable from the database for both queue flavours.
	#[test]
	fn store_correct_state_after_warp_syncing() {
		for count_insertions in [true, false] {
			let mut db = make_db(&[]);
			let mut pruning: RefWindow<u64, H256, TestDb> =
				RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, count_insertions).unwrap();
			let block = 10000;

			// Canonicalize the first block far away from 0.
			let mut commit = make_commit(&[], &[]);
			pruning.note_canonical(&block, block, &mut commit).unwrap();
			push_last_canonicalized(block, &mut commit);
			db.commit(&commit);

			// Restore the window from the database.
			let pruning: RefWindow<u64, H256, TestDb> =
				RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, count_insertions).unwrap();

			assert_eq!(HaveBlock::Yes, pruning.have_block(&block, block));
		}
	}
}