// parity_db/column.rs

1// Copyright 2021-2022 Parity Technologies (UK) Ltd.
2// This file is dual-licensed as Apache-2.0 or MIT.
3
4use crate::{
5	btree::BTreeTable,
6	compress::Compress,
7	db::{check::CheckDisplay, Operation, RcValue},
8	display::hex,
9	error::{try_io, Error, Result},
10	index::{Address, IndexTable, PlanOutcome, TableId as IndexTableId},
11	log::{Log, LogAction, LogOverlays, LogQuery, LogReader, LogWriter},
12	options::{ColumnOptions, Metadata, Options, DEFAULT_COMPRESSION_THRESHOLD},
13	parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
14	stats::{ColumnStatSummary, ColumnStats},
15	table::{
16		key::{TableKey, TableKeyQuery},
17		TableId as ValueTableId, Value, ValueTable, SIZE_TIERS,
18	},
19	Key,
20};
21use std::{
22	collections::VecDeque,
23	path::PathBuf,
24	sync::{
25		atomic::{AtomicU64, Ordering},
26		Arc,
27	},
28};
29
/// Minimum number of index bits used when creating a new index table.
pub const MIN_INDEX_BITS: u8 = 16;
/// Maximum size of a single reindex batch. Measured in index entries.
const MAX_REINDEX_BATCH: usize = 8192;

/// Column identifier (also the column's position in the metadata column list).
pub type ColId = u8;
/// 32-byte salt used when hashing keys.
pub type Salt = [u8; 32];
36
37// The size tiers follow log distribution. Generated with the following code:
38//
39//{
40//	let mut r = [0u16; SIZE_TIERS - 1];
41//	let  start = MIN_ENTRY_SIZE as f64;
42//	let  end = MAX_ENTRY_SIZE as f64;
43//	let  n_slices = SIZE_TIERS - 1;
44//	let factor = ((end.ln() - start.ln()) / (n_slices - 1) as f64).exp();
45//
46//	let mut s = start;
47//	let mut i = 0;
48//	while i <  n_slices {
49//		r[i] = s.round() as u16;
50//		s = s * factor;
51//		i += 1;
52//	}
53//	r
54//};
55
// Fixed entry sizes (in bytes) for each value table size tier except the last
// one, which is the unsized "blob" tier. The values follow the log
// distribution produced by the generator snippet above.
const SIZES: [u16; SIZE_TIERS - 1] = [
	32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 46, 47, 48, 50, 51, 52, 54, 55, 57, 58, 60,
	62, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 88, 90, 93, 95, 98, 101, 103, 106, 109,
	112, 115, 119, 122, 125, 129, 132, 136, 140, 144, 148, 152, 156, 160, 165, 169, 174, 179, 183,
	189, 194, 199, 205, 210, 216, 222, 228, 235, 241, 248, 255, 262, 269, 276, 284, 292, 300, 308,
	317, 325, 334, 344, 353, 363, 373, 383, 394, 405, 416, 428, 439, 452, 464, 477, 490, 504, 518,
	532, 547, 562, 577, 593, 610, 627, 644, 662, 680, 699, 718, 738, 758, 779, 801, 823, 846, 869,
	893, 918, 943, 969, 996, 1024, 1052, 1081, 1111, 1142, 1174, 1206, 1239, 1274, 1309, 1345,
	1382, 1421, 1460, 1500, 1542, 1584, 1628, 1673, 1720, 1767, 1816, 1866, 1918, 1971, 2025, 2082,
	2139, 2198, 2259, 2322, 2386, 2452, 2520, 2589, 2661, 2735, 2810, 2888, 2968, 3050, 3134, 3221,
	3310, 3402, 3496, 3593, 3692, 3794, 3899, 4007, 4118, 4232, 4349, 4469, 4593, 4720, 4850, 4984,
	5122, 5264, 5410, 5559, 5713, 5871, 6034, 6200, 6372, 6548, 6729, 6916, 7107, 7303, 7506, 7713,
	7927, 8146, 8371, 8603, 8841, 9085, 9337, 9595, 9860, 10133, 10413, 10702, 10998, 11302, 11614,
	11936, 12266, 12605, 12954, 13312, 13681, 14059, 14448, 14848, 15258, 15681, 16114, 16560,
	17018, 17489, 17973, 18470, 18981, 19506, 20046, 20600, 21170, 21756, 22358, 22976, 23612,
	24265, 24936, 25626, 26335, 27064, 27812, 28582, 29372, 30185, 31020, 31878, 32760,
];
73
/// The live index table of a column together with its value tables,
/// guarded by a single lock inside `HashColumn`.
#[derive(Debug)]
struct Tables {
	// Current index table; swapped for a larger one when a reindex starts.
	index: IndexTable,
	// Value tables, indexed by size tier.
	value: Vec<ValueTable>,
}
79
/// State of in-progress reindexing.
#[derive(Debug)]
struct Reindex {
	// Older index tables still queued for migration into the live index.
	queue: VecDeque<IndexTable>,
	// Next chunk to process in the front table of `queue`.
	progress: AtomicU64,
}
85
/// A database column: either hash-indexed or ordered via a B-Tree index,
/// depending on the column options.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Column {
	Hash(HashColumn),
	Tree(BTreeTable),
}
92
/// A hash-indexed column: a key index plus a set of value tables, with
/// optional reindexing state and statistics collection.
#[derive(Debug)]
pub struct HashColumn {
	col: ColId,
	// Live index and value tables.
	tables: RwLock<Tables>,
	// Old index tables still being migrated into the live index.
	reindex: RwLock<Reindex>,
	// Directory containing the column files.
	path: PathBuf,
	// Column options, copied from the metadata.
	preimage: bool,
	uniform_keys: bool,
	collect_stats: bool,
	ref_counted: bool,
	// Salt used by `hash_key`.
	salt: Salt,
	stats: ColumnStats,
	compression: Compress,
	db_version: u32,
}
108
/// Borrowed view of a column's value tables together with the column options
/// needed to read and write values.
#[derive(Clone, Copy)]
pub struct TablesRef<'a> {
	pub tables: &'a [ValueTable],
	pub compression: &'a Compress,
	pub col: ColId,
	pub preimage: bool,
	pub ref_counted: bool,
}
117
/// Value iteration state, passed to the callback of `HashColumn::iter_values`.
pub struct ValueIterState {
	/// Reference counter.
	pub rc: u32,
	/// Value.
	pub value: Vec<u8>,
}
125
// Only used for DB validation and migration.
/// Details about an index entry whose value could not be read back.
pub struct CorruptedIndexEntryInfo {
	/// Index chunk containing the entry.
	pub chunk_index: u64,
	/// Position of the entry within the chunk.
	pub sub_index: u32,
	/// The raw index entry.
	pub entry: crate::index::Entry,
	/// Raw dump of the value table entry, when it could be read.
	pub value_entry: Option<Vec<u8>>,
	/// The error hit while reading the value, if any.
	pub error: Option<Error>,
}
134
// Only used for DB validation and migration.
/// A decoded entry yielded during full-index iteration.
pub struct IterState {
	/// Chunk index (or sorted position in the fast iteration path).
	pub item_index: u64,
	/// Total number of chunks (or entries) being iterated.
	pub total_items: u64,
	/// Recovered key. Left at its default by the fast iteration path.
	pub key: Key,
	/// Reference count of the value.
	pub rc: u32,
	/// The (decompressed) value.
	pub value: Vec<u8>,
}
143
// Only used for DB validation and migration.
// Either a successfully decoded entry or a corruption report.
enum IterStateOrCorrupted {
	Item(IterState),
	Corrupted(CorruptedIndexEntryInfo),
}
149
150#[inline]
151pub fn hash_key(key: &[u8], salt: &Salt, uniform: bool, db_version: u32) -> Key {
152	use blake2::{
153		digest::{typenum::U32, FixedOutput, Update},
154		Blake2bMac,
155	};
156
157	let mut k = Key::default();
158	if uniform {
159		if db_version <= 5 {
160			k.copy_from_slice(&key[0..32]);
161		} else if db_version <= 7 {
162			// XOR with salt.
163			let key = &key[0..32];
164			for i in 0..32 {
165				k[i] = key[i] ^ salt[i];
166			}
167		} else {
168			#[cfg(any(test, feature = "instrumentation"))]
169			// Used for forcing collisions in tests.
170			if salt == &Salt::default() {
171				k.copy_from_slice(&key);
172				return k
173			}
174			// siphash 1-3 first 128 bits of the key
175			use siphasher::sip128::Hasher128;
176			use std::hash::Hasher;
177			let mut hasher = siphasher::sip128::SipHasher13::new_with_key(
178				salt[..16].try_into().expect("Salt length is 32"),
179			);
180			hasher.write(&key);
181			let hash = hasher.finish128();
182			k[0..8].copy_from_slice(&hash.h1.to_le_bytes());
183			k[8..16].copy_from_slice(&hash.h2.to_le_bytes());
184			k[16..].copy_from_slice(&key[16..]);
185		}
186	} else {
187		let mut ctx = Blake2bMac::<U32>::new_with_salt_and_personal(salt, &[], &[])
188			.expect("Salt length (32) is a valid key length (<= 64)");
189		ctx.update(key);
190		let hash = ctx.finalize_fixed();
191		k.copy_from_slice(&hash);
192	}
193	k
194}
195
/// A batch of index entries produced by `HashColumn::reindex`.
pub struct ReindexBatch {
	/// Old index table to drop, if any. Presumably set once the source table
	/// has been fully processed — confirm against callers outside this file.
	pub drop_index: Option<IndexTableId>,
	/// `(key, address)` pairs to re-insert into the live index.
	pub batch: Vec<(Key, Address)>,
}
200
201impl HashColumn {
202	pub fn get(&self, key: &Key, log: &impl LogQuery) -> Result<Option<Value>> {
203		let tables = self.tables.read();
204		let values = self.as_ref(&tables.value);
205		if let Some((tier, value)) = self.get_in_index(key, &tables.index, values, log)? {
206			if self.collect_stats {
207				self.stats.query_hit(tier);
208			}
209			return Ok(Some(value))
210		}
211		for r in &self.reindex.read().queue {
212			if let Some((tier, value)) = self.get_in_index(key, r, values, log)? {
213				if self.collect_stats {
214					self.stats.query_hit(tier);
215				}
216				return Ok(Some(value))
217			}
218		}
219		if self.collect_stats {
220			self.stats.query_miss();
221		}
222		Ok(None)
223	}
224
225	pub fn get_size(&self, key: &Key, log: &RwLock<LogOverlays>) -> Result<Option<u32>> {
226		self.get(key, log).map(|v| v.map(|v| v.len() as u32))
227	}
228
229	fn get_in_index(
230		&self,
231		key: &Key,
232		index: &IndexTable,
233		tables: TablesRef,
234		log: &impl LogQuery,
235	) -> Result<Option<(u8, Value)>> {
236		let (mut entry, mut sub_index) = index.get(key, 0, log)?;
237		while !entry.is_empty() {
238			let address = entry.address(index.id.index_bits());
239			let value = Column::get_value(
240				TableKeyQuery::Check(&TableKey::Partial(*key)),
241				address,
242				tables,
243				log,
244			)?;
245			match value {
246				Some(result) => return Ok(Some(result)),
247				None => {
248					let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
249					entry = next_entry;
250					sub_index = next_index;
251				},
252			}
253		}
254		Ok(None)
255	}
256
257	pub fn as_ref<'a>(&'a self, tables: &'a [ValueTable]) -> TablesRef<'a> {
258		TablesRef {
259			tables,
260			preimage: self.preimage,
261			col: self.col,
262			ref_counted: self.ref_counted,
263			compression: &self.compression,
264		}
265	}
266}
267
268impl Column {
269	pub fn get_value(
270		mut key: TableKeyQuery,
271		address: Address,
272		tables: TablesRef,
273		log: &impl LogQuery,
274	) -> Result<Option<(u8, Value)>> {
275		let size_tier = address.size_tier() as usize;
276		if let Some((value, compressed, _rc)) =
277			tables.tables[size_tier].query(&mut key, address.offset(), log)?
278		{
279			let value = if compressed { tables.compression.decompress(&value)? } else { value };
280			return Ok(Some((size_tier as u8, value)))
281		}
282		Ok(None)
283	}
284
285	pub fn compress(
286		compression: &Compress,
287		key: &TableKey,
288		value: &[u8],
289		tables: &[ValueTable],
290	) -> (Option<Vec<u8>>, usize) {
291		let (len, result) = if value.len() > compression.threshold as usize {
292			let cvalue = compression.compress(value);
293			if cvalue.len() < value.len() {
294				(cvalue.len(), Some(cvalue))
295			} else {
296				(value.len(), None)
297			}
298		} else {
299			(value.len(), None)
300		};
301		let target_tier = tables
302			.iter()
303			.position(|t| t.value_size(key).map_or(false, |s| len <= s as usize));
304		let target_tier = target_tier.unwrap_or_else(|| {
305			log::trace!(target: "parity-db", "Using blob {}", key);
306			tables.len() - 1
307		});
308
309		(result, target_tier)
310	}
311
312	pub fn open(col: ColId, options: &Options, metadata: &Metadata) -> Result<Column> {
313		let path = &options.path;
314		let arc_path = Arc::new(path.clone());
315		let column_options = &metadata.columns[col as usize];
316		let db_version = metadata.version;
317		let value = (0..SIZE_TIERS)
318			.map(|i| Self::open_table(arc_path.clone(), col, i as u8, column_options, db_version))
319			.collect::<Result<_>>()?;
320
321		if column_options.btree_index {
322			Ok(Column::Tree(BTreeTable::open(col, value, options, metadata)?))
323		} else {
324			Ok(Column::Hash(HashColumn::open(col, value, options, metadata)?))
325		}
326	}
327
328	fn open_table(
329		path: Arc<PathBuf>,
330		col: ColId,
331		tier: u8,
332		options: &ColumnOptions,
333		db_version: u32,
334	) -> Result<ValueTable> {
335		let id = ValueTableId::new(col, tier);
336		let entry_size = SIZES.get(tier as usize).cloned();
337		ValueTable::open(path, id, entry_size, options, db_version)
338	}
339
340	pub(crate) fn drop_files(column: ColId, path: PathBuf) -> Result<()> {
341		// It is not specified how read_dir behaves when deleting and iterating in the same loop
342		// We collect a list of paths to be deleted first.
343		let mut to_delete = Vec::new();
344		for entry in try_io!(std::fs::read_dir(&path)) {
345			let entry = try_io!(entry);
346			if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
347				if crate::index::TableId::is_file_name(column, file) ||
348					crate::table::TableId::is_file_name(column, file)
349				{
350					to_delete.push(PathBuf::from(file));
351				}
352			}
353		}
354
355		for file in to_delete {
356			let mut path = path.clone();
357			path.push(file);
358			try_io!(std::fs::remove_file(path));
359		}
360		Ok(())
361	}
362}
363
364impl HashColumn {
365	fn open(
366		col: ColId,
367		value: Vec<ValueTable>,
368		options: &Options,
369		metadata: &Metadata,
370	) -> Result<HashColumn> {
371		let (index, reindexing, stats) = Self::open_index(&options.path, col)?;
372		let collect_stats = options.stats;
373		let path = &options.path;
374		let col_options = &metadata.columns[col as usize];
375		let db_version = metadata.version;
376		Ok(HashColumn {
377			col,
378			tables: RwLock::new(Tables { index, value }),
379			reindex: RwLock::new(Reindex { queue: reindexing, progress: AtomicU64::new(0) }),
380			path: path.into(),
381			preimage: col_options.preimage,
382			uniform_keys: col_options.uniform,
383			ref_counted: col_options.ref_counted,
384			collect_stats,
385			salt: metadata.salt,
386			stats,
387			compression: Compress::new(
388				col_options.compression,
389				options
390					.compression_threshold
391					.get(&col)
392					.copied()
393					.unwrap_or(DEFAULT_COMPRESSION_THRESHOLD),
394			),
395			db_version,
396		})
397	}
398
	/// Hash a user key into the internal `Key` using this column's salt,
	/// key-uniformity option and database version.
	pub fn hash_key(&self, key: &[u8]) -> Key {
		hash_key(key, &self.salt, self.uniform_keys, self.db_version)
	}
402
403	pub fn flush(&self) -> Result<()> {
404		let tables = self.tables.read();
405		tables.index.flush()?;
406		for t in tables.value.iter() {
407			t.flush()?;
408		}
409		Ok(())
410	}
411
412	fn open_index(
413		path: &std::path::Path,
414		col: ColId,
415	) -> Result<(IndexTable, VecDeque<IndexTable>, ColumnStats)> {
416		let mut reindexing = VecDeque::new();
417		let mut top = None;
418		let mut stats = ColumnStats::empty();
419		for bits in (MIN_INDEX_BITS..65).rev() {
420			let id = IndexTableId::new(col, bits);
421			if let Some(table) = IndexTable::open_existing(path, id)? {
422				if top.is_none() {
423					stats = table.load_stats()?;
424					log::trace!(target: "parity-db", "Opened main index {}", table.id);
425					top = Some(table);
426				} else {
427					log::trace!(target: "parity-db", "Opened stale index {}", table.id);
428					reindexing.push_front(table);
429				}
430			}
431		}
432		let table = match top {
433			Some(table) => table,
434			None => IndexTable::create_new(path, IndexTableId::new(col, MIN_INDEX_BITS)),
435		};
436		Ok((table, reindexing, stats))
437	}
438
	/// Replace the live index with a fresh table that has one more index bit
	/// and queue the old table for migration. Takes the upgradable guards of
	/// both `tables` and `reindex`, upgrades them for the swap, and returns
	/// them downgraded back to upgradable.
	fn trigger_reindex<'a, 'b>(
		tables: RwLockUpgradableReadGuard<'a, Tables>,
		reindex: RwLockUpgradableReadGuard<'b, Reindex>,
		path: &std::path::Path,
	) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
		// Take exclusive access for the swap itself.
		let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
		let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
		log::info!(
			target: "parity-db",
			"Started reindex for {}",
			tables.index.id,
		);
		// Start reindex
		let new_index_id =
			IndexTableId::new(tables.index.id.col(), tables.index.id.index_bits() + 1);
		let new_table = IndexTable::create_new(path, new_index_id);
		let old_table = std::mem::replace(&mut tables.index, new_table);
		reindex.queue.push_back(old_table);
		(
			RwLockWriteGuard::downgrade_to_upgradable(tables),
			RwLockWriteGuard::downgrade_to_upgradable(reindex),
		)
	}
462
463	pub fn write_reindex_plan(
464		&self,
465		key: &Key,
466		address: Address,
467		log: &mut LogWriter,
468	) -> Result<PlanOutcome> {
469		let tables = self.tables.upgradable_read();
470		let reindex = self.reindex.upgradable_read();
471		self.write_reindex_plan_locked(tables, reindex, key, address, log)
472	}
473
474	fn write_reindex_plan_locked(
475		&self,
476		mut tables: RwLockUpgradableReadGuard<Tables>,
477		mut reindex: RwLockUpgradableReadGuard<Reindex>,
478		key: &Key,
479		address: Address,
480		log: &mut LogWriter,
481	) -> Result<PlanOutcome> {
482		if Self::contains_partial_key_with_address(key, address, &tables.index, log)? {
483			log::trace!(target: "parity-db", "{}: Skipped reindex entry {} when reindexing", tables.index.id, hex(key));
484			return Ok(PlanOutcome::Skipped)
485		}
486		let mut outcome = PlanOutcome::Written;
487		while let PlanOutcome::NeedReindex =
488			tables.index.write_insert_plan(key, address, None, log)?
489		{
490			log::debug!(target: "parity-db", "{}: Index chunk full {} when reindexing", tables.index.id, hex(key));
491			(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
492			outcome = PlanOutcome::NeedReindex;
493		}
494		Ok(outcome)
495	}
496
497	fn search_index<'a>(
498		key: &Key,
499		index: &'a IndexTable,
500		tables: &'a Tables,
501		log: &LogWriter,
502	) -> Result<Option<(&'a IndexTable, usize, Address)>> {
503		let (mut existing_entry, mut sub_index) = index.get(key, 0, log)?;
504		while !existing_entry.is_empty() {
505			let existing_address = existing_entry.address(index.id.index_bits());
506			let existing_tier = existing_address.size_tier();
507			let table_key = TableKey::Partial(*key);
508			if tables.value[existing_tier as usize].has_key_at(
509				existing_address.offset(),
510				&table_key,
511				log,
512			)? {
513				return Ok(Some((index, sub_index, existing_address)))
514			}
515
516			let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
517			existing_entry = next_entry;
518			sub_index = next_index;
519		}
520		Ok(None)
521	}
522
523	fn contains_partial_key_with_address(
524		key: &Key,
525		address: Address,
526		index: &IndexTable,
527		log: &LogWriter,
528	) -> Result<bool> {
529		let (mut existing_entry, mut sub_index) = index.get(key, 0, log)?;
530		while !existing_entry.is_empty() {
531			let existing_address = existing_entry.address(index.id.index_bits());
532			if existing_address == address {
533				return Ok(true)
534			}
535			let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
536			existing_entry = next_entry;
537			sub_index = next_index;
538		}
539		Ok(false)
540	}
541
542	fn search_all_indexes<'a>(
543		key: &Key,
544		tables: &'a Tables,
545		reindex: &'a Reindex,
546		log: &LogWriter,
547	) -> Result<Option<(&'a IndexTable, usize, Address)>> {
548		if let Some(r) = Self::search_index(key, &tables.index, tables, log)? {
549			return Ok(Some(r))
550		}
551		// Check old indexes
552		// TODO: don't search if index precedes reindex progress
553		for index in &reindex.queue {
554			if let Some(r) = Self::search_index(key, index, tables, log)? {
555				return Ok(Some(r))
556			}
557		}
558		Ok(None)
559	}
560
561	pub fn write_plan(
562		&self,
563		change: &Operation<Key, RcValue>,
564		log: &mut LogWriter,
565	) -> Result<PlanOutcome> {
566		let tables = self.tables.upgradable_read();
567		let reindex = self.reindex.upgradable_read();
568		let existing = Self::search_all_indexes(change.key(), &tables, &reindex, log)?;
569		if let Some((table, sub_index, existing_address)) = existing {
570			self.write_plan_existing(&tables, change, log, table, sub_index, existing_address)
571		} else {
572			match change {
573				Operation::Set(key, value) => {
574					let (r, _, _) =
575						self.write_plan_new(tables, reindex, key, value.as_ref(), log)?;
576					Ok(r)
577				},
578				Operation::Dereference(key) => {
579					log::trace!(target: "parity-db", "{}: Deleting missing key {}", tables.index.id, hex(key));
580					if self.collect_stats {
581						self.stats.remove_miss();
582					}
583					Ok(PlanOutcome::Skipped)
584				},
585				Operation::Reference(key) => {
586					log::trace!(target: "parity-db", "{}: Ignoring increase rc, missing key {}", tables.index.id, hex(key));
587					if self.collect_stats {
588						self.stats.reference_increase_miss();
589					}
590					Ok(PlanOutcome::Skipped)
591				},
592			}
593		}
594	}
595
596	#[allow(clippy::too_many_arguments)]
597	fn write_plan_existing(
598		&self,
599		tables: &Tables,
600		change: &Operation<Key, RcValue>,
601		log: &mut LogWriter,
602		index: &IndexTable,
603		sub_index: usize,
604		existing_address: Address,
605	) -> Result<PlanOutcome> {
606		let stats = if self.collect_stats { Some(&self.stats) } else { None };
607
608		let key = change.key();
609		let table_key = TableKey::Partial(*key);
610		match Column::write_existing_value_plan(
611			&table_key,
612			self.as_ref(&tables.value),
613			existing_address,
614			change,
615			log,
616			stats,
617			self.ref_counted,
618		)? {
619			(Some(outcome), _) => Ok(outcome),
620			(None, Some(value_address)) => {
621				// If it was found in an older index we just insert a new entry. Reindex won't
622				// overwrite it.
623				let sub_index = if index.id == tables.index.id { Some(sub_index) } else { None };
624				tables.index.write_insert_plan(key, value_address, sub_index, log)
625			},
626			(None, None) => {
627				log::trace!(target: "parity-db", "{}: Removing from index {}", tables.index.id, hex(key));
628				index.write_remove_plan(key, sub_index, log)?;
629				Ok(PlanOutcome::Written)
630			},
631		}
632	}
633
634	fn write_plan_new<'a, 'b>(
635		&self,
636		mut tables: RwLockUpgradableReadGuard<'a, Tables>,
637		mut reindex: RwLockUpgradableReadGuard<'b, Reindex>,
638		key: &Key,
639		value: &[u8],
640		log: &mut LogWriter,
641	) -> Result<(
642		PlanOutcome,
643		RwLockUpgradableReadGuard<'a, Tables>,
644		RwLockUpgradableReadGuard<'b, Reindex>,
645	)> {
646		let stats = self.collect_stats.then_some(&self.stats);
647		let table_key = TableKey::Partial(*key);
648		let address = Column::write_new_value_plan(
649			&table_key,
650			self.as_ref(&tables.value),
651			value,
652			log,
653			stats,
654		)?;
655		let mut outcome = PlanOutcome::Written;
656		while let PlanOutcome::NeedReindex =
657			tables.index.write_insert_plan(key, address, None, log)?
658		{
659			log::debug!(target: "parity-db", "{}: Index chunk full {}", tables.index.id, hex(key));
660			(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
661			outcome = PlanOutcome::NeedReindex;
662		}
663		Ok((outcome, tables, reindex))
664	}
665
666	pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
667		let tables = self.tables.read();
668		let reindex = self.reindex.read();
669		match action {
670			LogAction::InsertIndex(record) => {
671				if tables.index.id == record.table {
672					tables.index.enact_plan(record.index, log)?;
673				} else if let Some(table) = reindex.queue.iter().find(|r| r.id == record.table) {
674					table.enact_plan(record.index, log)?;
675				} else {
676					// This may happen when removal is planed for an old index when reindexing.
677					// We can safely skip the removal since the new index does not have the entry
678					// anyway and the old index is already dropped.
679					log::debug!(
680						target: "parity-db",
681						"Missing index {}. Skipped",
682						record.table,
683					);
684					IndexTable::skip_plan(log)?;
685				}
686			},
687			LogAction::InsertValue(record) => {
688				tables.value[record.table.size_tier() as usize].enact_plan(record.index, log)?;
689			},
690			// This should never happen, unless something has modified the log file while the
691			// database is running. Existing logs should be validated with `validate_plan` on
692			// startup.
693			_ => return Err(Error::Corruption("Unexpected log action".into())),
694		}
695		Ok(())
696	}
697
	/// Check a log record against the current tables without applying it.
	/// Used to vet existing logs on startup; may re-launch a previously
	/// started reindex when the record targets a newer, missing index table.
	pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		let tables = self.tables.upgradable_read();
		let reindex = self.reindex.upgradable_read();
		match action {
			LogAction::InsertIndex(record) => {
				if tables.index.id == record.table {
					tables.index.validate_plan(record.index, log)?;
				} else if let Some(table) = reindex.queue.iter().find(|r| r.id == record.table) {
					table.validate_plan(record.index, log)?;
				} else {
					if record.table.index_bits() < tables.index.id.index_bits() {
						// Insertion into a previously dropped index.
						log::warn!( target: "parity-db", "Index {} is too old. Current is {}", record.table, tables.index.id);
						return Err(Error::Corruption("Unexpected log index id".to_string()))
					}
					// Re-launch previously started reindex
					// TODO: add explicit log records for reindexing events.
					log::warn!(
						target: "parity-db",
						"Missing table {}, starting reindex",
						record.table,
					);
					// `trigger_reindex` consumed the guards; drop the returned
					// ones before retrying so the recursive call re-acquires
					// the locks.
					let lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
					std::mem::drop(lock);
					return self.validate_plan(LogAction::InsertIndex(record), log)
				}
			},
			LogAction::InsertValue(record) => {
				tables.value[record.table.size_tier() as usize].validate_plan(record.index, log)?;
			},
			_ => {
				log::error!(target: "parity-db", "Unexpected log action");
				return Err(Error::Corruption("Unexpected log action".to_string()))
			},
		}
		Ok(())
	}
735
736	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
737		let tables = self.tables.read();
738		for t in tables.value.iter() {
739			t.complete_plan(log)?;
740		}
741		if self.collect_stats {
742			self.stats.commit()
743		}
744		Ok(())
745	}
746
747	pub fn refresh_metadata(&self) -> Result<()> {
748		let tables = self.tables.read();
749		for t in tables.value.iter() {
750			t.refresh_metadata()?;
751		}
752		Ok(())
753	}
754
755	pub fn write_stats_text(&self, writer: &mut impl std::io::Write) -> Result<()> {
756		let tables = self.tables.read();
757		tables.index.write_stats(&self.stats)?;
758		self.stats.write_stats_text(writer, tables.index.id.col()).map_err(Error::Io)
759	}
760
	/// Produce a summary of the collected statistics.
	fn stat_summary(&self) -> ColumnStatSummary {
		self.stats.summary()
	}

	/// Reset collected statistics and write the cleared state out via the
	/// index table.
	fn clear_stats(&self) -> Result<()> {
		let tables = self.tables.read();
		self.stats.clear();
		tables.index.write_stats(&self.stats)
	}
770
771	pub fn iter_values(&self, log: &Log, mut f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
772		let tables = self.tables.read();
773		for table in &tables.value {
774			log::debug!( target: "parity-db", "{}: Iterating table {}", tables.index.id, table.id);
775			table.iter_while(log.overlays(), |_, rc, value, compressed| {
776				let value = if compressed {
777					if let Ok(value) = self.compression.decompress(&value) {
778						value
779					} else {
780						return false
781					}
782				} else {
783					value
784				};
785				let state = ValueIterState { rc, value };
786				f(state)
787			})?;
788			log::debug!( target: "parity-db", "{}: Done iterating table {}", tables.index.id, table.id);
789		}
790		Ok(())
791	}
792
793	pub fn iter_index(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
794		let action = |state| match state {
795			IterStateOrCorrupted::Item(item) => Ok(f(item)),
796			IterStateOrCorrupted::Corrupted(..) =>
797				Err(Error::Corruption("Missing indexed value".into())),
798		};
799		self.iter_index_internal(log, action, 0)
800	}
801
	/// Walk every chunk of the main index starting at `start_chunk`, resolving
	/// each entry to its value and feeding either a decoded `Item` or a
	/// `Corrupted` report to `f`. Stops early when `f` returns `Ok(false)`.
	fn iter_index_internal(
		&self,
		log: &Log,
		mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
		start_chunk: u64,
	) -> Result<()> {
		let tables = self.tables.read();
		let source = &tables.index;
		let total_chunks = source.id.total_chunks();

		for c in start_chunk..total_chunks {
			let entries = source.entries(c, log.overlays())?;
			for (sub_index, entry) in entries.iter().enumerate() {
				if entry.is_empty() {
					continue
				}
				// Decode the value location from the index entry.
				let (size_tier, offset) = {
					let address = entry.address(source.id.index_bits());
					(address.size_tier(), address.offset())
				};

				let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
				let (value, rc, pk, compressed) = match value {
					Ok(Some(v)) => v,
					Ok(None) => {
						// Index points at an empty value slot: report corruption
						// with a raw dump of the value entry when available.
						let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
						if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
							chunk_index: c,
							sub_index: sub_index as u32,
							value_entry,
							entry: *entry,
							error: None,
						}))? {
							return Ok(())
						}
						continue
					},
					Err(e) => {
						// Only dump the raw entry for corruption errors; other
						// errors would not produce a useful dump.
						let value_entry = if let Error::Corruption(_) = &e {
							tables.value[size_tier as usize].dump_entry(offset).ok()
						} else {
							None
						};
						if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
							chunk_index: c,
							sub_index: sub_index as u32,
							value_entry,
							entry: *entry,
							error: Some(e),
						}))? {
							return Ok(())
						}
						continue
					},
				};
				// Rebuild the full key: the chunk position gives the prefix,
				// the partial key stored with the value gives the rest.
				let mut key = source.recover_key_prefix(c, *entry);
				key[6..].copy_from_slice(&pk);
				let value = if compressed { self.compression.decompress(&value)? } else { value };
				log::debug!(
					target: "parity-db",
					"{}: Iterating at {}/{}, key={:?}, pk={:?}",
					source.id,
					c,
					source.id.total_chunks(),
					hex(&key),
					hex(&pk),
				);
				let state = IterStateOrCorrupted::Item(IterState {
					item_index: c,
					total_items: total_chunks,
					key,
					rc,
					value,
				});
				if !f(state)? {
					return Ok(())
				}
			}
		}
		Ok(())
	}
883
	/// Like `iter_index_internal`, but iterates entries in the order returned
	/// by `sorted_entries` instead of chunk by chunk. Full keys are not
	/// recovered (`IterState::key` is left at its default) and `_start_chunk`
	/// is ignored.
	fn iter_index_fast(
		&self,
		log: &Log,
		mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
		_start_chunk: u64,
	) -> Result<()> {
		let tables = self.tables.read();
		let index = &tables.index;

		let entries = index.sorted_entries()?;
		let total = entries.len();
		for (sub_index, entry) in entries.into_iter().enumerate() {
			// Decode the value location from the index entry.
			let (size_tier, offset) = {
				let address = entry.address(index.id.index_bits());
				(address.size_tier(), address.offset())
			};

			let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
			let (value, rc, pk, compressed) = match value {
				Ok(Some(v)) => v,
				Ok(None) => {
					// Index points at an empty value slot: report corruption.
					let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
					if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
						chunk_index: sub_index as u64,
						sub_index: sub_index as u32,
						value_entry,
						entry,
						error: None,
					}))? {
						return Ok(())
					}
					continue
				},
				Err(e) => {
					// Only dump the raw entry for corruption errors.
					let value_entry = if let Error::Corruption(_) = &e {
						tables.value[size_tier as usize].dump_entry(offset).ok()
					} else {
						None
					};
					if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
						chunk_index: sub_index as u64,
						sub_index: sub_index as u32,
						value_entry,
						entry,
						error: Some(e),
					}))? {
						return Ok(())
					}
					continue
				},
			};
			let value = if compressed { self.compression.decompress(&value)? } else { value };
			log::debug!(
				target: "parity-db",
				"{}: Iterating at {}/{}, pk={:?}",
				index.id,
				sub_index,
				total,
				hex(&pk),
			);
			let state = IterStateOrCorrupted::Item(IterState {
				item_index: sub_index as u64,
				total_items: total as u64,
				key: Default::default(),
				rc,
				value,
			});
			if !f(state)? {
				return Ok(())
			}
		}
		Ok(())
	}
957
	/// Validate the column by iterating the whole index (fast or chunked,
	/// per `check_params`), printing entries according to the display options
	/// and logging any corrupted entries. Optionally also checks value table
	/// free refs when `validate_free_refs` is set.
	fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
		let start_chunk = check_params.from.unwrap_or(0);
		let end_chunk = check_params.bound;

		// Progress-reporting granularity and display units.
		let step = if check_params.fast { 1_000_000 } else { 10_000 };
		let (denom, suffix) = if check_params.fast { (1_000_000, "m") } else { (1_000, "k") };
		let mut next_info_at = step;
		let start_time = std::time::Instant::now();
		let index_id = self.tables.read().index.id;
		log::info!(target: "parity-db", "Column {} (hash): Starting index validation", col);
		let iter_fn =
			if check_params.fast { Self::iter_index_fast } else { Self::iter_index_internal };
		iter_fn(
			self,
			log,
			|state| match state {
				IterStateOrCorrupted::Item(IterState {
					item_index,
					total_items,
					key,
					rc,
					value,
				}) => {
					// Stop at the requested upper bound, if any.
					if Some(item_index) == end_chunk {
						return Ok(false)
					}
					if item_index >= next_info_at {
						next_info_at += step;
						log::info!(target: "parity-db", "Validated {}{} / {}{} entries", item_index / denom, suffix, total_items / denom, suffix);
					}

					match check_params.display {
						CheckDisplay::Full => {
							log::info!(
								"Index key: {:x?}\n \
							\tRc: {}",
								&key,
								rc,
							);
							log::info!("Value: {}", hex(&value));
						},
						CheckDisplay::Short(t) => {
							log::info!("Index key: {:x?}", &key);
							log::info!("Rc: {}, Value len: {}", rc, value.len());
							log::info!(
								"Value: {}",
								hex(&value[..std::cmp::min(t as usize, value.len())])
							);
						},
						CheckDisplay::None => (),
					}
					Ok(true)
				},
				IterStateOrCorrupted::Corrupted(c) => {
					// Report but keep going: validation is best-effort.
					log::error!(
						"Corrupted value for index entry: [{}][{}]: {} ({:?}). Error: {:?}",
						c.chunk_index,
						c.sub_index,
						c.entry.address(index_id.index_bits()),
						hex(&c.entry.as_u64().to_le_bytes()),
						c.error,
					);
					if let Some(v) = c.value_entry {
						log::error!("Value entry: {:?}", hex(v.as_slice()));
					}
					Ok(true)
				},
			},
			start_chunk,
		)?;

		log::info!(target: "parity-db", "Index validation complete successfully, elapsed {:?}", start_time.elapsed());
		if check_params.validate_free_refs {
			log::info!(target: "parity-db", "Validating free refs");
			let tables = self.tables.read();
			let mut total = 0;
			for t in &tables.value {
				match t.check_free_refs() {
					Err(e) => log::warn!(target: "parity-db", "{}: Error: {:?}", t.id, e),
					Ok(n) => total += n,
				}
			}
			log::info!(target: "parity-db", "{} Total free refs", total);
		}
		Ok(())
	}
1044
1045	pub fn reindex(&self, log: &Log) -> Result<ReindexBatch> {
1046		let tables = self.tables.read();
1047		let reindex = self.reindex.read();
1048		let mut plan = Vec::new();
1049		let mut drop_index = None;
1050		if let Some(source) = reindex.queue.front() {
1051			let progress = reindex.progress.load(Ordering::Relaxed);
1052			if progress != source.id.total_chunks() {
1053				let mut source_index = progress;
1054				if source_index % 500 == 0 {
1055					log::debug!(target: "parity-db", "{}: Reindexing at {}/{}", tables.index.id, source_index, source.id.total_chunks());
1056				}
1057				log::debug!(target: "parity-db", "{}: Continue reindex at {}/{}", tables.index.id, source_index, source.id.total_chunks());
1058				while source_index < source.id.total_chunks() && plan.len() < MAX_REINDEX_BATCH {
1059					log::trace!(target: "parity-db", "{}: Reindexing {}", source.id, source_index);
1060					let entries = source.entries(source_index, log.overlays())?;
1061					for entry in entries.iter() {
1062						if entry.is_empty() {
1063							continue
1064						}
1065						// We only need key prefix to reindex.
1066						let key = source.recover_key_prefix(source_index, *entry);
1067						plan.push((key, entry.address(source.id.index_bits())))
1068					}
1069					source_index += 1;
1070				}
1071				log::trace!(target: "parity-db", "{}: End reindex batch {} ({})", tables.index.id, source_index, plan.len());
1072				reindex.progress.store(source_index, Ordering::Relaxed);
1073				if source_index == source.id.total_chunks() {
1074					log::info!(target: "parity-db", "Completed reindex {} into {}", source.id, tables.index.id);
1075					drop_index = Some(source.id);
1076				}
1077			}
1078		}
1079		Ok(ReindexBatch { drop_index, batch: plan })
1080	}
1081
1082	pub fn drop_index(&self, id: IndexTableId) -> Result<()> {
1083		log::debug!(target: "parity-db", "Dropping {}", id);
1084		let mut reindex = self.reindex.write();
1085		if reindex.queue.front_mut().map_or(false, |index| index.id == id) {
1086			let table = reindex.queue.pop_front();
1087			reindex.progress.store(0, Ordering::Relaxed);
1088			table.unwrap().drop_file()?;
1089		} else {
1090			log::warn!(target: "parity-db", "Dropping invalid index {}", id);
1091			return Ok(())
1092		}
1093		log::debug!(target: "parity-db", "Dropped {}", id);
1094		Ok(())
1095	}
1096}
1097
1098impl Column {
	/// Plan an update to a value that already exists at `address`.
	///
	/// Returns `(outcome, new_address)`:
	/// - `(Some(outcome), None)` — the operation was handled in place (or skipped);
	/// - `(None, Some(addr))` — the value moved to a different size tier and the
	///   caller must update the index to point at `addr`;
	/// - `(None, None)` — the value was removed entirely.
	///
	/// `ref_counted` selects reference-counting semantics: `Set`/`Reference`
	/// increment the count, `Dereference` decrements and only deletes at zero.
	/// `stats` (when provided) is updated to reflect size changes.
	pub fn write_existing_value_plan<K, V: AsRef<[u8]>>(
		key: &TableKey,
		tables: TablesRef,
		address: Address,
		change: &Operation<K, V>,
		log: &mut LogWriter,
		stats: Option<&ColumnStats>,
		ref_counted: bool,
	) -> Result<(Option<PlanOutcome>, Option<Address>)> {
		let tier = address.size_tier() as usize;

		// Returns (stored size, uncompressed size) of the current value; only
		// evaluated when stats are collected, since decompression is expensive.
		let fetch_size = || -> Result<(u32, u32)> {
			let (cur_size, compressed) =
				tables.tables[tier].size(key, address.offset(), log)?.unwrap_or((0, false));
			Ok(if compressed {
				// This is very costly.
				let compressed = tables.tables[tier]
					.get(key, address.offset(), log)?
					.expect("Same query as size")
					.0;
				let uncompressed = tables.compression.decompress(compressed.as_slice())?;

				(cur_size, uncompressed.len() as u32)
			} else {
				(cur_size, cur_size)
			})
		};

		match change {
			// Pure reference bump: meaningful only for ref-counted columns.
			Operation::Reference(_) =>
				if ref_counted {
					log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
					tables.tables[tier].write_inc_ref(address.offset(), log)?;
					if let Some(stats) = stats {
						stats.reference_increase();
					}
					Ok((Some(PlanOutcome::Written), None))
				} else {
					Ok((Some(PlanOutcome::Skipped), None))
				},
			Operation::Set(_, val) => {
				// For ref-counted columns a Set on an existing key only bumps
				// the count; the stored value is not replaced.
				if ref_counted {
					log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
					tables.tables[tier].write_inc_ref(address.offset(), log)?;
					return Ok((Some(PlanOutcome::Written), None))
				}
				if tables.preimage {
					// Replace is not supported
					return Ok((Some(PlanOutcome::Skipped), None))
				}

				// Compress the new value and pick its size tier; `cval` is the
				// bytes actually written (compressed or raw).
				let (cval, target_tier) =
					Column::compress(tables.compression, key, val.as_ref(), tables.tables);
				let (cval, compressed) = cval
					.as_ref()
					.map(|cval| (cval.as_slice(), true))
					.unwrap_or((val.as_ref(), false));

				// Record old-vs-new sizes before the on-disk state changes.
				if let Some(stats) = stats {
					let (cur_size, uncompressed) = fetch_size()?;
					stats.replace_val(
						cur_size,
						uncompressed,
						val.as_ref().len() as u32,
						cval.len() as u32,
					);
				}
				if tier == target_tier {
					// Same tier: overwrite in place, index unchanged.
					log::trace!(target: "parity-db", "{}: Replacing {}", tables.col, key);
					tables.tables[target_tier].write_replace_plan(
						address.offset(),
						key,
						cval,
						log,
						compressed,
					)?;
					Ok((Some(PlanOutcome::Written), None))
				} else {
					// Tier changed: remove old entry, insert into the new tier
					// and hand the new address back for an index update.
					log::trace!(target: "parity-db", "{}: Replacing in a new table {}", tables.col, key);
					tables.tables[tier].write_remove_plan(address.offset(), log)?;
					let new_offset =
						tables.tables[target_tier].write_insert_plan(key, cval, log, compressed)?;
					let new_address = Address::new(new_offset, target_tier as u8);
					Ok((None, Some(new_address)))
				}
			},
			Operation::Dereference(_) => {
				// Deletion
				// Capture sizes first: they are unreadable once removed.
				let cur_size = if stats.is_some() { Some(fetch_size()?) } else { None };
				let remove = if ref_counted {
					// write_dec_ref returns false once the count reaches zero
					// and the entry is gone.
					let removed = !tables.tables[tier].write_dec_ref(address.offset(), log)?;
					log::trace!(target: "parity-db", "{}: Dereference {}, deleted={}", tables.col, key, removed);
					removed
				} else {
					log::trace!(target: "parity-db", "{}: Deleting {}", tables.col, key);
					tables.tables[tier].write_remove_plan(address.offset(), log)?;
					true
				};
				if remove {
					if let Some((compressed_size, uncompressed_size)) = cur_size {
						if let Some(stats) = stats {
							stats.remove_val(uncompressed_size, compressed_size);
						}
					}
					// (None, None): value fully removed; caller drops the index entry.
					Ok((None, None))
				} else {
					Ok((Some(PlanOutcome::Written), None))
				}
			},
		}
	}
1210
1211	pub fn write_new_value_plan(
1212		key: &TableKey,
1213		tables: TablesRef,
1214		val: &[u8],
1215		log: &mut LogWriter,
1216		stats: Option<&ColumnStats>,
1217	) -> Result<Address> {
1218		let (cval, target_tier) = Column::compress(tables.compression, key, val, tables.tables);
1219		let (cval, compressed) =
1220			cval.as_ref().map(|cval| (cval.as_slice(), true)).unwrap_or((val, false));
1221
1222		log::trace!(target: "parity-db", "{}: Inserting new {}, size = {}", tables.col, key, cval.len());
1223		let offset = tables.tables[target_tier].write_insert_plan(key, cval, log, compressed)?;
1224		let address = Address::new(offset, target_tier as u8);
1225
1226		if let Some(stats) = stats {
1227			stats.insert_val(val.len() as u32, cval.len() as u32);
1228		}
1229		Ok(address)
1230	}
1231
	/// Finalize a batch of planned operations in the write-ahead log,
	/// dispatching to the hash or btree column implementation.
	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
		match self {
			Column::Hash(column) => column.complete_plan(log),
			Column::Tree(column) => column.complete_plan(log),
		}
	}

	/// Validate a logged action against this column before it is enacted.
	pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		match self {
			Column::Hash(column) => column.validate_plan(action, log),
			Column::Tree(column) => column.validate_plan(action, log),
		}
	}

	/// Apply a logged action to this column's tables.
	pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		match self {
			Column::Hash(column) => column.enact_plan(action, log),
			Column::Tree(column) => column.enact_plan(action, log),
		}
	}

	/// Flush pending writes for this column's files.
	pub fn flush(&self) -> Result<()> {
		match self {
			Column::Hash(column) => column.flush(),
			Column::Tree(column) => column.flush(),
		}
	}

	/// Re-synchronize in-memory metadata with the on-disk state.
	pub fn refresh_metadata(&self) -> Result<()> {
		match self {
			Column::Hash(column) => column.refresh_metadata(),
			Column::Tree(column) => column.refresh_metadata(),
		}
	}

	/// Write human-readable statistics to `writer`.
	/// Only hash columns collect stats; a no-op for btree columns.
	pub fn write_stats_text(&self, writer: &mut impl std::io::Write) -> Result<()> {
		match self {
			Column::Hash(column) => column.write_stats_text(writer),
			Column::Tree(_column) => Ok(()),
		}
	}

	/// Reset collected statistics. Only hash columns collect stats;
	/// a no-op for btree columns.
	pub fn clear_stats(&self) -> Result<()> {
		match self {
			Column::Hash(column) => column.clear_stats(),
			Column::Tree(_column) => Ok(()),
		}
	}

	/// Return a statistics summary, or `None` for btree columns
	/// (which do not track stats).
	pub fn stats(&self) -> Option<ColumnStatSummary> {
		match self {
			Column::Hash(column) => Some(column.stat_summary()),
			Column::Tree(_column) => None,
		}
	}

	/// Run the index validation/dump described by `check_params`.
	/// Only implemented for hash columns; a no-op for btree columns.
	pub fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
		match self {
			Column::Hash(column) => column.dump(log, check_params, col),
			Column::Tree(_column) => Ok(()),
		}
	}

	/// Current index size in bits for hash columns; `None` for btree columns.
	/// Test/instrumentation helper only.
	#[cfg(test)]
	#[cfg(feature = "instrumentation")]
	pub fn index_bits(&self) -> Option<u8> {
		match self {
			Column::Hash(column) => Some(column.tables.read().index.id.index_bits()),
			Column::Tree(_column) => None,
		}
	}
1303}