1use crate::{
5 btree::{btree::BTree, node::Node},
6 column::{ColId, Column, TablesRef},
7 compress::Compress,
8 error::{Error, Result},
9 index::Address,
10 log::{LogAction, LogQuery, LogReader, LogWriter},
11 options::{Metadata, Options, DEFAULT_COMPRESSION_THRESHOLD},
12 parking_lot::RwLock,
13 table::{
14 key::{TableKey, TableKeyQuery},
15 Entry as ValueTableEntry, Value, ValueTable,
16 },
17 Operation,
18};
19pub use iter::{BTreeIterator, LastKey};
20use node::SeparatorInner;
21
22#[allow(clippy::module_inception)]
23mod btree;
24mod iter;
25mod node;
26
/// Branching factor of the tree: maximum number of separators held by a node.
const ORDER: usize = 8;
/// Maximum number of children per node — always one more than the separators.
const ORDER_CHILD: usize = ORDER + 1;
/// Address 0 is reserved as the "no node / empty slot" marker.
const NULL_ADDRESS: Address = Address::from_u64(0);
/// Serialized size of `BTreeHeader`: 8-byte root address + 4-byte depth.
const HEADER_SIZE: u64 = 8 + 4;
/// Fixed location of the persisted btree header: first entry of size tier 1.
const HEADER_ADDRESS: Address = {
	// Compile-time check: the header must fit in the smallest value-table entry.
	debug_assert!(HEADER_SIZE < crate::table::MIN_ENTRY_SIZE as u64);
	Address::new(1, 0)
};
// Sizing estimate for one encoded separator key — presumably 1 length head byte
// + a 32-byte key; longer keys simply grow the buffer later (see
// `Entry::write_separator`). TODO confirm the 32-byte assumption.
const MAX_KEYSIZE_ENCODED_SIZE: usize = 33;
/// Initial buffer capacity for an encoded node: separators, separator value
/// addresses, and child addresses.
const ENTRY_CAPACITY: usize = ORDER * MAX_KEYSIZE_ENCODED_SIZE + ORDER * 8 + ORDER_CHILD * 8;
37
/// Direction of traversal for a btree iterator.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum IterDirection {
	Backward,
	Forward,
}
43
/// Persistent btree metadata, serialized into the fixed entry at
/// `HEADER_ADDRESS` (see `Entry::write_header` / `BTreeTable::btree_header`).
#[derive(Clone, PartialEq, Eq)]
pub struct BTreeHeader {
	// Address of the root node; `NULL_ADDRESS` when the tree is empty.
	pub root: Address,
	// Tree depth as maintained by `BTree` — presumably the number of levels
	// below the root; TODO confirm exact convention in the `btree` submodule.
	pub depth: u32,
}
49
/// Cursor-style wrapper over the encoded bytes of one btree node (or the
/// header) entry; reads and writes advance an internal offset.
struct Entry {
	// Backing buffer plus read/write offset, shared with the value tables.
	encoded: ValueTableEntry<Vec<u8>>,
}
53
54impl Entry {
55 fn empty() -> Self {
56 Self::from_encoded(Vec::with_capacity(ENTRY_CAPACITY))
57 }
58
59 fn from_encoded(enc: Vec<u8>) -> Self {
60 Entry { encoded: ValueTableEntry::new(enc) }
61 }
62
63 fn read_separator(&mut self) -> Result<Option<SeparatorInner>> {
64 if self.encoded.offset() == self.encoded.inner_mut().len() {
65 return Ok(None)
66 }
67 self.encoded
68 .check_remaining_len(8 + 1, || Error::Corruption("Unaligned separator".into()))?;
69 let value = self.encoded.read_u64();
70 let head = self.encoded.read_slice(1);
71 let head = head[0];
72 let size = if head == u8::MAX {
73 self.encoded
74 .check_remaining_len(4, || Error::Corruption("Cannot read size of key".into()))?;
75 self.encoded.read_u32() as usize
76 } else {
77 head as usize
78 };
79 self.encoded.check_remaining_len(size, || {
80 Error::Corruption(format!("Entry too small for key of len {}", size))
81 })?;
82 let key = self.encoded.read_slice(size).to_vec();
83 if value == 0 {
84 return Ok(None)
85 }
86 let value = Address::from_u64(value);
87 Ok(Some(SeparatorInner { key, value }))
88 }
89
90 fn write_separator(&mut self, key: &[u8], value: Address) {
91 let size = key.len();
92 let inner_size = self.encoded.inner_mut().len();
93 if size >= u8::MAX as usize {
94 self.encoded.inner_mut().resize(inner_size + 8 + 1 + 4 + size, 0);
95 } else {
96 self.encoded.inner_mut().resize(inner_size + 8 + 1 + size, 0);
97 };
98 self.encoded.write_u64(value.as_u64());
99 if size >= u8::MAX as usize {
100 self.encoded.write_slice(&[u8::MAX]);
101 self.encoded.write_u32(size as u32);
102 } else {
103 self.encoded.write_slice(&[size as u8]);
104 }
105 self.encoded.write_slice(key);
106 }
107
108 fn read_child_index(&mut self) -> Result<Option<Address>> {
109 self.encoded
110 .check_remaining_len(8, || Error::Corruption("Entry too small for Index".into()))?;
111 let index = self.encoded.read_u64();
112 Ok(if index == 0 { None } else { Some(Address::from_u64(index)) })
113 }
114
115 fn write_child_index(&mut self, index: Address) {
116 let inner_size = self.encoded.inner_mut().len();
117 self.encoded.inner_mut().resize(inner_size + 8, 0);
118 self.encoded.write_u64(index.as_u64());
119 }
120
121 fn write_header(&mut self, btree_header: &BTreeHeader) {
122 self.encoded.set_offset(0);
123 self.encoded.inner_mut().resize(8 + 4, 0);
124 self.encoded.write_u64(btree_header.root.as_u64());
125 self.encoded.write_u32(btree_header.depth);
126 }
127}
128
/// A btree-indexed column: both user values and the encoded btree nodes are
/// stored in the column's size-tiered value tables.
#[derive(Debug)]
pub struct BTreeTable {
	// Column id this table serves.
	id: ColId,
	// Size-tiered value tables; the lock guards table-set access, individual
	// tables manage their own file state.
	tables: RwLock<Vec<ValueTable>>,
	// Whether this column is reference counted (from column metadata).
	ref_counted: bool,
	// Per-column compression configuration and threshold.
	compression: Compress,
}
136
impl BTreeTable {
	/// Open the btree column over its value tables. On first initialization,
	/// persists an empty-tree header at the fixed `HEADER_ADDRESS` so later
	/// reads always find one.
	pub fn open(
		id: ColId,
		values: Vec<ValueTable>,
		options: &Options,
		metadata: &Metadata,
	) -> Result<Self> {
		let size_tier = HEADER_ADDRESS.size_tier() as usize;
		if !values[size_tier].is_init() {
			// Fresh table: write a null header (empty tree, depth 0).
			let btree_header = BTreeHeader { root: NULL_ADDRESS, depth: 0 };
			let mut entry = Entry::empty();
			entry.write_header(&btree_header);
			values[size_tier].init_with_entry(&*entry.encoded.inner_mut())?;
		}
		let col_options = &metadata.columns[id as usize];
		Ok(BTreeTable {
			id,
			tables: RwLock::new(values),
			ref_counted: col_options.ref_counted,
			compression: Compress::new(
				col_options.compression,
				options
					.compression_threshold
					.get(&id)
					.copied()
					.unwrap_or(DEFAULT_COMPRESSION_THRESHOLD),
			),
		})
	}

	/// Load the persisted header from `HEADER_ADDRESS`, consulting `log`
	/// overlays first. An absent entry decodes as an empty tree.
	fn btree_header(log: &impl LogQuery, values: TablesRef) -> Result<BTreeHeader> {
		let mut root = NULL_ADDRESS;
		let mut depth = 0;
		let key_query = TableKeyQuery::Fetch(None);
		if let Some(encoded) = Column::get_value(key_query, HEADER_ADDRESS, values, log)? {
			let mut buf: ValueTableEntry<Vec<u8>> = ValueTableEntry::new(encoded.1);
			buf.check_remaining_len(8 + 4, || Error::Corruption("Invalid header length.".into()))?;
			root = Address::from_u64(buf.read_u64());
			depth = buf.read_u32();
		}
		Ok(BTreeHeader { root, depth })
	}

	/// Fetch the value stored at `address`, checking or extracting the table
	/// key according to `key`. Returns the size tier alongside the value.
	fn get_at_value_index(
		&self,
		key: TableKeyQuery,
		address: Address,
		log: &impl LogQuery,
	) -> Result<Option<(u8, Value)>> {
		let tables = self.tables.read();
		let btree = self.locked(&tables);
		Column::get_value(key, address, btree, log)
	}

	/// Flush every value table to disk.
	pub fn flush(&self) -> Result<()> {
		let tables = self.tables.read();
		for t in tables.iter() {
			t.flush()?;
		}
		Ok(())
	}

	/// Look up `key` in the tree, consulting `log` overlays first.
	/// Returns `Ok(None)` for an empty tree or an absent key.
	pub fn get(key: &[u8], log: &impl LogQuery, values: TablesRef) -> Result<Option<Vec<u8>>> {
		let btree_header = Self::btree_header(log, values)?;
		if btree_header.root == NULL_ADDRESS {
			return Ok(None)
		}
		// The tree only lives for this query, so no commit can invalidate it
		// mid-read; a record id of 0 suffices here.
		let record_id = 0;
		let tree = BTree::new(Some(btree_header.root), btree_header.depth, record_id);
		tree.get(key, values, log)
	}

	/// Read the raw encoded node entry at `at`; a missing entry is corruption
	/// because callers only pass addresses reachable from the tree.
	fn get_encoded_entry(at: Address, log: &impl LogQuery, tables: TablesRef) -> Result<Vec<u8>> {
		let key_query = TableKeyQuery::Check(&TableKey::NoHash);
		if let Some((_tier, value)) = Column::get_value(key_query, at, tables, log)? {
			Ok(value)
		} else {
			Err(Error::Corruption(format!("Missing btree entry at {at}")))
		}
	}

	/// Build a `TablesRef` view over the already-locked tables, carrying this
	/// column's id, ref-counting and compression settings.
	fn locked<'a>(&'a self, tables: &'a [ValueTable]) -> TablesRef<'a> {
		TablesRef {
			tables,
			ref_counted: self.ref_counted,
			preimage: false,
			compression: &self.compression,
			col: self.id,
		}
	}

	/// Run `apply` while holding a read lock on the tables, passing it the
	/// locked `TablesRef` view.
	pub fn with_locked<R>(&self, mut apply: impl FnMut(TablesRef) -> Result<R>) -> Result<R> {
		let locked_tables = &*self.tables.read();
		let locked = self.locked(locked_tables);
		apply(locked)
	}

	/// Apply a validated log record to the backing tables.
	///
	/// # Panics
	/// On any action other than `InsertValue` — btree columns only log value
	/// insertions.
	pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		let tables = self.tables.read();
		match action {
			LogAction::InsertValue(record) => {
				tables[record.table.size_tier() as usize].enact_plan(record.index, log)?;
			},
			_ => panic!("Unexpected log action"),
		}
		Ok(())
	}

	/// Validate a log record against the backing tables before enacting it.
	/// Unlike `enact_plan`, an unexpected action is reported as `Corruption`
	/// rather than panicking, since the log is untrusted at this point.
	pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		let tables = self.tables.upgradable_read();
		match action {
			LogAction::InsertValue(record) => {
				tables[record.table.size_tier() as usize].validate_plan(record.index, log)?;
			},
			_ => {
				log::error!(target: "parity-db", "Unexpected log action");
				return Err(Error::Corruption("Unexpected log action".to_string()))
			},
		}
		Ok(())
	}

	/// Forward plan completion to every value table.
	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
		let tables = self.tables.read();
		for t in tables.iter() {
			t.complete_plan(log)?;
		}
		Ok(())
	}

	/// Re-read on-disk metadata for every value table.
	pub fn refresh_metadata(&self) -> Result<()> {
		let tables = self.tables.read();
		for t in tables.iter() {
			t.refresh_metadata()?;
		}
		Ok(())
	}

	/// Log the removal (dereference) of the node entry at `node_index`.
	fn write_plan_remove_node(
		tables: TablesRef,
		writer: &mut LogWriter,
		node_index: Address,
	) -> Result<()> {
		Column::write_existing_value_plan::<_, Vec<u8>>(
			&TableKey::NoHash,
			tables,
			node_index,
			&Operation::Dereference(()),
			writer,
			None,
			false,
		)?;
		Ok(())
	}

	/// Serialize `node` and log it. Rewrites the existing entry when `node_id`
	/// is given, otherwise writes a new one. Returns `Some(address)` when the
	/// node was (re)written at a new address, `None` when nothing changed or
	/// the entry was updated in place.
	fn write_node_plan(
		mut tables: TablesRef,
		mut node: Node,
		writer: &mut LogWriter,
		node_id: Option<Address>,
	) -> Result<Option<Address>> {
		// A moved child forces a rewrite; slots past the first empty one are
		// unused, so stop scanning there.
		for child in node.children.as_mut().iter_mut() {
			if child.moved {
				node.changed = true;
			} else if child.entry_index.is_none() {
				break
			}
		}

		// Same for separators: any modified one forces a rewrite.
		for separator in node.separators.as_mut().iter_mut() {
			if separator.modified {
				node.changed = true;
			} else if separator.separator.is_none() {
				break
			}
		}

		if !node.changed {
			return Ok(None)
		}

		// Encode as an alternating sequence: child 0, separator 0, child 1,
		// separator 1, … ending after the last child or at the first empty
		// separator slot.
		let mut entry = Entry::empty();
		let mut i_children = 0;
		let mut i_separator = 0;
		loop {
			if let Some(index) = node.children.as_mut()[i_children].entry_index {
				entry.write_child_index(index);
			} else {
				entry.write_child_index(NULL_ADDRESS);
			}
			i_children += 1;
			if i_children == ORDER_CHILD {
				break
			}
			if let Some(sep) = &node.separators.as_mut()[i_separator].separator {
				entry.write_separator(&sep.key, sep.value);
				i_separator += 1
			} else {
				break
			}
		}

		// Node entries are written uncompressed; `tables` is a local copy so
		// swapping the compression only affects the writes below.
		let old_comp = tables.compression;
		tables.compression = &crate::compress::NO_COMPRESSION;
		let result = Ok(if let Some(existing) = node_id {
			let k = TableKey::NoHash;
			if let (_, Some(new_index)) = Column::write_existing_value_plan(
				&k,
				tables,
				existing,
				&Operation::Set((), entry.encoded),
				writer,
				None,
				false,
			)? {
				Some(new_index)
			} else {
				None
			}
		} else {
			let k = TableKey::NoHash;
			Some(Column::write_new_value_plan(&k, tables, entry.encoded.as_ref(), writer, None)?)
		});
		tables.compression = old_comp;

		result
	}
}
366
pub mod commit_overlay {
	use super::*;
	use crate::{
		column::{ColId, Column},
		db::{BTreeCommitOverlay, Operation, RcKey, RcValue},
		error::Result,
	};

	/// Buffered set of operations for one btree column, queued by a commit
	/// and later written out through the log in `write_plan`.
	#[derive(Debug)]
	pub struct BTreeChangeSet {
		// Target column id.
		pub col: ColId,
		// Queued operations, in insertion order until `write_plan` sorts them.
		pub changes: Vec<Operation<RcKey, RcValue>>,
	}

	impl BTreeChangeSet {
		/// Empty change set for `col`.
		pub fn new(col: ColId) -> Self {
			BTreeChangeSet { col, changes: Default::default() }
		}

		/// Queue one operation, converting the owned key/value into their
		/// shared (`Rc*`) representations.
		pub fn push(&mut self, change: Operation<Value, Value>) {
			self.changes.push(match change {
				Operation::Set(k, v) => Operation::Set(k.into(), v.into()),
				Operation::Dereference(k) => Operation::Dereference(k.into()),
				Operation::Reference(k) => Operation::Reference(k.into()),
			});
		}

		/// Mirror the queued changes into the in-memory commit overlay so
		/// readers observe them before the log is enacted. `bytes` accumulates
		/// the approximate size added to the overlay.
		///
		/// # Errors
		/// `InvalidInput` for a `Reference` on a non-ref-counted column.
		pub fn copy_to_overlay(
			&self,
			overlay: &mut BTreeCommitOverlay,
			record_id: u64,
			bytes: &mut usize,
			options: &Options,
		) -> Result<()> {
			let ref_counted = options.columns[self.col as usize].ref_counted;
			for change in self.changes.iter() {
				match change {
					Operation::Set(key, value) => {
						*bytes += key.value().len();
						*bytes += value.value().len();
						overlay.insert(key.clone(), (record_id, Some(value.clone())));
					},
					Operation::Dereference(key) => {
						// NOTE(review): on ref-counted columns the dereference is
						// not reflected in the overlay — presumably the refcount
						// decision happens during the write plan; confirm with
						// callers before relying on overlay reads here.
						if !ref_counted {
							*bytes += key.value().len();
							overlay.insert(key.clone(), (record_id, None));
						}
					},
					Operation::Reference(..) => {
						// `Reference` only makes sense for ref-counted columns;
						// it contributes nothing to the overlay either way.
						if !ref_counted {
							return Err(Error::InvalidInput(format!(
								"No Rc for column {}",
								self.col
							)))
						}
					},
				}
			}
			Ok(())
		}

		/// Remove this change set's keys from the overlay, but only entries
		/// still tagged with `record_id` — a newer commit may have replaced
		/// them, and those must survive. Drains `self.changes`.
		pub fn clean_overlay(&mut self, overlay: &mut BTreeCommitOverlay, record_id: u64) {
			use std::collections::btree_map::Entry;
			for change in self.changes.drain(..) {
				let key = change.into_key();
				if let Entry::Occupied(e) = overlay.entry(key) {
					if e.get().0 == record_id {
						e.remove_entry();
					}
				}
			}
		}

		/// Apply the queued changes to the btree through the log writer,
		/// rewriting the persisted header if the root or depth changed.
		/// `ops` is incremented by the number of operations written.
		pub fn write_plan(
			&mut self,
			btree: &BTreeTable,
			writer: &mut LogWriter,
			ops: &mut u64,
		) -> Result<()> {
			let record_id = writer.record_id();

			let locked_tables = btree.tables.read();
			let locked = btree.locked(&locked_tables);
			let mut tree = BTree::open(locked, writer, record_id)?;

			// Snapshot the header so we only rewrite it on change.
			let mut btree_header =
				BTreeHeader { root: tree.root_index.unwrap_or(NULL_ADDRESS), depth: tree.depth };
			let old_btree_header = btree_header.clone();

			// Changes are applied in key order.
			self.changes.sort();
			tree.write_sorted_changes(self.changes.as_slice(), locked, writer)?;
			*ops += self.changes.len() as u64;

			btree_header.root = tree.root_index.unwrap_or(NULL_ADDRESS);
			btree_header.depth = tree.depth;

			if old_btree_header != btree_header {
				// Persist the updated header in place at its fixed address.
				let mut entry = Entry::empty();
				entry.write_header(&btree_header);
				Column::write_existing_value_plan(
					&TableKey::NoHash,
					locked,
					HEADER_ADDRESS,
					&Operation::Set((), &entry.encoded.as_ref()[..HEADER_SIZE as usize]),
					writer,
					None,
					false,
				)?;
			}
			#[cfg(test)]
			tree.is_balanced(locked, writer)?;
			Ok(())
		}
	}
}