sc_state_db/pruning.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Pruning window.
//!
//! For each block we maintain a list of nodes pending deletion.
//! There is also a global index of node key to block number.
//! If a node is re-inserted into the window it gets removed from
//! the death list.
//! The changes are journaled in the DB.
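//!
//! A minimal usage sketch (illustrative only; `db`, `hash`, `number` and the
//! `H256` key type are assumed placeholders, not part of this module):
//!
//! ```ignore
//! // Open a pruning window of 256 blocks over a reference-counting backend.
//! let mut window: RefWindow<H256, H256, Db> = RefWindow::new(db, 256, false)?;
//!
//! // Journal the keys deleted by the next canonicalized block.
//! let mut commit = CommitSet::default();
//! window.note_canonical(&hash, number, &mut commit)?;
//!
//! // Once the window has grown past the target size, prune the oldest block;
//! // its pending deletions are moved into `commit.data.deleted`.
//! if window.window_size() > 256 {
//! 	window.prune_one(&mut commit)?;
//! }
//! // The caller then writes `commit` to the database atomically.
//! ```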

use crate::{
	noncanonical::LAST_CANONICAL, to_meta_key, CommitSet, Error, Hash, MetaDb, StateDbError,
	DEFAULT_MAX_BLOCK_CONSTRAINT, LOG_TARGET,
};
use codec::{Decode, Encode};
use log::trace;
use std::collections::{HashMap, HashSet, VecDeque};

pub(crate) const LAST_PRUNED: &[u8] = b"last_pruned";
const PRUNING_JOURNAL: &[u8] = b"pruning_journal";

/// See module documentation.
pub struct RefWindow<BlockHash: Hash, Key: Hash, D: MetaDb> {
	/// A queue tracking, for each block in the pruning window, the keys that
	/// should be deleted when that block is pruned.
	queue: DeathRowQueue<BlockHash, Key, D>,
	/// Block number that is next to be pruned.
	base: u64,
}

/// `DeathRowQueue` is used to keep track of blocks in the pruning window. There are two flavors:
/// - `Mem`, used when the backend database does not support reference counting. It keeps all
/// 	blocks in memory and tracks re-inserted keys so they are not deleted when pruning.
/// - `DbBacked`, used when the backend database supports reference counting. It only keeps
/// 	a limited number of blocks in memory and loads more blocks on demand.
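///
/// A sketch of how the flavor is chosen (this mirrors `RefWindow::new` below,
/// where `count_insertions` is true for backends without reference counting):
///
/// ```ignore
/// let queue = if count_insertions {
/// 	DeathRowQueue::new_mem(&db, base)? // e.g. RocksDB
/// } else {
/// 	DeathRowQueue::new_db_backed(db, base, last, window_size)? // e.g. ParityDB
/// };
/// ```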
enum DeathRowQueue<BlockHash: Hash, Key: Hash, D: MetaDb> {
	Mem {
		/// A queue of keys that should be deleted for each block in the pruning window.
		death_rows: VecDeque<DeathRow<BlockHash, Key>>,
		/// An index that maps each key from `death_rows` to a block number.
		death_index: HashMap<Key, u64>,
	},
	DbBacked {
		/// The backend database.
		db: D,
		/// A queue of keys that should be deleted for each block in the pruning window.
		/// Only the first few blocks of the pruning window are cached here; the cached
		/// blocks are consecutive and ordered by block number.
		cache: VecDeque<DeathRow<BlockHash, Key>>,
		/// A soft limit on the cache's size.
		cache_capacity: usize,
		/// Last block number added to the window.
		last: Option<u64>,
	},
}

impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
	/// Return a `DeathRowQueue` that keeps all blocks in memory.
	fn new_mem(db: &D, base: u64) -> Result<DeathRowQueue<BlockHash, Key, D>, Error<D::Error>> {
		let mut block = base;
		let mut queue = DeathRowQueue::<BlockHash, Key, D>::Mem {
			death_rows: VecDeque::new(),
			death_index: HashMap::new(),
		};
		// read the journal, block by block, until a gap is found
		trace!(
			target: LOG_TARGET,
			"Reading pruning journal for the memory queue. Pending #{}",
			base,
		);
		loop {
			let journal_key = to_journal_key(block);
			match db.get_meta(&journal_key).map_err(Error::Db)? {
				Some(record) => {
					let record: JournalRecord<BlockHash, Key> =
						Decode::decode(&mut record.as_slice())?;
					trace!(
						target: LOG_TARGET,
						"Pruning journal entry {} ({} inserted, {} deleted)",
						block,
						record.inserted.len(),
						record.deleted.len(),
					);
					queue.import(base, block, record);
				},
				None => break,
			}
			block += 1;
		}
		Ok(queue)
	}

	/// Return a `DeathRowQueue` that is backed by a database and only keeps a
	/// limited number of blocks in memory.
	fn new_db_backed(
		db: D,
		base: u64,
		last: Option<u64>,
		window_size: u32,
	) -> Result<DeathRowQueue<BlockHash, Key, D>, Error<D::Error>> {
		// clamp the cache capacity to the range 1..=`DEFAULT_MAX_BLOCK_CONSTRAINT`
		let cache_capacity = window_size.clamp(1, DEFAULT_MAX_BLOCK_CONSTRAINT) as usize;
		let mut cache = VecDeque::with_capacity(cache_capacity);
		trace!(
			target: LOG_TARGET,
			"Reading pruning journal for the database-backed queue. Pending #{}",
			base
		);
		DeathRowQueue::load_batch_from_db(&db, &mut cache, base, cache_capacity)?;
		Ok(DeathRowQueue::DbBacked { db, cache, cache_capacity, last })
	}

	/// Import a new block to the back of the queue.
	fn import(&mut self, base: u64, num: u64, journal_record: JournalRecord<BlockHash, Key>) {
		let JournalRecord { hash, inserted, deleted } = journal_record;
		trace!(target: LOG_TARGET, "Importing {}, base={}", num, base);
		match self {
			DeathRowQueue::DbBacked { cache, cache_capacity, last, .. } => {
				// If the new block extends the cached range and there is space, load it
				// directly into the cache.
				if num == base + cache.len() as u64 && cache.len() < *cache_capacity {
					trace!(target: LOG_TARGET, "Adding to DB backed cache {:?} (#{})", hash, num);
					cache.push_back(DeathRow { hash, deleted: deleted.into_iter().collect() });
				}
				*last = Some(num);
			},
			DeathRowQueue::Mem { death_rows, death_index } => {
				// remove all re-inserted keys from the death rows, so a key that was
				// deleted and later re-inserted is not pruned away
				for k in inserted {
					if let Some(block) = death_index.remove(&k) {
						death_rows[(block - base) as usize].deleted.remove(&k);
					}
				}
				// add new keys
				let imported_block = base + death_rows.len() as u64;
				for k in deleted.iter() {
					death_index.insert(k.clone(), imported_block);
				}
				death_rows.push_back(DeathRow { hash, deleted: deleted.into_iter().collect() });
			},
		}
	}

	/// Pop one block from the front of the queue. `base` is the block number
	/// of the first block in the queue.
	fn pop_front(
		&mut self,
		base: u64,
	) -> Result<Option<DeathRow<BlockHash, Key>>, Error<D::Error>> {
		match self {
			DeathRowQueue::DbBacked { db, cache, cache_capacity, .. } => {
				if cache.is_empty() {
					DeathRowQueue::load_batch_from_db(db, cache, base, *cache_capacity)?;
				}
				Ok(cache.pop_front())
			},
			DeathRowQueue::Mem { death_rows, death_index } => match death_rows.pop_front() {
				Some(row) => {
					for k in row.deleted.iter() {
						death_index.remove(k);
					}
					Ok(Some(row))
				},
				None => Ok(None),
			},
		}
	}

	/// Load a batch of up to `cache_capacity` blocks from the backend database
	/// into `cache`, starting with the first block that is not yet cached.
	fn load_batch_from_db(
		db: &D,
		cache: &mut VecDeque<DeathRow<BlockHash, Key>>,
		base: u64,
		cache_capacity: usize,
	) -> Result<(), Error<D::Error>> {
		let start = base + cache.len() as u64;
		let batch_size = cache_capacity;
		for i in 0..batch_size as u64 {
			match load_death_row_from_db::<BlockHash, Key, D>(db, start + i)? {
				Some(row) => {
					cache.push_back(row);
				},
				None => break,
			}
		}
		Ok(())
	}

	/// Check if the block at the given `index` of the queue exists.
	/// It is the caller's responsibility to ensure `index` is not out of bounds.
	fn have_block(&self, hash: &BlockHash, index: usize) -> HaveBlock {
		match self {
			DeathRowQueue::DbBacked { cache, .. } => {
				if cache.len() > index {
					(cache[index].hash == *hash).into()
				} else {
					// The block is not in the cache but it may still exist on disk.
					HaveBlock::Maybe
				}
			},
			DeathRowQueue::Mem { death_rows, .. } => (death_rows[index].hash == *hash).into(),
		}
	}

	/// Return the number of blocks in the pruning window.
	fn len(&self, base: u64) -> u64 {
		match self {
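			// Blocks `base..=last` are queued, so e.g. `base == 5` with
			// `last == Some(9)` gives 9 + 1 - 5 = 5 blocks.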
			DeathRowQueue::DbBacked { last, .. } => last.map_or(0, |l| l + 1 - base),
			DeathRowQueue::Mem { death_rows, .. } => death_rows.len() as u64,
		}
	}

	#[cfg(test)]
	fn get_mem_queue_state(
		&self,
	) -> Option<(&VecDeque<DeathRow<BlockHash, Key>>, &HashMap<Key, u64>)> {
		match self {
			DeathRowQueue::DbBacked { .. } => None,
			DeathRowQueue::Mem { death_rows, death_index } => Some((death_rows, death_index)),
		}
	}

	#[cfg(test)]
	fn get_db_backed_queue_state(
		&self,
	) -> Option<(&VecDeque<DeathRow<BlockHash, Key>>, Option<u64>)> {
		match self {
			DeathRowQueue::DbBacked { cache, last, .. } => Some((cache, *last)),
			DeathRowQueue::Mem { .. } => None,
		}
	}
}

fn load_death_row_from_db<BlockHash: Hash, Key: Hash, D: MetaDb>(
	db: &D,
	block: u64,
) -> Result<Option<DeathRow<BlockHash, Key>>, Error<D::Error>> {
	let journal_key = to_journal_key(block);
	match db.get_meta(&journal_key).map_err(Error::Db)? {
		Some(record) => {
			let JournalRecord { hash, deleted, .. } = Decode::decode(&mut record.as_slice())?;
			Ok(Some(DeathRow { hash, deleted: deleted.into_iter().collect() }))
		},
		None => Ok(None),
	}
}

#[derive(Clone, Debug, PartialEq, Eq)]
struct DeathRow<BlockHash: Hash, Key: Hash> {
	hash: BlockHash,
	deleted: HashSet<Key>,
}

#[derive(Encode, Decode, Default)]
struct JournalRecord<BlockHash: Hash, Key: Hash> {
	hash: BlockHash,
	inserted: Vec<Key>,
	deleted: Vec<Key>,
}

fn to_journal_key(block: u64) -> Vec<u8> {
	to_meta_key(PRUNING_JOURNAL, &block)
}

/// The result returned by `RefWindow::have_block`.
#[derive(Debug, PartialEq, Eq)]
pub enum HaveBlock {
	/// Definitely does not have this block.
	No,
	/// May or may not have this block; further checking is needed.
	Maybe,
	/// Definitely has this block.
	Yes,
}

impl From<bool> for HaveBlock {
	fn from(have: bool) -> Self {
		if have {
			HaveBlock::Yes
		} else {
			HaveBlock::No
		}
	}
}

impl<BlockHash: Hash, Key: Hash, D: MetaDb> RefWindow<BlockHash, Key, D> {
	pub fn new(
		db: D,
		window_size: u32,
		count_insertions: bool,
	) -> Result<RefWindow<BlockHash, Key, D>, Error<D::Error>> {
		// the block number of the first block in the queue, or the next block number
		// if the queue is empty
		let base = match db.get_meta(&to_meta_key(LAST_PRUNED, &())).map_err(Error::Db)? {
			Some(buffer) => u64::decode(&mut buffer.as_slice())? + 1,
			None => 0,
		};
		// the block number of the last block in the queue
		let last_canonicalized_number =
			match db.get_meta(&to_meta_key(LAST_CANONICAL, &())).map_err(Error::Db)? {
				Some(buffer) => Some(<(BlockHash, u64)>::decode(&mut buffer.as_slice())?.1),
				None => None,
			};

		let queue = if count_insertions {
			// Highly scientifically crafted number for deciding when to print the warning!
			//
			// RocksDB doesn't support refcounting and requires that we load the entire pruning
			// window into memory.
			if window_size > 1000 {
				log::warn!(
					target: LOG_TARGET,
					"Large pruning window of {window_size} detected! THIS CAN LEAD TO HIGH MEMORY USAGE AND CRASHES. \
					Reduce the pruning window or switch your database to paritydb."
				);
			}

			DeathRowQueue::new_mem(&db, base)?
		} else {
			let last = match last_canonicalized_number {
				Some(last_canonicalized_number) => {
					debug_assert!(last_canonicalized_number + 1 >= base);
					Some(last_canonicalized_number)
				},
				// `None` means `LAST_CANONICAL` has never been written. Since the pruning
				// journals are committed in the same `CommitSet` as `LAST_CANONICAL`, no
				// pruning journal has ever been committed to the db either, so keep `last`
				// as `None`.
				None => None,
			};
			DeathRowQueue::new_db_backed(db, base, last, window_size)?
		};

		Ok(RefWindow { queue, base })
	}

	pub fn window_size(&self) -> u64 {
		self.queue.len(self.base)
	}

	/// Get the hash of the next block to be pruned.
	pub fn next_hash(&mut self) -> Result<Option<BlockHash>, Error<D::Error>> {
		let res = match &mut self.queue {
			DeathRowQueue::DbBacked { db, cache, cache_capacity, .. } => {
				if cache.is_empty() {
					DeathRowQueue::load_batch_from_db(db, cache, self.base, *cache_capacity)?;
				}
				cache.front().map(|r| r.hash.clone())
			},
			DeathRowQueue::Mem { death_rows, .. } => death_rows.front().map(|r| r.hash.clone()),
		};
		Ok(res)
	}

	fn is_empty(&self) -> bool {
		self.window_size() == 0
	}

	/// Check if a block is in the pruning window and has not been pruned yet.
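	///
	/// A sketch of how a caller might branch on the result (illustrative; the
	/// handling of `Maybe` is an assumption, not prescribed by this module):
	///
	/// ```ignore
	/// match window.have_block(&hash, number) {
	/// 	HaveBlock::Yes => { /* the block is still queued for pruning */ },
	/// 	HaveBlock::No => { /* already pruned, or never in the window */ },
	/// 	HaveBlock::Maybe => { /* `DbBacked` only: consult the journal on disk */ },
	/// }
	/// ```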
	pub fn have_block(&self, hash: &BlockHash, number: u64) -> HaveBlock {
		// if the queue is empty or the block number falls outside the pruning window,
		// we definitely do not have this block
		if self.is_empty() || number < self.base || number >= self.base + self.window_size() {
			return HaveBlock::No
		}
		self.queue.have_block(hash, (number - self.base) as usize)
	}

	/// Prune the next block. Expects at least one block in the window. Adds changes to `commit`.
	pub fn prune_one(&mut self, commit: &mut CommitSet<Key>) -> Result<(), Error<D::Error>> {
		if let Some(pruned) = self.queue.pop_front(self.base)? {
			trace!(target: LOG_TARGET, "Pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
			let index = self.base;
			commit.data.deleted.extend(pruned.deleted.into_iter());
			commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), index.encode()));
			commit.meta.deleted.push(to_journal_key(self.base));
			self.base += 1;
			Ok(())
		} else {
			trace!(target: LOG_TARGET, "Trying to prune when there's nothing to prune");
			Err(Error::StateDb(StateDbError::BlockUnavailable))
		}
	}

	/// Add a change set to the window. Creates a journal record and pushes it to `commit`.
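	///
	/// A sketch of the expected call pattern (illustrative; apart from the warp
	/// sync case handled below, `number` must be exactly one past the window):
	///
	/// ```ignore
	/// // `number` must equal base + window_size(), otherwise
	/// // `StateDbError::InvalidBlockNumber` is returned.
	/// window.note_canonical(&hash, number, &mut commit)?;
	/// db.commit(&commit); // persists the journal record along with the data
	/// ```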
	pub fn note_canonical(
		&mut self,
		hash: &BlockHash,
		number: u64,
		commit: &mut CommitSet<Key>,
	) -> Result<(), Error<D::Error>> {
		if self.base == 0 && self.is_empty() && number > 0 {
			// This branch is taken if the node imports the target block of a warp sync.
			// Assume that the block was canonicalized.
			self.base = number;
			// The parent of the block was the last block that got pruned.
			commit
				.meta
				.inserted
				.push((to_meta_key(LAST_PRUNED, &()), (number - 1).encode()));
		} else if (self.base + self.window_size()) != number {
			return Err(Error::StateDb(StateDbError::InvalidBlockNumber))
		}
		trace!(
			target: LOG_TARGET,
			"Adding to pruning window: {:?} ({} inserted, {} deleted)",
			hash,
			commit.data.inserted.len(),
			commit.data.deleted.len(),
		);
		let inserted = if matches!(self.queue, DeathRowQueue::Mem { .. }) {
			commit.data.inserted.iter().map(|(k, _)| k.clone()).collect()
		} else {
			Default::default()
		};
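		// Move the pending deletions out of the commit: they are only journaled for
		// now and are applied to the data when this block is eventually pruned.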
		let deleted = std::mem::take(&mut commit.data.deleted);
		let journal_record = JournalRecord { hash: hash.clone(), inserted, deleted };
		commit.meta.inserted.push((to_journal_key(number), journal_record.encode()));
		self.queue.import(self.base, number, journal_record);
		Ok(())
	}
}

#[cfg(test)]
mod tests {
	use super::{to_journal_key, DeathRowQueue, HaveBlock, JournalRecord, RefWindow, LAST_PRUNED};
	use crate::{
		noncanonical::LAST_CANONICAL,
		test::{make_commit, make_db, TestDb},
		to_meta_key, CommitSet, Error, Hash, StateDbError, DEFAULT_MAX_BLOCK_CONSTRAINT,
	};
	use codec::Encode;
	use sp_core::H256;

	fn check_journal(pruning: &RefWindow<H256, H256, TestDb>, db: &TestDb) {
		let count_insertions = matches!(pruning.queue, DeathRowQueue::Mem { .. });
		let restored: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, count_insertions).unwrap();
		assert_eq!(pruning.base, restored.base);
		assert_eq!(pruning.queue.get_mem_queue_state(), restored.queue.get_mem_queue_state());
	}

	#[test]
	fn created_from_empty_db() {
		let db = make_db(&[]);
		let pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		assert_eq!(pruning.base, 0);
		let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap();
		assert!(death_rows.is_empty());
		assert!(death_index.is_empty());
	}

	#[test]
	fn prune_empty() {
		let db = make_db(&[]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = CommitSet::default();
		assert_eq!(
			Err(Error::StateDb(StateDbError::BlockUnavailable)),
			pruning.prune_one(&mut commit)
		);
		assert_eq!(pruning.base, 0);
		let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap();
		assert!(death_rows.is_empty());
		assert!(death_index.is_empty());
	}

	#[test]
	fn prune_one() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[4, 5], &[1, 3]);
		let hash = H256::random();
		pruning.note_canonical(&hash, 0, &mut commit).unwrap();
		db.commit(&commit);
		assert_eq!(pruning.have_block(&hash, 0), HaveBlock::Yes);
		assert_eq!(pruning.have_block(&hash, 0), HaveBlock::Yes);
		assert!(commit.data.deleted.is_empty());
		let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap();
		assert_eq!(death_rows.len(), 1);
		assert_eq!(death_index.len(), 2);
		assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));
		check_journal(&pruning, &db);

		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		assert_eq!(pruning.have_block(&hash, 0), HaveBlock::No);
		db.commit(&commit);
		assert_eq!(pruning.have_block(&hash, 0), HaveBlock::No);
		assert!(db.data_eq(&make_db(&[2, 4, 5])));
		let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap();
		assert!(death_rows.is_empty());
		assert!(death_index.is_empty());
		assert_eq!(pruning.base, 1);
	}

	#[test]
	fn prune_two() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[4], &[1]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[5], &[2]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));

		check_journal(&pruning, &db);

		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[2, 3, 4, 5])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[3, 4, 5])));
		assert_eq!(pruning.base, 2);
	}

	#[test]
	fn prune_two_pending() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[4], &[1]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[5], &[2]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[2, 3, 4, 5])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[3, 4, 5])));
		assert_eq!(pruning.base, 2);
	}

	#[test]
	fn reinserted_survives() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[2], &[]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 2, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));

		check_journal(&pruning, &db);

		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 3])));
		assert_eq!(pruning.base, 3);
	}

	#[test]
	fn reinserted_survive_pending() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap();
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[2], &[]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 2, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));

		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 3])));
		assert_eq!(pruning.base, 3);
	}

	#[test]
	fn reinserted_ignores() {
		let mut db = make_db(&[1, 2, 3]);
		let mut pruning: RefWindow<H256, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 0, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[2], &[]);
		pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap();
		db.commit(&commit);
		let mut commit = make_commit(&[], &[2]);
		pruning.note_canonical(&H256::random(), 2, &mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 2, 3])));

		check_journal(&pruning, &db);

		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert!(db.data_eq(&make_db(&[1, 3])));
	}

	fn push_last_canonicalized<H: Hash>(block: u64, commit: &mut CommitSet<H>) {
		commit
			.meta
			.inserted
			.push((to_meta_key(LAST_CANONICAL, &()), (block, block).encode()));
	}

	fn push_last_pruned<H: Hash>(block: u64, commit: &mut CommitSet<H>) {
		commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), block.encode()));
	}

	#[test]
	fn init_db_backed_queue() {
		let mut db = make_db(&[]);
		let mut commit = CommitSet::default();

		fn load_pruning_from_db(db: TestDb) -> (usize, u64) {
			let pruning: RefWindow<u64, H256, TestDb> =
				RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
			let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
			(cache.len(), pruning.base)
		}

		fn push_record(block: u64, commit: &mut CommitSet<H256>) {
			commit
				.meta
				.inserted
				.push((to_journal_key(block), JournalRecord::<u64, H256>::default().encode()));
		}

		// empty database
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 0);
		assert_eq!(base, 0);

		// canonicalize the genesis block, but prune nothing yet
		push_last_canonicalized(0, &mut commit);
		push_record(0, &mut commit);
		db.commit(&commit);
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 1);
		assert_eq!(base, 0);

		// prune the genesis block
		push_last_pruned(0, &mut commit);
		db.commit(&commit);
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 0);
		assert_eq!(base, 1);

		// canonicalize more blocks
		push_last_canonicalized(10, &mut commit);
		for i in 1..=10 {
			push_record(i, &mut commit);
		}
		db.commit(&commit);
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 10);
		assert_eq!(base, 1);

		// prune all blocks
		push_last_pruned(10, &mut commit);
		db.commit(&commit);
		let (loaded_blocks, base) = load_pruning_from_db(db.clone());
		assert_eq!(loaded_blocks, 0);
		assert_eq!(base, 11);
	}

	#[test]
	fn db_backed_queue() {
		let mut db = make_db(&[]);
		let mut pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as usize;

		// start as an empty queue
		let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), 0);
		assert_eq!(last, None);

		// import blocks
		// queue size and content should match
		for i in 0..(cache_capacity + 10) {
			let mut commit = make_commit(&[], &[]);
			pruning.note_canonical(&(i as u64), i as u64, &mut commit).unwrap();
			push_last_canonicalized(i as u64, &mut commit);
			db.commit(&commit);
			// blocks will fill the cache first
			let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap();
			if i < cache_capacity {
				assert_eq!(cache.len(), i + 1);
			} else {
				assert_eq!(cache.len(), cache_capacity);
			}
			assert_eq!(last, Some(i as u64));
		}
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 10);
		let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);
		assert_eq!(last, Some(cache_capacity as u64 + 10 - 1));
		for i in 0..cache_capacity {
			assert_eq!(cache[i].hash, i as u64);
		}

		// import a new block to the end of the queue;
		// the new block won't be kept in memory
		let mut commit = CommitSet::default();
		pruning
			.note_canonical(&(cache_capacity as u64 + 10), cache_capacity as u64 + 10, &mut commit)
			.unwrap();
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 11);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);

		// reload the window to revert the last add, which was never applied
		// NOTE: do not commit the previous `CommitSet` to db
		pruning = RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as usize;
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 10);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);

		// remove one block from the start of the queue;
		// the block is removed from the head of the cache
		let mut commit = CommitSet::default();
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 9);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity - 1);
		for i in 0..(cache_capacity - 1) {
			assert_eq!(cache[i].hash, (i + 1) as u64);
		}

		// load a new queue from db;
		// `cache` is full again and the content of the queue should be the same
		let pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		assert_eq!(pruning.window_size(), cache_capacity as u64 + 9);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);
		for i in 0..cache_capacity {
			assert_eq!(cache[i].hash, (i + 1) as u64);
		}
	}

	#[test]
	fn load_block_from_db() {
		let mut db = make_db(&[]);
		let mut pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as usize;

		// import blocks
		for i in 0..(cache_capacity as u64 * 2 + 10) {
			let mut commit = make_commit(&[], &[]);
			pruning.note_canonical(&i, i, &mut commit).unwrap();
			push_last_canonicalized(i as u64, &mut commit);
			db.commit(&commit);
		}

		// the following operations won't trigger loading blocks from db:
		// - getting a block that is in the cache
		// - getting a block that is not in the queue
		assert_eq!(pruning.next_hash().unwrap().unwrap(), 0);
		let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), cache_capacity);
		assert_eq!(last, Some(cache_capacity as u64 * 2 + 10 - 1));

		// clear all blocks loaded in the cache
		for _ in 0..cache_capacity * 2 {
			let mut commit = CommitSet::default();
			pruning.prune_one(&mut commit).unwrap();
			db.commit(&commit);
		}
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert!(cache.is_empty());

		// getting the hash of a block that is not in the cache will trigger loading
		// the remaining blocks from db
		assert_eq!(pruning.next_hash().unwrap().unwrap(), (cache_capacity * 2) as u64);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), 10);

		// load a new queue from db;
		// `cache` should be the same
		let pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		assert_eq!(pruning.window_size(), 10);
		let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap();
		assert_eq!(cache.len(), 10);
		for i in 0..10 {
			assert_eq!(cache[i].hash, (cache_capacity * 2 + i) as u64);
		}
	}

	#[test]
	fn get_block_from_queue() {
		let mut db = make_db(&[]);
		let mut pruning: RefWindow<u64, H256, TestDb> =
			RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap();
		let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as u64;

		// import blocks and commit to db
		let mut commit = make_commit(&[], &[]);
		for i in 0..(cache_capacity + 10) {
			pruning.note_canonical(&i, i, &mut commit).unwrap();
		}
		db.commit(&commit);

		// import a block but do not commit it to db yet
		let mut pending_commit = make_commit(&[], &[]);
		let index = cache_capacity + 10;
		pruning.note_canonical(&index, index, &mut pending_commit).unwrap();

		let mut commit = make_commit(&[], &[]);
		// prune the blocks that were committed to db
		for i in 0..(cache_capacity + 10) {
			assert_eq!(pruning.next_hash().unwrap(), Some(i));
			pruning.prune_one(&mut commit).unwrap();
		}
		// `next_hash` returns `None` for the block that was not committed to db
		assert_eq!(pruning.next_hash().unwrap(), None);
		assert_eq!(
			pruning.prune_one(&mut commit).unwrap_err(),
			Error::StateDb(StateDbError::BlockUnavailable)
		);
		// after committing the block to db, pruning it succeeds without error
		db.commit(&pending_commit);
		assert_eq!(pruning.next_hash().unwrap(), Some(index));
		pruning.prune_one(&mut commit).unwrap();
		db.commit(&commit);
	}

	/// Ensure that after warp syncing the state is stored correctly in the db. The warp sync target
	/// block is imported with all its state at once. This test ensures that after a restart
	/// `pruning` still knows that this block was imported.
	#[test]
	fn store_correct_state_after_warp_syncing() {
		for count_insertions in [true, false] {
			let mut db = make_db(&[]);
			let mut pruning: RefWindow<u64, H256, TestDb> =
				RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, count_insertions).unwrap();
			let block = 10000;

			// import the warp sync target block
			let mut commit = make_commit(&[], &[]);
			pruning.note_canonical(&block, block, &mut commit).unwrap();
			push_last_canonicalized(block, &mut commit);
			db.commit(&commit);

			// load a new window from db, simulating a restart
			let pruning: RefWindow<u64, H256, TestDb> =
				RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, count_insertions).unwrap();

			assert_eq!(HaveBlock::Yes, pruning.have_block(&block, block));
		}
	}
}