1use crate::{
5 btree::BTreeTable,
6 compress::Compress,
7 db::{check::CheckDisplay, Operation, RcValue},
8 display::hex,
9 error::{try_io, Error, Result},
10 index::{Address, IndexTable, PlanOutcome, TableId as IndexTableId},
11 log::{Log, LogAction, LogOverlays, LogQuery, LogReader, LogWriter},
12 options::{ColumnOptions, Metadata, Options, DEFAULT_COMPRESSION_THRESHOLD},
13 parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
14 stats::{ColumnStatSummary, ColumnStats},
15 table::{
16 key::{TableKey, TableKeyQuery},
17 TableId as ValueTableId, Value, ValueTable, SIZE_TIERS,
18 },
19 Key,
20};
21use std::{
22 collections::VecDeque,
23 path::PathBuf,
24 sync::{
25 atomic::{AtomicU64, Ordering},
26 Arc,
27 },
28};
29
/// Minimum size, in index bits, of a column's hash index table.
pub const MIN_INDEX_BITS: u8 = 16;
/// Maximum number of entries collected into a single `reindex` batch.
const MAX_REINDEX_BATCH: usize = 8192;

/// Column identifier (position of the column in the database).
pub type ColId = u8;
/// 32-byte salt used when hashing user keys.
pub type Salt = [u8; 32];
36
/// Fixed entry sizes (in bytes) for each value table size tier, in ascending
/// order. There is one tier more than entries here: the last ("blob") tier has
/// no fixed entry size (see `open_table`, which passes `None` for it).
const SIZES: [u16; SIZE_TIERS - 1] = [
	32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 46, 47, 48, 50, 51, 52, 54, 55, 57, 58, 60,
	62, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 88, 90, 93, 95, 98, 101, 103, 106, 109,
	112, 115, 119, 122, 125, 129, 132, 136, 140, 144, 148, 152, 156, 160, 165, 169, 174, 179, 183,
	189, 194, 199, 205, 210, 216, 222, 228, 235, 241, 248, 255, 262, 269, 276, 284, 292, 300, 308,
	317, 325, 334, 344, 353, 363, 373, 383, 394, 405, 416, 428, 439, 452, 464, 477, 490, 504, 518,
	532, 547, 562, 577, 593, 610, 627, 644, 662, 680, 699, 718, 738, 758, 779, 801, 823, 846, 869,
	893, 918, 943, 969, 996, 1024, 1052, 1081, 1111, 1142, 1174, 1206, 1239, 1274, 1309, 1345,
	1382, 1421, 1460, 1500, 1542, 1584, 1628, 1673, 1720, 1767, 1816, 1866, 1918, 1971, 2025, 2082,
	2139, 2198, 2259, 2322, 2386, 2452, 2520, 2589, 2661, 2735, 2810, 2888, 2968, 3050, 3134, 3221,
	3310, 3402, 3496, 3593, 3692, 3794, 3899, 4007, 4118, 4232, 4349, 4469, 4593, 4720, 4850, 4984,
	5122, 5264, 5410, 5559, 5713, 5871, 6034, 6200, 6372, 6548, 6729, 6916, 7107, 7303, 7506, 7713,
	7927, 8146, 8371, 8603, 8841, 9085, 9337, 9595, 9860, 10133, 10413, 10702, 10998, 11302, 11614,
	11936, 12266, 12605, 12954, 13312, 13681, 14059, 14448, 14848, 15258, 15681, 16114, 16560,
	17018, 17489, 17973, 18470, 18981, 19506, 20046, 20600, 21170, 21756, 22358, 22976, 23612,
	24265, 24936, 25626, 26335, 27064, 27812, 28582, 29372, 30185, 31020, 31878, 32760,
];
73
/// The live index table plus the value tables (one per size tier) of a column.
#[derive(Debug)]
struct Tables {
	index: IndexTable,
	value: Vec<ValueTable>,
}
79
/// State of an in-progress reindexing operation.
#[derive(Debug)]
struct Reindex {
	/// Old index tables whose entries still have to be migrated into the
	/// live index; migrated front-first (see `reindex`/`drop_index`).
	queue: VecDeque<IndexTable>,
	/// Next chunk to migrate from the front table of `queue`.
	progress: AtomicU64,
}
85
/// A database column: hash-indexed or B-Tree indexed.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Column {
	Hash(HashColumn),
	Tree(BTreeTable),
}
92
/// A hash-indexed column: a hash index table mapping keys to addresses in a
/// set of per-size-tier value tables.
#[derive(Debug)]
pub struct HashColumn {
	col: ColId,
	/// Live index and value tables.
	tables: RwLock<Tables>,
	/// Stale index tables still being migrated after an index resize.
	reindex: RwLock<Reindex>,
	path: PathBuf,
	/// When set, re-setting an existing key is skipped (the stored value is
	/// assumed to be fully determined by its key).
	preimage: bool,
	/// Keys are already uniformly distributed; selects the cheaper hashing
	/// scheme in `hash_key`.
	uniform_keys: bool,
	collect_stats: bool,
	/// Values carry a reference count.
	ref_counted: bool,
	salt: Salt,
	stats: ColumnStats,
	compression: Compress,
	/// On-disk database version; affects key hashing for old databases.
	db_version: u32,
}
108
/// Borrowed view of a column's value tables together with the column settings
/// needed to read or write values.
#[derive(Clone, Copy)]
pub struct TablesRef<'a> {
	pub tables: &'a [ValueTable],
	pub compression: &'a Compress,
	pub col: ColId,
	pub preimage: bool,
	pub ref_counted: bool,
}
117
/// Item passed to the callback of [`HashColumn::iter_values`].
pub struct ValueIterState {
	/// Reference count of the entry.
	pub rc: u32,
	/// Decompressed value bytes.
	pub value: Vec<u8>,
}
125
/// Diagnostics for an index entry whose referenced value could not be read.
pub struct CorruptedIndexEntryInfo {
	/// Index chunk containing the entry.
	pub chunk_index: u64,
	/// Position of the entry within the chunk.
	pub sub_index: u32,
	pub entry: crate::index::Entry,
	/// Raw dump of the referenced value table slot, when it could be read.
	pub value_entry: Option<Vec<u8>>,
	/// Error hit while reading the value, if any (`None` = slot was empty).
	pub error: Option<Error>,
}
134
/// Item passed to the callback of [`HashColumn::iter_index`].
pub struct IterState {
	/// Current iteration position (chunk or entry number, depending on mode).
	pub item_index: u64,
	pub total_items: u64,
	pub key: Key,
	pub rc: u32,
	pub value: Vec<u8>,
}
143
/// Either a successfully decoded index item or corruption diagnostics.
enum IterStateOrCorrupted {
	Item(IterState),
	Corrupted(CorruptedIndexEntryInfo),
}
149
/// Derive the fixed-size database key for a user key.
///
/// The scheme depends on column configuration and on-disk version:
/// - uniform, version <= 5: first 32 bytes of the key used as-is;
/// - uniform, version <= 7: first 32 bytes XOR-ed with the salt;
/// - uniform, newer: first 16 bytes replaced by a 128-bit SipHash-1-3 of the
///   whole key (keyed with the first half of the salt), rest kept verbatim;
/// - non-uniform: 256-bit Blake2b MAC of the whole key, keyed with the salt.
///
/// NOTE(review): the uniform branches index `key[0..32]`/`key[16..]` and so
/// assume the key is at least 32 bytes — confirm callers guarantee this.
#[inline]
pub fn hash_key(key: &[u8], salt: &Salt, uniform: bool, db_version: u32) -> Key {
	use blake2::{
		digest::{typenum::U32, FixedOutput, Update},
		Blake2bMac,
	};

	let mut k = Key::default();
	if uniform {
		if db_version <= 5 {
			k.copy_from_slice(&key[0..32]);
		} else if db_version <= 7 {
			let key = &key[0..32];
			for i in 0..32 {
				k[i] = key[i] ^ salt[i];
			}
		} else {
			// Test/instrumentation builds with an all-zero salt keep the key
			// unhashed so test fixtures stay readable.
			#[cfg(any(test, feature = "instrumentation"))]
			if salt == &Salt::default() {
				k.copy_from_slice(&key);
				return k
			}
			use siphasher::sip128::Hasher128;
			use std::hash::Hasher;
			let mut hasher = siphasher::sip128::SipHasher13::new_with_key(
				salt[..16].try_into().expect("Salt length is 32"),
			);
			hasher.write(&key);
			let hash = hasher.finish128();
			k[0..8].copy_from_slice(&hash.h1.to_le_bytes());
			k[8..16].copy_from_slice(&hash.h2.to_le_bytes());
			k[16..].copy_from_slice(&key[16..]);
		}
	} else {
		// The 32-byte salt is used as the Blake2b MAC key.
		let mut ctx = Blake2bMac::<U32>::new_with_salt_and_personal(salt, &[], &[])
			.expect("Salt length (32) is a valid key length (<= 64)");
		ctx.update(key);
		let hash = ctx.finalize_fixed();
		k.copy_from_slice(&hash);
	}
	k
}
195
/// A batch of reindexing work produced by [`HashColumn::reindex`].
pub struct ReindexBatch {
	/// Old index table to delete once fully migrated, if migration finished.
	pub drop_index: Option<IndexTableId>,
	/// Key/address pairs to re-insert into the live index.
	pub batch: Vec<(Key, Address)>,
}
200
201impl HashColumn {
202 pub fn get(&self, key: &Key, log: &impl LogQuery) -> Result<Option<Value>> {
203 let tables = self.tables.read();
204 let values = self.as_ref(&tables.value);
205 if let Some((tier, value)) = self.get_in_index(key, &tables.index, values, log)? {
206 if self.collect_stats {
207 self.stats.query_hit(tier);
208 }
209 return Ok(Some(value))
210 }
211 for r in &self.reindex.read().queue {
212 if let Some((tier, value)) = self.get_in_index(key, r, values, log)? {
213 if self.collect_stats {
214 self.stats.query_hit(tier);
215 }
216 return Ok(Some(value))
217 }
218 }
219 if self.collect_stats {
220 self.stats.query_miss();
221 }
222 Ok(None)
223 }
224
225 pub fn get_size(&self, key: &Key, log: &RwLock<LogOverlays>) -> Result<Option<u32>> {
226 self.get(key, log).map(|v| v.map(|v| v.len() as u32))
227 }
228
	/// Look up `key` in a single index table and fetch the matching value.
	///
	/// Index entries only store partial key material, so several entries may
	/// collide; each candidate address is verified against the value table
	/// until the key check succeeds. Returns the size tier and value on a hit.
	fn get_in_index(
		&self,
		key: &Key,
		index: &IndexTable,
		tables: TablesRef,
		log: &impl LogQuery,
	) -> Result<Option<(u8, Value)>> {
		let (mut entry, mut sub_index) = index.get(key, 0, log)?;
		while !entry.is_empty() {
			let address = entry.address(index.id.index_bits());
			// `TableKeyQuery::Check` makes the value table verify the stored
			// partial key against `key` before returning the value.
			let value = Column::get_value(
				TableKeyQuery::Check(&TableKey::Partial(*key)),
				address,
				tables,
				log,
			)?;
			match value {
				Some(result) => return Ok(Some(result)),
				None => {
					// Key mismatch: move on to the next colliding entry.
					let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
					entry = next_entry;
					sub_index = next_index;
				},
			}
		}
		Ok(None)
	}
256
257 pub fn as_ref<'a>(&'a self, tables: &'a [ValueTable]) -> TablesRef<'a> {
258 TablesRef {
259 tables,
260 preimage: self.preimage,
261 col: self.col,
262 ref_counted: self.ref_counted,
263 compression: &self.compression,
264 }
265 }
266}
267
268impl Column {
	/// Read a value from the value table slot addressed by `address`.
	///
	/// Decompresses the bytes when they were stored compressed. Returns the
	/// size tier together with the value, or `None` when the key check inside
	/// `query` does not match.
	pub fn get_value(
		mut key: TableKeyQuery,
		address: Address,
		tables: TablesRef,
		log: &impl LogQuery,
	) -> Result<Option<(u8, Value)>> {
		let size_tier = address.size_tier() as usize;
		if let Some((value, compressed, _rc)) =
			tables.tables[size_tier].query(&mut key, address.offset(), log)?
		{
			let value = if compressed { tables.compression.decompress(&value)? } else { value };
			return Ok(Some((size_tier as u8, value)))
		}
		Ok(None)
	}
284
285 pub fn compress(
286 compression: &Compress,
287 key: &TableKey,
288 value: &[u8],
289 tables: &[ValueTable],
290 ) -> (Option<Vec<u8>>, usize) {
291 let (len, result) = if value.len() > compression.threshold as usize {
292 let cvalue = compression.compress(value);
293 if cvalue.len() < value.len() {
294 (cvalue.len(), Some(cvalue))
295 } else {
296 (value.len(), None)
297 }
298 } else {
299 (value.len(), None)
300 };
301 let target_tier = tables
302 .iter()
303 .position(|t| t.value_size(key).map_or(false, |s| len <= s as usize));
304 let target_tier = target_tier.unwrap_or_else(|| {
305 log::trace!(target: "parity-db", "Using blob {}", key);
306 tables.len() - 1
307 });
308
309 (result, target_tier)
310 }
311
312 pub fn open(col: ColId, options: &Options, metadata: &Metadata) -> Result<Column> {
313 let path = &options.path;
314 let arc_path = Arc::new(path.clone());
315 let column_options = &metadata.columns[col as usize];
316 let db_version = metadata.version;
317 let value = (0..SIZE_TIERS)
318 .map(|i| Self::open_table(arc_path.clone(), col, i as u8, column_options, db_version))
319 .collect::<Result<_>>()?;
320
321 if column_options.btree_index {
322 Ok(Column::Tree(BTreeTable::open(col, value, options, metadata)?))
323 } else {
324 Ok(Column::Hash(HashColumn::open(col, value, options, metadata)?))
325 }
326 }
327
328 fn open_table(
329 path: Arc<PathBuf>,
330 col: ColId,
331 tier: u8,
332 options: &ColumnOptions,
333 db_version: u32,
334 ) -> Result<ValueTable> {
335 let id = ValueTableId::new(col, tier);
336 let entry_size = SIZES.get(tier as usize).cloned();
337 ValueTable::open(path, id, entry_size, options, db_version)
338 }
339
340 pub(crate) fn drop_files(column: ColId, path: PathBuf) -> Result<()> {
341 let mut to_delete = Vec::new();
344 for entry in try_io!(std::fs::read_dir(&path)) {
345 let entry = try_io!(entry);
346 if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
347 if crate::index::TableId::is_file_name(column, file) ||
348 crate::table::TableId::is_file_name(column, file)
349 {
350 to_delete.push(PathBuf::from(file));
351 }
352 }
353 }
354
355 for file in to_delete {
356 let mut path = path.clone();
357 path.push(file);
358 try_io!(std::fs::remove_file(path));
359 }
360 Ok(())
361 }
362}
363
364impl HashColumn {
	/// Open (or create) the hash column `col` under `options.path`.
	///
	/// Loads the live index, any stale index tables left from an interrupted
	/// reindex, and the persisted statistics, then assembles the column from
	/// the metadata and per-column options.
	fn open(
		col: ColId,
		value: Vec<ValueTable>,
		options: &Options,
		metadata: &Metadata,
	) -> Result<HashColumn> {
		let (index, reindexing, stats) = Self::open_index(&options.path, col)?;
		let collect_stats = options.stats;
		let path = &options.path;
		let col_options = &metadata.columns[col as usize];
		let db_version = metadata.version;
		Ok(HashColumn {
			col,
			tables: RwLock::new(Tables { index, value }),
			reindex: RwLock::new(Reindex { queue: reindexing, progress: AtomicU64::new(0) }),
			path: path.into(),
			preimage: col_options.preimage,
			uniform_keys: col_options.uniform,
			ref_counted: col_options.ref_counted,
			collect_stats,
			salt: metadata.salt,
			stats,
			// Per-column compression threshold, falling back to the default.
			compression: Compress::new(
				col_options.compression,
				options
					.compression_threshold
					.get(&col)
					.copied()
					.unwrap_or(DEFAULT_COMPRESSION_THRESHOLD),
			),
			db_version,
		})
	}
398
399 pub fn hash_key(&self, key: &[u8]) -> Key {
400 hash_key(key, &self.salt, self.uniform_keys, self.db_version)
401 }
402
403 pub fn flush(&self) -> Result<()> {
404 let tables = self.tables.read();
405 tables.index.flush()?;
406 for t in tables.value.iter() {
407 t.flush()?;
408 }
409 Ok(())
410 }
411
412 fn open_index(
413 path: &std::path::Path,
414 col: ColId,
415 ) -> Result<(IndexTable, VecDeque<IndexTable>, ColumnStats)> {
416 let mut reindexing = VecDeque::new();
417 let mut top = None;
418 let mut stats = ColumnStats::empty();
419 for bits in (MIN_INDEX_BITS..65).rev() {
420 let id = IndexTableId::new(col, bits);
421 if let Some(table) = IndexTable::open_existing(path, id)? {
422 if top.is_none() {
423 stats = table.load_stats()?;
424 log::trace!(target: "parity-db", "Opened main index {}", table.id);
425 top = Some(table);
426 } else {
427 log::trace!(target: "parity-db", "Opened stale index {}", table.id);
428 reindexing.push_front(table);
429 }
430 }
431 }
432 let table = match top {
433 Some(table) => table,
434 None => IndexTable::create_new(path, IndexTableId::new(col, MIN_INDEX_BITS)),
435 };
436 Ok((table, reindexing, stats))
437 }
438
	/// Start a reindex: replace the live index with a new table that has one
	/// more index bit and queue the old table for background migration.
	///
	/// Takes the callers' upgradable guards, upgrades them to write access for
	/// the swap, then downgrades back — the locks are never fully released, so
	/// callers can keep operating under them afterwards.
	fn trigger_reindex<'a, 'b>(
		tables: RwLockUpgradableReadGuard<'a, Tables>,
		reindex: RwLockUpgradableReadGuard<'b, Reindex>,
		path: &std::path::Path,
	) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
		let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
		let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
		log::info!(
			target: "parity-db",
			"Started reindex for {}",
			tables.index.id,
		);
		// One more index bit for the replacement table.
		let new_index_id =
			IndexTableId::new(tables.index.id.col(), tables.index.id.index_bits() + 1);
		let new_table = IndexTable::create_new(path, new_index_id);
		let old_table = std::mem::replace(&mut tables.index, new_table);
		reindex.queue.push_back(old_table);
		(
			RwLockWriteGuard::downgrade_to_upgradable(tables),
			RwLockWriteGuard::downgrade_to_upgradable(reindex),
		)
	}
462
463 pub fn write_reindex_plan(
464 &self,
465 key: &Key,
466 address: Address,
467 log: &mut LogWriter,
468 ) -> Result<PlanOutcome> {
469 let tables = self.tables.upgradable_read();
470 let reindex = self.reindex.upgradable_read();
471 self.write_reindex_plan_locked(tables, reindex, key, address, log)
472 }
473
	/// Plan the re-insertion of a reindexed entry while holding the locks.
	///
	/// Skips the write when the live index already maps `key` to `address`
	/// (the entry was re-written while reindexing was in progress). Inserting
	/// may itself trigger a further reindex when the target chunk is full.
	fn write_reindex_plan_locked(
		&self,
		mut tables: RwLockUpgradableReadGuard<Tables>,
		mut reindex: RwLockUpgradableReadGuard<Reindex>,
		key: &Key,
		address: Address,
		log: &mut LogWriter,
	) -> Result<PlanOutcome> {
		if Self::contains_partial_key_with_address(key, address, &tables.index, log)? {
			log::trace!(target: "parity-db", "{}: Skipped reindex entry {} when reindexing", tables.index.id, hex(key));
			return Ok(PlanOutcome::Skipped)
		}
		let mut outcome = PlanOutcome::Written;
		// Retry after each index resize until the insert fits.
		while let PlanOutcome::NeedReindex =
			tables.index.write_insert_plan(key, address, None, log)?
		{
			log::debug!(target: "parity-db", "{}: Index chunk full {} when reindexing", tables.index.id, hex(key));
			(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
			outcome = PlanOutcome::NeedReindex;
		}
		Ok(outcome)
	}
496
	/// Find the entry for `key` in `index` whose value table slot really holds
	/// the (partial) key, resolving partial-key collisions.
	///
	/// Returns the index table, the entry's sub-index within its chunk and the
	/// value address on success.
	fn search_index<'a>(
		key: &Key,
		index: &'a IndexTable,
		tables: &'a Tables,
		log: &LogWriter,
	) -> Result<Option<(&'a IndexTable, usize, Address)>> {
		let (mut existing_entry, mut sub_index) = index.get(key, 0, log)?;
		while !existing_entry.is_empty() {
			let existing_address = existing_entry.address(index.id.index_bits());
			let existing_tier = existing_address.size_tier();
			let table_key = TableKey::Partial(*key);
			// Verify the candidate against the value table; colliding index
			// entries can only be told apart by the stored key material.
			if tables.value[existing_tier as usize].has_key_at(
				existing_address.offset(),
				&table_key,
				log,
			)? {
				return Ok(Some((index, sub_index, existing_address)))
			}

			let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
			existing_entry = next_entry;
			sub_index = next_index;
		}
		Ok(None)
	}
522
523 fn contains_partial_key_with_address(
524 key: &Key,
525 address: Address,
526 index: &IndexTable,
527 log: &LogWriter,
528 ) -> Result<bool> {
529 let (mut existing_entry, mut sub_index) = index.get(key, 0, log)?;
530 while !existing_entry.is_empty() {
531 let existing_address = existing_entry.address(index.id.index_bits());
532 if existing_address == address {
533 return Ok(true)
534 }
535 let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
536 existing_entry = next_entry;
537 sub_index = next_index;
538 }
539 Ok(false)
540 }
541
542 fn search_all_indexes<'a>(
543 key: &Key,
544 tables: &'a Tables,
545 reindex: &'a Reindex,
546 log: &LogWriter,
547 ) -> Result<Option<(&'a IndexTable, usize, Address)>> {
548 if let Some(r) = Self::search_index(key, &tables.index, tables, log)? {
549 return Ok(Some(r))
550 }
551 for index in &reindex.queue {
554 if let Some(r) = Self::search_index(key, index, tables, log)? {
555 return Ok(Some(r))
556 }
557 }
558 Ok(None)
559 }
560
	/// Plan a single operation (set/dereference/reference) for this column,
	/// writing the required index and value table changes to `log`.
	pub fn write_plan(
		&self,
		change: &Operation<Key, RcValue>,
		log: &mut LogWriter,
	) -> Result<PlanOutcome> {
		let tables = self.tables.upgradable_read();
		let reindex = self.reindex.upgradable_read();
		// Check the live index and any tables still being reindexed.
		let existing = Self::search_all_indexes(change.key(), &tables, &reindex, log)?;
		if let Some((table, sub_index, existing_address)) = existing {
			self.write_plan_existing(&tables, change, log, table, sub_index, existing_address)
		} else {
			match change {
				Operation::Set(key, value) => {
					let (r, _, _) =
						self.write_plan_new(tables, reindex, key, value.as_ref(), log)?;
					Ok(r)
				},
				// Dereference/Reference on a missing key is a no-op; only the
				// statistics record the miss.
				Operation::Dereference(key) => {
					log::trace!(target: "parity-db", "{}: Deleting missing key {}", tables.index.id, hex(key));
					if self.collect_stats {
						self.stats.remove_miss();
					}
					Ok(PlanOutcome::Skipped)
				},
				Operation::Reference(key) => {
					log::trace!(target: "parity-db", "{}: Ignoring increase rc, missing key {}", tables.index.id, hex(key));
					if self.collect_stats {
						self.stats.reference_increase_miss();
					}
					Ok(PlanOutcome::Skipped)
				},
			}
		}
	}
595
	/// Plan an operation for a key that already has an index entry.
	///
	/// Depending on the value table outcome, the index entry is left alone,
	/// re-pointed at a new address, or removed.
	#[allow(clippy::too_many_arguments)]
	fn write_plan_existing(
		&self,
		tables: &Tables,
		change: &Operation<Key, RcValue>,
		log: &mut LogWriter,
		index: &IndexTable,
		sub_index: usize,
		existing_address: Address,
	) -> Result<PlanOutcome> {
		let stats = if self.collect_stats { Some(&self.stats) } else { None };

		let key = change.key();
		let table_key = TableKey::Partial(*key);
		match Column::write_existing_value_plan(
			&table_key,
			self.as_ref(&tables.value),
			existing_address,
			change,
			log,
			stats,
			self.ref_counted,
		)? {
			// Value table handled everything; index untouched.
			(Some(outcome), _) => Ok(outcome),
			// Value moved to a new address: update the index entry. The
			// sub-index hint is only valid for entries in the live index.
			(None, Some(value_address)) => {
				let sub_index = if index.id == tables.index.id { Some(sub_index) } else { None };
				tables.index.write_insert_plan(key, value_address, sub_index, log)
			},
			// Value removed entirely: drop the index entry too.
			(None, None) => {
				log::trace!(target: "parity-db", "{}: Removing from index {}", tables.index.id, hex(key));
				index.write_remove_plan(key, sub_index, log)?;
				Ok(PlanOutcome::Written)
			},
		}
	}
633
	/// Plan the insertion of a brand-new key/value pair.
	///
	/// Writes the value first, then inserts the resulting address into the
	/// index, triggering a reindex (possibly repeatedly) while the target
	/// chunk is full. Returns the guards so the caller keeps the locks.
	fn write_plan_new<'a, 'b>(
		&self,
		mut tables: RwLockUpgradableReadGuard<'a, Tables>,
		mut reindex: RwLockUpgradableReadGuard<'b, Reindex>,
		key: &Key,
		value: &[u8],
		log: &mut LogWriter,
	) -> Result<(
		PlanOutcome,
		RwLockUpgradableReadGuard<'a, Tables>,
		RwLockUpgradableReadGuard<'b, Reindex>,
	)> {
		let stats = self.collect_stats.then_some(&self.stats);
		let table_key = TableKey::Partial(*key);
		let address = Column::write_new_value_plan(
			&table_key,
			self.as_ref(&tables.value),
			value,
			log,
			stats,
		)?;
		let mut outcome = PlanOutcome::Written;
		while let PlanOutcome::NeedReindex =
			tables.index.write_insert_plan(key, address, None, log)?
		{
			log::debug!(target: "parity-db", "{}: Index chunk full {}", tables.index.id, hex(key));
			(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
			outcome = PlanOutcome::NeedReindex;
		}
		Ok((outcome, tables, reindex))
	}
665
	/// Apply a logged action to the column tables during log replay.
	///
	/// Index records targeting a table that no longer exists (e.g. dropped
	/// after a completed reindex) are skipped rather than treated as errors.
	pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		let tables = self.tables.read();
		let reindex = self.reindex.read();
		match action {
			LogAction::InsertIndex(record) => {
				if tables.index.id == record.table {
					tables.index.enact_plan(record.index, log)?;
				} else if let Some(table) = reindex.queue.iter().find(|r| r.id == record.table) {
					table.enact_plan(record.index, log)?;
				} else {
					// Target table is gone; consume the record without applying.
					log::debug!(
						target: "parity-db",
						"Missing index {}. Skipped",
						record.table,
					);
					IndexTable::skip_plan(log)?;
				}
			},
			LogAction::InsertValue(record) => {
				tables.value[record.table.size_tier() as usize].enact_plan(record.index, log)?;
			},
			_ => return Err(Error::Corruption("Unexpected log action".into())),
		}
		Ok(())
	}
697
	/// Validate a logged action against the current tables before enacting it.
	///
	/// When an index record targets a missing table with *more* index bits
	/// than the live one, a reindex is triggered locally (presumably the log
	/// was written after a resize not yet reflected here — behavior grounded
	/// in the warn message below) and validation is retried. A record for a
	/// table with *fewer* bits than the live index is a corruption.
	pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		let tables = self.tables.upgradable_read();
		let reindex = self.reindex.upgradable_read();
		match action {
			LogAction::InsertIndex(record) => {
				if tables.index.id == record.table {
					tables.index.validate_plan(record.index, log)?;
				} else if let Some(table) = reindex.queue.iter().find(|r| r.id == record.table) {
					table.validate_plan(record.index, log)?;
				} else {
					if record.table.index_bits() < tables.index.id.index_bits() {
						log::warn!( target: "parity-db", "Index {} is too old. Current is {}", record.table, tables.index.id);
						return Err(Error::Corruption("Unexpected log index id".to_string()))
					}
					log::warn!(
						target: "parity-db",
						"Missing table {}, starting reindex",
						record.table,
					);
					// Drop the guards before recursing: `trigger_reindex`
					// consumed and returned them, and the retry re-locks.
					let lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
					std::mem::drop(lock);
					return self.validate_plan(LogAction::InsertIndex(record), log)
				}
			},
			LogAction::InsertValue(record) => {
				tables.value[record.table.size_tier() as usize].validate_plan(record.index, log)?;
			},
			_ => {
				log::error!(target: "parity-db", "Unexpected log action");
				return Err(Error::Corruption("Unexpected log action".to_string()))
			},
		}
		Ok(())
	}
735
736 pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
737 let tables = self.tables.read();
738 for t in tables.value.iter() {
739 t.complete_plan(log)?;
740 }
741 if self.collect_stats {
742 self.stats.commit()
743 }
744 Ok(())
745 }
746
747 pub fn refresh_metadata(&self) -> Result<()> {
748 let tables = self.tables.read();
749 for t in tables.value.iter() {
750 t.refresh_metadata()?;
751 }
752 Ok(())
753 }
754
755 pub fn write_stats_text(&self, writer: &mut impl std::io::Write) -> Result<()> {
756 let tables = self.tables.read();
757 tables.index.write_stats(&self.stats)?;
758 self.stats.write_stats_text(writer, tables.index.id.col()).map_err(Error::Io)
759 }
760
	/// Aggregate summary of this column's statistics.
	fn stat_summary(&self) -> ColumnStatSummary {
		self.stats.summary()
	}
764
	/// Reset the statistics and persist the cleared state to the index file.
	fn clear_stats(&self) -> Result<()> {
		let tables = self.tables.read();
		self.stats.clear();
		tables.index.write_stats(&self.stats)
	}
770
771 pub fn iter_values(&self, log: &Log, mut f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
772 let tables = self.tables.read();
773 for table in &tables.value {
774 log::debug!( target: "parity-db", "{}: Iterating table {}", tables.index.id, table.id);
775 table.iter_while(log.overlays(), |_, rc, value, compressed| {
776 let value = if compressed {
777 if let Ok(value) = self.compression.decompress(&value) {
778 value
779 } else {
780 return false
781 }
782 } else {
783 value
784 };
785 let state = ValueIterState { rc, value };
786 f(state)
787 })?;
788 log::debug!( target: "parity-db", "{}: Done iterating table {}", tables.index.id, table.id);
789 }
790 Ok(())
791 }
792
793 pub fn iter_index(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
794 let action = |state| match state {
795 IterStateOrCorrupted::Item(item) => Ok(f(item)),
796 IterStateOrCorrupted::Corrupted(..) =>
797 Err(Error::Corruption("Missing indexed value".into())),
798 };
799 self.iter_index_internal(log, action, 0)
800 }
801
	/// Walk every entry of the live index chunk by chunk, starting at
	/// `start_chunk`, invoking `f` with each decoded item or with corruption
	/// diagnostics. The callback returns `false` to stop early.
	fn iter_index_internal(
		&self,
		log: &Log,
		mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
		start_chunk: u64,
	) -> Result<()> {
		let tables = self.tables.read();
		let source = &tables.index;
		let total_chunks = source.id.total_chunks();

		for c in start_chunk..total_chunks {
			let entries = source.entries(c, log.overlays())?;
			for (sub_index, entry) in entries.iter().enumerate() {
				if entry.is_empty() {
					continue
				}
				let (size_tier, offset) = {
					let address = entry.address(source.id.index_bits());
					(address.size_tier(), address.offset())
				};

				let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
				let (value, rc, pk, compressed) = match value {
					Ok(Some(v)) => v,
					// Index points at an empty value slot: report corruption
					// along with a raw dump of the slot for diagnostics.
					Ok(None) => {
						let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
						if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
							chunk_index: c,
							sub_index: sub_index as u32,
							value_entry,
							entry: *entry,
							error: None,
						}))? {
							return Ok(())
						}
						continue
					},
					Err(e) => {
						// Only dump the raw slot for corruption errors; other
						// errors would not yield a useful dump.
						let value_entry = if let Error::Corruption(_) = &e {
							tables.value[size_tier as usize].dump_entry(offset).ok()
						} else {
							None
						};
						if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
							chunk_index: c,
							sub_index: sub_index as u32,
							value_entry,
							entry: *entry,
							error: Some(e),
						}))? {
							return Ok(())
						}
						continue
					},
				};
				// Reassemble the key: prefix recovered from the chunk position
				// plus the partial key (`pk`) stored next to the value.
				let mut key = source.recover_key_prefix(c, *entry);
				key[6..].copy_from_slice(&pk);
				let value = if compressed { self.compression.decompress(&value)? } else { value };
				log::debug!(
					target: "parity-db",
					"{}: Iterating at {}/{}, key={:?}, pk={:?}",
					source.id,
					c,
					source.id.total_chunks(),
					hex(&key),
					hex(&pk),
				);
				let state = IterStateOrCorrupted::Item(IterState {
					item_index: c,
					total_items: total_chunks,
					key,
					rc,
					value,
				});
				if !f(state)? {
					return Ok(())
				}
			}
		}
		Ok(())
	}
883
	/// Like `iter_index_internal`, but walks a sorted snapshot of the index
	/// entries (`sorted_entries`). Keys are not reconstructed in this mode
	/// (`IterState::key` is left at its default) and `_start_chunk` is
	/// ignored; `item_index`/`chunk_index` report the entry position instead.
	fn iter_index_fast(
		&self,
		log: &Log,
		mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
		_start_chunk: u64,
	) -> Result<()> {
		let tables = self.tables.read();
		let index = &tables.index;

		let entries = index.sorted_entries()?;
		let total = entries.len();
		for (sub_index, entry) in entries.into_iter().enumerate() {
			let (size_tier, offset) = {
				let address = entry.address(index.id.index_bits());
				(address.size_tier(), address.offset())
			};

			let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
			let (value, rc, pk, compressed) = match value {
				Ok(Some(v)) => v,
				// Empty value slot: report corruption with a raw slot dump.
				Ok(None) => {
					let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
					if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
						chunk_index: sub_index as u64,
						sub_index: sub_index as u32,
						value_entry,
						entry,
						error: None,
					}))? {
						return Ok(())
					}
					continue
				},
				Err(e) => {
					// Dump the raw slot only for corruption errors.
					let value_entry = if let Error::Corruption(_) = &e {
						tables.value[size_tier as usize].dump_entry(offset).ok()
					} else {
						None
					};
					if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
						chunk_index: sub_index as u64,
						sub_index: sub_index as u32,
						value_entry,
						entry,
						error: Some(e),
					}))? {
						return Ok(())
					}
					continue
				},
			};
			let value = if compressed { self.compression.decompress(&value)? } else { value };
			log::debug!(
				target: "parity-db",
				"{}: Iterating at {}/{}, pk={:?}",
				index.id,
				sub_index,
				total,
				hex(&pk),
			);
			let state = IterStateOrCorrupted::Item(IterState {
				item_index: sub_index as u64,
				total_items: total as u64,
				key: Default::default(),
				rc,
				value,
			});
			if !f(state)? {
				return Ok(())
			}
		}
		Ok(())
	}
957
	/// Validate (and optionally print) the whole index, as used by the
	/// database check tooling.
	///
	/// Iterates the index — sorted/fast or chunk-by-chunk, per
	/// `check_params.fast` — logging progress and any corrupted entries, then
	/// optionally validates the value tables' free-entry references.
	fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
		let start_chunk = check_params.from.unwrap_or(0);
		let end_chunk = check_params.bound;

		// Progress is reported every `step` items, in millions (fast mode) or
		// thousands.
		let step = if check_params.fast { 1_000_000 } else { 10_000 };
		let (denom, suffix) = if check_params.fast { (1_000_000, "m") } else { (1_000, "k") };
		let mut next_info_at = step;
		let start_time = std::time::Instant::now();
		let index_id = self.tables.read().index.id;
		log::info!(target: "parity-db", "Column {} (hash): Starting index validation", col);
		let iter_fn =
			if check_params.fast { Self::iter_index_fast } else { Self::iter_index_internal };
		iter_fn(
			self,
			log,
			|state| match state {
				IterStateOrCorrupted::Item(IterState {
					item_index,
					total_items,
					key,
					rc,
					value,
				}) => {
					// Stop once the optional upper bound is reached.
					if Some(item_index) == end_chunk {
						return Ok(false)
					}
					if item_index >= next_info_at {
						next_info_at += step;
						log::info!(target: "parity-db", "Validated {}{} / {}{} entries", item_index / denom, suffix, total_items / denom, suffix);
					}

					match check_params.display {
						CheckDisplay::Full => {
							log::info!(
								"Index key: {:x?}\n \
								\tRc: {}",
								&key,
								rc,
							);
							log::info!("Value: {}", hex(&value));
						},
						// Short mode truncates the printed value to `t` bytes.
						CheckDisplay::Short(t) => {
							log::info!("Index key: {:x?}", &key);
							log::info!("Rc: {}, Value len: {}", rc, value.len());
							log::info!(
								"Value: {}",
								hex(&value[..std::cmp::min(t as usize, value.len())])
							);
						},
						CheckDisplay::None => (),
					}
					Ok(true)
				},
				// Corruption is logged but does not stop the scan.
				IterStateOrCorrupted::Corrupted(c) => {
					log::error!(
						"Corrupted value for index entry: [{}][{}]: {} ({:?}). Error: {:?}",
						c.chunk_index,
						c.sub_index,
						c.entry.address(index_id.index_bits()),
						hex(&c.entry.as_u64().to_le_bytes()),
						c.error,
					);
					if let Some(v) = c.value_entry {
						log::error!("Value entry: {:?}", hex(v.as_slice()));
					}
					Ok(true)
				},
			},
			start_chunk,
		)?;

		log::info!(target: "parity-db", "Index validation complete successfully, elapsed {:?}", start_time.elapsed());
		if check_params.validate_free_refs {
			log::info!(target: "parity-db", "Validating free refs");
			let tables = self.tables.read();
			let mut total = 0;
			for t in &tables.value {
				match t.check_free_refs() {
					Err(e) => log::warn!(target: "parity-db", "{}: Error: {:?}", t.id, e),
					Ok(n) => total += n,
				}
			}
			log::info!(target: "parity-db", "{} Total free refs", total);
		}
		Ok(())
	}
1044
	/// Produce the next batch of reindexing work for this column.
	///
	/// Reads up to `MAX_REINDEX_BATCH` entries from the front table of the
	/// reindex queue, starting at the stored progress cursor. When the source
	/// table is exhausted its id is returned as `drop_index` so the caller can
	/// delete it after the batch is enacted.
	pub fn reindex(&self, log: &Log) -> Result<ReindexBatch> {
		let tables = self.tables.read();
		let reindex = self.reindex.read();
		let mut plan = Vec::new();
		let mut drop_index = None;
		if let Some(source) = reindex.queue.front() {
			let progress = reindex.progress.load(Ordering::Relaxed);
			if progress != source.id.total_chunks() {
				let mut source_index = progress;
				if source_index % 500 == 0 {
					log::debug!(target: "parity-db", "{}: Reindexing at {}/{}", tables.index.id, source_index, source.id.total_chunks());
				}
				log::debug!(target: "parity-db", "{}: Continue reindex at {}/{}", tables.index.id, source_index, source.id.total_chunks());
				while source_index < source.id.total_chunks() && plan.len() < MAX_REINDEX_BATCH {
					log::trace!(target: "parity-db", "{}: Reindexing {}", source.id, source_index);
					let entries = source.entries(source_index, log.overlays())?;
					for entry in entries.iter() {
						if entry.is_empty() {
							continue
						}
						// Only the key prefix can be recovered from the index;
						// that is what gets re-inserted.
						let key = source.recover_key_prefix(source_index, *entry);
						plan.push((key, entry.address(source.id.index_bits())))
					}
					source_index += 1;
				}
				log::trace!(target: "parity-db", "{}: End reindex batch {} ({})", tables.index.id, source_index, plan.len());
				reindex.progress.store(source_index, Ordering::Relaxed);
				if source_index == source.id.total_chunks() {
					log::info!(target: "parity-db", "Completed reindex {} into {}", source.id, tables.index.id);
					drop_index = Some(source.id);
				}
			}
		}
		Ok(ReindexBatch { drop_index, batch: plan })
	}
1081
1082 pub fn drop_index(&self, id: IndexTableId) -> Result<()> {
1083 log::debug!(target: "parity-db", "Dropping {}", id);
1084 let mut reindex = self.reindex.write();
1085 if reindex.queue.front_mut().map_or(false, |index| index.id == id) {
1086 let table = reindex.queue.pop_front();
1087 reindex.progress.store(0, Ordering::Relaxed);
1088 table.unwrap().drop_file()?;
1089 } else {
1090 log::warn!(target: "parity-db", "Dropping invalid index {}", id);
1091 return Ok(())
1092 }
1093 log::debug!(target: "parity-db", "Dropped {}", id);
1094 Ok(())
1095 }
1096}
1097
impl Column {
	/// Plan an update for a value that already exists in a value table at `address`.
	///
	/// The effect depends on `change`:
	/// - `Operation::Reference`: increments the reference count when `ref_counted`,
	///   otherwise the operation is skipped.
	/// - `Operation::Set`: for ref-counted columns this only bumps the refcount; for
	///   preimage columns the existing entry is left untouched; otherwise the value
	///   is replaced — in place when it stays in the same size tier, or via
	///   remove + insert when it moves to a different tier.
	/// - `Operation::Dereference`: decrements the refcount and removes the entry
	///   once no references remain (non-ref-counted entries are removed outright).
	///
	/// Returns `(outcome, new_address)`. `new_address` is `Some` only when the value
	/// moved to a different size tier; in that case no `PlanOutcome` is reported and
	/// the caller is responsible for re-pointing the index at the new address.
	pub fn write_existing_value_plan<K, V: AsRef<[u8]>>(
		key: &TableKey,
		tables: TablesRef,
		address: Address,
		change: &Operation<K, V>,
		log: &mut LogWriter,
		stats: Option<&ColumnStats>,
		ref_counted: bool,
	) -> Result<(Option<PlanOutcome>, Option<Address>)> {
		let tier = address.size_tier() as usize;

		// Lazily fetches (stored_size, uncompressed_size) for the current entry.
		// Only invoked when stats are enabled, since it may need to read and
		// decompress the stored value.
		let fetch_size = || -> Result<(u32, u32)> {
			let (cur_size, compressed) =
				tables.tables[tier].size(key, address.offset(), log)?.unwrap_or((0, false));
			Ok(if compressed {
				let compressed = tables.tables[tier]
					.get(key, address.offset(), log)?
					.expect("Same query as size")
					.0;
				let uncompressed = tables.compression.decompress(compressed.as_slice())?;

				(cur_size, uncompressed.len() as u32)
			} else {
				// Not compressed: stored and logical sizes coincide.
				(cur_size, cur_size)
			})
		};

		match change {
			Operation::Reference(_) =>
				if ref_counted {
					log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
					tables.tables[tier].write_inc_ref(address.offset(), log)?;
					if let Some(stats) = stats {
						stats.reference_increase();
					}
					Ok((Some(PlanOutcome::Written), None))
				} else {
					// Referencing has no effect without refcounting.
					Ok((Some(PlanOutcome::Skipped), None))
				},
			Operation::Set(_, val) => {
				if ref_counted {
					// Ref-counted columns never overwrite in place: a `Set` on an
					// existing entry just adds another reference.
					log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
					tables.tables[tier].write_inc_ref(address.offset(), log)?;
					return Ok((Some(PlanOutcome::Written), None))
				}
				if tables.preimage {
					// Preimage columns keep the existing entry as-is.
					return Ok((Some(PlanOutcome::Skipped), None))
				}

				// Compress the new value and determine its target size tier; a
				// `None` compression result means the raw value is stored.
				let (cval, target_tier) =
					Column::compress(tables.compression, key, val.as_ref(), tables.tables);
				let (cval, compressed) = cval
					.as_ref()
					.map(|cval| (cval.as_slice(), true))
					.unwrap_or((val.as_ref(), false));

				if let Some(stats) = stats {
					let (cur_size, uncompressed) = fetch_size()?;
					stats.replace_val(
						cur_size,
						uncompressed,
						val.as_ref().len() as u32,
						cval.len() as u32,
					);
				}
				if tier == target_tier {
					// Same tier: replace in place; the index entry stays valid.
					log::trace!(target: "parity-db", "{}: Replacing {}", tables.col, key);
					tables.tables[target_tier].write_replace_plan(
						address.offset(),
						key,
						cval,
						log,
						compressed,
					)?;
					Ok((Some(PlanOutcome::Written), None))
				} else {
					// Tier changed: remove from the old table, insert into the new
					// one, and hand the new address back for an index update.
					log::trace!(target: "parity-db", "{}: Replacing in a new table {}", tables.col, key);
					tables.tables[tier].write_remove_plan(address.offset(), log)?;
					let new_offset =
						tables.tables[target_tier].write_insert_plan(key, cval, log, compressed)?;
					let new_address = Address::new(new_offset, target_tier as u8);
					Ok((None, Some(new_address)))
				}
			},
			Operation::Dereference(_) => {
				// Capture sizes up front: the entry may be gone after the write.
				let cur_size = if stats.is_some() { Some(fetch_size()?) } else { None };
				let remove = if ref_counted {
					// `write_dec_ref` returns whether the entry is still referenced;
					// negate to get "was removed".
					let removed = !tables.tables[tier].write_dec_ref(address.offset(), log)?;
					log::trace!(target: "parity-db", "{}: Dereference {}, deleted={}", tables.col, key, removed);
					removed
				} else {
					log::trace!(target: "parity-db", "{}: Deleting {}", tables.col, key);
					tables.tables[tier].write_remove_plan(address.offset(), log)?;
					true
				};
				if remove {
					if let Some((compressed_size, uncompressed_size)) = cur_size {
						if let Some(stats) = stats {
							stats.remove_val(uncompressed_size, compressed_size);
						}
					}
					// Entry removed: no outcome; the caller clears the index slot.
					Ok((None, None))
				} else {
					// Still referenced; nothing was removed.
					Ok((Some(PlanOutcome::Written), None))
				}
			},
		}
	}

	/// Compress `val` and plan its insertion into the appropriate size-tier
	/// value table. Returns the address of the new entry, which the caller must
	/// record in the index. Updates `stats` with the raw and stored sizes when
	/// stats are enabled.
	pub fn write_new_value_plan(
		key: &TableKey,
		tables: TablesRef,
		val: &[u8],
		log: &mut LogWriter,
		stats: Option<&ColumnStats>,
	) -> Result<Address> {
		let (cval, target_tier) = Column::compress(tables.compression, key, val, tables.tables);
		let (cval, compressed) =
			cval.as_ref().map(|cval| (cval.as_slice(), true)).unwrap_or((val, false));

		log::trace!(target: "parity-db", "{}: Inserting new {}, size = {}", tables.col, key, cval.len());
		let offset = tables.tables[target_tier].write_insert_plan(key, cval, log, compressed)?;
		let address = Address::new(offset, target_tier as u8);

		if let Some(stats) = stats {
			stats.insert_val(val.len() as u32, cval.len() as u32);
		}
		Ok(address)
	}

	/// Finalize a pending write plan. Dispatches to the hash or tree column.
	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
		match self {
			Column::Hash(column) => column.complete_plan(log),
			Column::Tree(column) => column.complete_plan(log),
		}
	}

	/// Validate a logged `action` during log replay. Dispatches per column kind.
	pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		match self {
			Column::Hash(column) => column.validate_plan(action, log),
			Column::Tree(column) => column.validate_plan(action, log),
		}
	}

	/// Apply a logged `action` to the column's tables. Dispatches per column kind.
	pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		match self {
			Column::Hash(column) => column.enact_plan(action, log),
			Column::Tree(column) => column.enact_plan(action, log),
		}
	}

	/// Flush the column's underlying files. Dispatches per column kind.
	pub fn flush(&self) -> Result<()> {
		match self {
			Column::Hash(column) => column.flush(),
			Column::Tree(column) => column.flush(),
		}
	}

	/// Re-read column metadata from disk. Dispatches per column kind.
	pub fn refresh_metadata(&self) -> Result<()> {
		match self {
			Column::Hash(column) => column.refresh_metadata(),
			Column::Tree(column) => column.refresh_metadata(),
		}
	}

	/// Write a human-readable stats dump to `writer`.
	/// Tree columns keep no such stats and succeed as a no-op.
	pub fn write_stats_text(&self, writer: &mut impl std::io::Write) -> Result<()> {
		match self {
			Column::Hash(column) => column.write_stats_text(writer),
			Column::Tree(_column) => Ok(()),
		}
	}

	/// Reset collected column statistics. No-op for tree columns.
	pub fn clear_stats(&self) -> Result<()> {
		match self {
			Column::Hash(column) => column.clear_stats(),
			Column::Tree(_column) => Ok(()),
		}
	}

	/// Return a stats summary, or `None` for tree columns (which track none).
	pub fn stats(&self) -> Option<ColumnStatSummary> {
		match self {
			Column::Hash(column) => Some(column.stat_summary()),
			Column::Tree(_column) => None,
		}
	}

	/// Run a consistency dump/check over the column's content.
	/// No-op for tree columns.
	pub fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
		match self {
			Column::Hash(column) => column.dump(log, check_params, col),
			Column::Tree(_column) => Ok(()),
		}
	}

	/// Current index-table bit size of a hash column; `None` for tree columns.
	/// Test/instrumentation helper only.
	#[cfg(test)]
	#[cfg(feature = "instrumentation")]
	pub fn index_bits(&self) -> Option<u8> {
		match self {
			Column::Hash(column) => Some(column.tables.read().index.id.index_bits()),
			Column::Tree(_column) => None,
		}
	}
}