// parity_db/btree/mod.rs

1// Copyright 2021-2022 Parity Technologies (UK) Ltd.
2// This file is dual-licensed as Apache-2.0 or MIT.
3
4use crate::{
5	btree::{btree::BTree, node::Node},
6	column::{ColId, Column, TablesRef},
7	compress::Compress,
8	error::{Error, Result},
9	index::Address,
10	log::{LogAction, LogQuery, LogReader, LogWriter},
11	options::{Metadata, Options, DEFAULT_COMPRESSION_THRESHOLD},
12	parking_lot::RwLock,
13	table::{
14		key::{TableKey, TableKeyQuery},
15		Entry as ValueTableEntry, Value, ValueTable,
16	},
17	Operation,
18};
19pub use iter::{BTreeIterator, LastKey};
20use node::SeparatorInner;
21
22#[allow(clippy::module_inception)]
23mod btree;
24mod iter;
25mod node;
26
/// Maximum number of separators (key + value address pairs) per node.
const ORDER: usize = 8;
/// Maximum number of children per node: always one more than separators.
const ORDER_CHILD: usize = ORDER + 1;
/// Sentinel address (0) used to encode an absent root, child or value.
const NULL_ADDRESS: Address = Address::from_u64(0);
/// Encoded size of the tree header: 8-byte root address + 4-byte depth.
const HEADER_SIZE: u64 = 8 + 4;
/// Fixed address where the tree header entry is stored.
const HEADER_ADDRESS: Address = {
	// The header must fit into the smallest value-table entry size.
	debug_assert!(HEADER_SIZE < crate::table::MIN_ENTRY_SIZE as u64);
	Address::new(1, 0)
};
/// Per-key buffer budget used when preallocating an encoded node
/// (presumably 1-byte length head + a 32-byte key — TODO confirm).
const MAX_KEYSIZE_ENCODED_SIZE: usize = 33;
/// Preallocated capacity for an encoded node: `ORDER` keys, `ORDER`
/// 8-byte value addresses and `ORDER_CHILD` 8-byte child addresses.
const ENTRY_CAPACITY: usize = ORDER * MAX_KEYSIZE_ENCODED_SIZE + ORDER * 8 + ORDER_CHILD * 8;
37
/// Direction of a [`BTreeIterator`] traversal.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum IterDirection {
	/// Iterate toward smaller keys.
	Backward,
	/// Iterate toward larger keys.
	Forward,
}
43
/// Persistent tree metadata stored in the entry at [`HEADER_ADDRESS`].
#[derive(Clone, PartialEq, Eq)]
pub struct BTreeHeader {
	/// Address of the root node entry, or [`NULL_ADDRESS`] for an empty tree.
	pub root: Address,
	/// Depth of the tree.
	pub depth: u32,
}
49
/// Helper wrapping an encoded node (or header) buffer, providing
/// sequential, cursor-based reads and writes of its fields.
struct Entry {
	/// Backing buffer with a read/write cursor (`offset`).
	encoded: ValueTableEntry<Vec<u8>>,
}
53
54impl Entry {
55	fn empty() -> Self {
56		Self::from_encoded(Vec::with_capacity(ENTRY_CAPACITY))
57	}
58
59	fn from_encoded(enc: Vec<u8>) -> Self {
60		Entry { encoded: ValueTableEntry::new(enc) }
61	}
62
63	fn read_separator(&mut self) -> Result<Option<SeparatorInner>> {
64		if self.encoded.offset() == self.encoded.inner_mut().len() {
65			return Ok(None)
66		}
67		self.encoded
68			.check_remaining_len(8 + 1, || Error::Corruption("Unaligned separator".into()))?;
69		let value = self.encoded.read_u64();
70		let head = self.encoded.read_slice(1);
71		let head = head[0];
72		let size = if head == u8::MAX {
73			self.encoded
74				.check_remaining_len(4, || Error::Corruption("Cannot read size of key".into()))?;
75			self.encoded.read_u32() as usize
76		} else {
77			head as usize
78		};
79		self.encoded.check_remaining_len(size, || {
80			Error::Corruption(format!("Entry too small for key of len {}", size))
81		})?;
82		let key = self.encoded.read_slice(size).to_vec();
83		if value == 0 {
84			return Ok(None)
85		}
86		let value = Address::from_u64(value);
87		Ok(Some(SeparatorInner { key, value }))
88	}
89
90	fn write_separator(&mut self, key: &[u8], value: Address) {
91		let size = key.len();
92		let inner_size = self.encoded.inner_mut().len();
93		if size >= u8::MAX as usize {
94			self.encoded.inner_mut().resize(inner_size + 8 + 1 + 4 + size, 0);
95		} else {
96			self.encoded.inner_mut().resize(inner_size + 8 + 1 + size, 0);
97		};
98		self.encoded.write_u64(value.as_u64());
99		if size >= u8::MAX as usize {
100			self.encoded.write_slice(&[u8::MAX]);
101			self.encoded.write_u32(size as u32);
102		} else {
103			self.encoded.write_slice(&[size as u8]);
104		}
105		self.encoded.write_slice(key);
106	}
107
108	fn read_child_index(&mut self) -> Result<Option<Address>> {
109		self.encoded
110			.check_remaining_len(8, || Error::Corruption("Entry too small for Index".into()))?;
111		let index = self.encoded.read_u64();
112		Ok(if index == 0 { None } else { Some(Address::from_u64(index)) })
113	}
114
115	fn write_child_index(&mut self, index: Address) {
116		let inner_size = self.encoded.inner_mut().len();
117		self.encoded.inner_mut().resize(inner_size + 8, 0);
118		self.encoded.write_u64(index.as_u64());
119	}
120
121	fn write_header(&mut self, btree_header: &BTreeHeader) {
122		self.encoded.set_offset(0);
123		self.encoded.inner_mut().resize(8 + 4, 0);
124		self.encoded.write_u64(btree_header.root.as_u64());
125		self.encoded.write_u32(btree_header.depth);
126	}
127}
128
/// A column whose content is indexed by a b-tree rather than a hash index.
#[derive(Debug)]
pub struct BTreeTable {
	/// Column id of this table.
	id: ColId,
	/// Value tables, indexed by size tier.
	tables: RwLock<Vec<ValueTable>>,
	/// Whether the column options enabled reference counting.
	ref_counted: bool,
	/// Compressor configured from the column options.
	compression: Compress,
}
136
impl BTreeTable {
	/// Opens the b-tree column over the given value tables, persisting an
	/// empty-tree header if the header's size tier is not yet initialized.
	pub fn open(
		id: ColId,
		values: Vec<ValueTable>,
		options: &Options,
		metadata: &Metadata,
	) -> Result<Self> {
		let size_tier = HEADER_ADDRESS.size_tier() as usize;
		if !values[size_tier].is_init() {
			// Fresh table: write a header describing an empty tree.
			let btree_header = BTreeHeader { root: NULL_ADDRESS, depth: 0 };
			let mut entry = Entry::empty();
			entry.write_header(&btree_header);
			values[size_tier].init_with_entry(&*entry.encoded.inner_mut())?;
		}
		let col_options = &metadata.columns[id as usize];
		Ok(BTreeTable {
			id,
			tables: RwLock::new(values),
			ref_counted: col_options.ref_counted,
			compression: Compress::new(
				col_options.compression,
				options
					.compression_threshold
					.get(&id)
					.copied()
					.unwrap_or(DEFAULT_COMPRESSION_THRESHOLD),
			),
		})
	}

	/// Reads the tree header stored at [`HEADER_ADDRESS`], falling back to
	/// an empty-tree header (null root, zero depth) when no value is found.
	fn btree_header(log: &impl LogQuery, values: TablesRef) -> Result<BTreeHeader> {
		let mut root = NULL_ADDRESS;
		let mut depth = 0;
		let key_query = TableKeyQuery::Fetch(None);
		if let Some(encoded) = Column::get_value(key_query, HEADER_ADDRESS, values, log)? {
			let mut buf: ValueTableEntry<Vec<u8>> = ValueTableEntry::new(encoded.1);
			buf.check_remaining_len(8 + 4, || Error::Corruption("Invalid header length.".into()))?;
			root = Address::from_u64(buf.read_u64());
			depth = buf.read_u32();
		}
		Ok(BTreeHeader { root, depth })
	}

	/// Fetches the value stored at `address`, returning its size tier and
	/// bytes, or `None` when absent.
	fn get_at_value_index(
		&self,
		key: TableKeyQuery,
		address: Address,
		log: &impl LogQuery,
	) -> Result<Option<(u8, Value)>> {
		let tables = self.tables.read();
		let btree = self.locked(&tables);
		Column::get_value(key, address, btree, log)
	}

	/// Flushes every underlying value table.
	pub fn flush(&self) -> Result<()> {
		let tables = self.tables.read();
		for t in tables.iter() {
			t.flush()?;
		}
		Ok(())
	}

	/// Looks `key` up in the tree, returning its value if present.
	pub fn get(key: &[u8], log: &impl LogQuery, values: TablesRef) -> Result<Option<Vec<u8>>> {
		let btree_header = Self::btree_header(log, values)?;
		if btree_header.root == NULL_ADDRESS {
			// Empty tree: nothing to look up.
			return Ok(None)
		}
		// The tree only lives for the duration of this query (the log stays
		// locked while the tree is parsed), so a dummy record id of 0 is
		// fine: it never needs invalidation.
		let record_id = 0;
		let tree = BTree::new(Some(btree_header.root), btree_header.depth, record_id);
		tree.get(key, values, log)
	}

	/// Reads the raw encoded node entry stored at `at`, treating a missing
	/// entry as corruption.
	fn get_encoded_entry(at: Address, log: &impl LogQuery, tables: TablesRef) -> Result<Vec<u8>> {
		let key_query = TableKeyQuery::Check(&TableKey::NoHash);
		if let Some((_tier, value)) = Column::get_value(key_query, at, tables, log)? {
			Ok(value)
		} else {
			Err(Error::Corruption(format!("Missing btree entry at {at}")))
		}
	}

	/// Bundles the borrowed value tables with this column's options into a
	/// [`TablesRef`].
	fn locked<'a>(&'a self, tables: &'a [ValueTable]) -> TablesRef<'a> {
		TablesRef {
			tables,
			ref_counted: self.ref_counted,
			preimage: false,
			compression: &self.compression,
			col: self.id,
		}
	}

	/// Runs `apply` with the value tables read-locked.
	pub fn with_locked<R>(&self, mut apply: impl FnMut(TablesRef) -> Result<R>) -> Result<R> {
		let locked_tables = &*self.tables.read();
		let locked = self.locked(locked_tables);
		apply(locked)
	}

	/// Applies a logged action to the value tables.
	///
	/// # Panics
	/// Panics on any action other than `InsertValue` — no other action is
	/// expected for a b-tree column.
	pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		let tables = self.tables.read();
		match action {
			LogAction::InsertValue(record) => {
				tables[record.table.size_tier() as usize].enact_plan(record.index, log)?;
			},
			_ => panic!("Unexpected log action"),
		}
		Ok(())
	}

	/// Validates a logged action against the value tables; unlike
	/// [`Self::enact_plan`], an unexpected action is reported as corruption
	/// rather than a panic.
	pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		let tables = self.tables.upgradable_read();
		match action {
			LogAction::InsertValue(record) => {
				tables[record.table.size_tier() as usize].validate_plan(record.index, log)?;
			},
			_ => {
				log::error!(target: "parity-db", "Unexpected log action");
				return Err(Error::Corruption("Unexpected log action".to_string()))
			},
		}
		Ok(())
	}

	/// Completes the current write plan on every value table.
	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
		let tables = self.tables.read();
		for t in tables.iter() {
			t.complete_plan(log)?;
		}
		Ok(())
	}

	/// Refreshes cached metadata on every value table.
	pub fn refresh_metadata(&self) -> Result<()> {
		let tables = self.tables.read();
		for t in tables.iter() {
			t.refresh_metadata()?;
		}
		Ok(())
	}

	/// Logs the removal (dereference) of the node entry at `node_index`.
	fn write_plan_remove_node(
		tables: TablesRef,
		writer: &mut LogWriter,
		node_index: Address,
	) -> Result<()> {
		Column::write_existing_value_plan::<_, Vec<u8>>(
			&TableKey::NoHash,
			tables,
			node_index,
			&Operation::Dereference(()),
			writer,
			None,
			false,
		)?;
		Ok(())
	}

	/// Encodes `node` and logs its write. Returns `Ok(None)` when the node
	/// is unchanged or rewritten in place, and `Ok(Some(address))` when the
	/// node was (re)written at a new address.
	fn write_node_plan(
		mut tables: TablesRef,
		mut node: Node,
		writer: &mut LogWriter,
		node_id: Option<Address>,
	) -> Result<Option<Address>> {
		// A node needs rewriting when any child moved; the scan stops at the
		// first unused child slot.
		for child in node.children.as_mut().iter_mut() {
			if child.moved {
				node.changed = true;
			} else if child.entry_index.is_none() {
				break
			}
		}

		// Likewise for modified separators, stopping at the first empty slot.
		for separator in node.separators.as_mut().iter_mut() {
			if separator.modified {
				node.changed = true;
			} else if separator.separator.is_none() {
				break
			}
		}

		if !node.changed {
			// Nothing to write.
			return Ok(None)
		}

		// Encode the node as interleaved child indices and separators:
		// child 0, separator 0, child 1, separator 1, ... ending either at
		// the last child slot or at the first missing separator.
		let mut entry = Entry::empty();
		let mut i_children = 0;
		let mut i_separator = 0;
		loop {
			if let Some(index) = node.children.as_mut()[i_children].entry_index {
				entry.write_child_index(index);
			} else {
				entry.write_child_index(NULL_ADDRESS);
			}
			i_children += 1;
			if i_children == ORDER_CHILD {
				break
			}
			if let Some(sep) = &node.separators.as_mut()[i_separator].separator {
				entry.write_separator(&sep.key, sep.value);
				i_separator += 1
			} else {
				break
			}
		}

		// Node entries are written uncompressed; the column's compressor is
		// restored afterwards.
		let old_comp = tables.compression;
		tables.compression = &crate::compress::NO_COMPRESSION;
		let result = Ok(if let Some(existing) = node_id {
			// Rewrite the existing entry; a returned index means the entry
			// was relocated.
			let k = TableKey::NoHash;
			if let (_, Some(new_index)) = Column::write_existing_value_plan(
				&k,
				tables,
				existing,
				&Operation::Set((), entry.encoded),
				writer,
				None,
				false,
			)? {
				Some(new_index)
			} else {
				None
			}
		} else {
			// New node: always produces an address.
			let k = TableKey::NoHash;
			Some(Column::write_new_value_plan(&k, tables, entry.encoded.as_ref(), writer, None)?)
		});
		tables.compression = old_comp;

		result
	}
}
366
pub mod commit_overlay {
	//! In-memory staging of b-tree changes between commit and log enactment.
	use super::*;
	use crate::{
		column::{ColId, Column},
		db::{BTreeCommitOverlay, Operation, RcKey, RcValue},
		error::Result,
	};

	/// Set of operations queued against one b-tree column for a commit.
	#[derive(Debug)]
	pub struct BTreeChangeSet {
		/// Column the changes apply to.
		pub col: ColId,
		/// Queued operations; sorted by key before being written out.
		pub changes: Vec<Operation<RcKey, RcValue>>,
	}

	impl BTreeChangeSet {
		/// Creates an empty change set for `col`.
		pub fn new(col: ColId) -> Self {
			BTreeChangeSet { col, changes: Default::default() }
		}

		/// Queues one operation. Unlike hash-indexed columns, the key is
		/// stored as-is — no key hashing.
		pub fn push(&mut self, change: Operation<Value, Value>) {
			self.changes.push(match change {
				Operation::Set(k, v) => Operation::Set(k.into(), v.into()),
				Operation::Dereference(k) => Operation::Dereference(k.into()),
				Operation::Reference(k) => Operation::Reference(k.into()),
			});
		}

		/// Mirrors the queued changes into the commit `overlay`, tagging each
		/// with `record_id` and accumulating their size into `bytes`.
		/// Fails on `Reference` operations for non-ref-counted columns.
		pub fn copy_to_overlay(
			&self,
			overlay: &mut BTreeCommitOverlay,
			record_id: u64,
			bytes: &mut usize,
			options: &Options,
		) -> Result<()> {
			let ref_counted = options.columns[self.col as usize].ref_counted;
			for change in self.changes.iter() {
				match change {
					Operation::Set(key, value) => {
						*bytes += key.value().len();
						*bytes += value.value().len();
						overlay.insert(key.clone(), (record_id, Some(value.clone())));
					},
					Operation::Dereference(key) => {
						// Don't add removed ref-counted values to overlay.
						// (current ref_counted implementation does not
						// make much sense for btree indexed content).
						if !ref_counted {
							*bytes += key.value().len();
							overlay.insert(key.clone(), (record_id, None));
						}
					},
					Operation::Reference(..) => {
						// Don't add (we allow remove value in overlay when using rc: some
						// indexing on top of it is expected).
						if !ref_counted {
							return Err(Error::InvalidInput(format!(
								"No Rc for column {}",
								self.col
							)))
						}
					},
				}
			}
			Ok(())
		}

		/// Drains the queued changes and removes their overlay entries, but
		/// only those still tagged with this commit's `record_id` (a later
		/// commit may have overwritten an entry).
		pub fn clean_overlay(&mut self, overlay: &mut BTreeCommitOverlay, record_id: u64) {
			use std::collections::btree_map::Entry;
			for change in self.changes.drain(..) {
				let key = change.into_key();
				if let Entry::Occupied(e) = overlay.entry(key) {
					if e.get().0 == record_id {
						e.remove_entry();
					}
				}
			}
		}

		/// Applies the queued changes to the tree through the log `writer`,
		/// incrementing `ops` by the number of changes and rewriting the
		/// persistent header entry if the root or depth changed.
		pub fn write_plan(
			&mut self,
			btree: &BTreeTable,
			writer: &mut LogWriter,
			ops: &mut u64,
		) -> Result<()> {
			let record_id = writer.record_id();

			let locked_tables = btree.tables.read();
			let locked = btree.locked(&locked_tables);
			let mut tree = BTree::open(locked, writer, record_id)?;

			let mut btree_header =
				BTreeHeader { root: tree.root_index.unwrap_or(NULL_ADDRESS), depth: tree.depth };
			let old_btree_header = btree_header.clone();

			// Changes are applied in key order.
			self.changes.sort();
			tree.write_sorted_changes(self.changes.as_slice(), locked, writer)?;
			*ops += self.changes.len() as u64;

			btree_header.root = tree.root_index.unwrap_or(NULL_ADDRESS);
			btree_header.depth = tree.depth;

			// Only rewrite the header entry when root or depth changed.
			if old_btree_header != btree_header {
				let mut entry = Entry::empty();
				entry.write_header(&btree_header);
				Column::write_existing_value_plan(
					&TableKey::NoHash,
					locked,
					HEADER_ADDRESS,
					&Operation::Set((), &entry.encoded.as_ref()[..HEADER_SIZE as usize]),
					writer,
					None,
					false,
				)?;
			}
			#[cfg(test)]
			tree.is_balanced(locked, writer)?;
			Ok(())
		}
	}
}