1use super::{
7 iter::{LastIndex, SeekTo},
8 *,
9};
10use crate::{
11 column::Column,
12 db::{RcKey, RcValue},
13 error::Result,
14 index::Address,
15 log::{LogQuery, LogWriter},
16 table::key::TableKey,
17 Operation,
18};
19use std::cmp::Ordering;
20
21impl Node {
22 pub(crate) fn last_separator_index(&self) -> Option<usize> {
23 self.separators.iter().rposition(|separator| separator.separator.is_some())
24 }
25
26 pub fn write_child(
27 &mut self,
28 i: usize,
29 child: Self,
30 btree: TablesRef,
31 log: &mut LogWriter,
32 ) -> Result<()> {
33 if child.changed {
34 let child_index = self.children[i].entry_index;
35 let new_index = BTreeTable::write_node_plan(btree, child, log, child_index)?;
36 if new_index.is_some() {
37 self.changed = true;
38 self.children[i].entry_index = new_index;
39 }
40 }
41 Ok(())
42 }
43
44 pub fn write_split_child(
45 right_ix: Option<Address>,
46 right: Node,
47 btree: TablesRef,
48 log: &mut LogWriter,
49 ) -> Result<Child> {
50 let new_index = BTreeTable::write_node_plan(btree, right, log, right_ix)?;
51 let right_ix = if new_index.is_some() { new_index } else { right_ix };
52 Ok(Self::new_child(right_ix))
53 }
54
55 pub fn change(
56 &mut self,
57 parent: Option<(&mut Self, usize)>,
58 depth: u32,
59 changes: &mut &[Operation<RcKey, RcValue>],
60 btree: TablesRef,
61 log: &mut LogWriter,
62 ) -> Result<(Option<(Separator, Child)>, bool)> {
63 loop {
64 if !btree.ref_counted &&
65 changes.len() > 1 &&
66 !matches!(changes[1], Operation::Reference(..)) &&
67 changes[0].key() == changes[1].key()
68 {
69 *changes = &changes[1..];
70 continue
71 }
72 let r = match &changes[0] {
73 Operation::Set(key, value) =>
74 self.insert(depth, key.value(), value.value(), changes, btree, log)?,
75 _ => self.on_existing(depth, changes, btree, log)?,
76 };
77 if r.0.is_some() || r.1 {
78 return Ok(r)
79 }
80 if changes.len() == 1 {
81 break
82 }
83 if let Some((parent, p)) = &parent {
84 let key = &changes[1].key();
85 let (at, i) = self.position(key.value())?; if at || i < self.number_separator() {
87 *changes = &changes[1..];
88 continue
89 }
90 let (at, i) = parent.position(key.value())?;
91 if !at && &i == p && i < parent.number_separator() {
92 *changes = &changes[1..];
93 continue
94 }
95 }
99 break
100 }
101 Ok((None, false))
102 }
103
104 fn insert(
105 &mut self,
106 depth: u32,
107 key: &[u8],
108 value: &[u8],
109 changes: &mut &[Operation<RcKey, RcValue>],
110 btree: TablesRef,
111 log: &mut LogWriter,
112 ) -> Result<(Option<(Separator, Child)>, bool)> {
113 let has_child = depth != 0;
114
115 let (at, i) = self.position(key)?;
116 if !at {
118 if has_child {
119 return Ok(if let Some(mut child) = self.fetch_child(i, btree, log)? {
120 let r = child.change(Some((self, i)), depth - 1, changes, btree, log)?;
121 self.write_child(i, child, btree, log)?;
122 match r {
123 (Some((sep, right)), _) => {
124 (self.insert_node(depth, i, sep, right, btree, log)?, false)
126 },
127 (None, true) => {
128 self.rebalance(depth, i, btree, log)?;
129 (None, self.need_rebalance())
130 },
131 r => r,
132 }
133 } else {
134 (None, false)
135 })
136 }
137
138 if self.has_separator(ORDER - 1) {
139 let middle = ORDER / 2;
141 let insert = i;
142 let insert_separator = Self::create_separator(key, value, btree, log, None)?;
143
144 return match insert.cmp(&middle) {
145 Ordering::Equal => {
146 let (right, right_ix) = self.split(middle, true, None, None, has_child);
147 let right = Self::write_split_child(right_ix, right, btree, log)?;
148 Ok((Some((insert_separator, right)), false))
149 },
150 Ordering::Less => {
151 let (right, right_ix) = self.split(middle, false, None, None, has_child);
152 let sep = self.remove_separator(middle - 1);
153 self.shift_from(insert, has_child, false);
154 self.set_separator(insert, insert_separator);
155 let right = Self::write_split_child(right_ix, right, btree, log)?;
156 Ok((Some((sep, right)), false))
157 },
158 Ordering::Greater => {
159 let (right, right_ix) = self.split(
160 middle + 1,
161 false,
162 Some((insert, insert_separator)),
163 None,
164 has_child,
165 );
166 let sep = self.remove_separator(middle);
167 let right = Self::write_split_child(right_ix, right, btree, log)?;
168 Ok((Some((sep, right)), false))
169 },
170 }
171 }
172
173 self.shift_from(i, has_child, false);
174 self.set_separator(i, Self::create_separator(key, value, btree, log, None)?);
175 } else {
176 let existing = self.separator_address(i);
177 self.set_separator(i, Self::create_separator(key, value, btree, log, existing)?);
178 }
179 Ok((None, false))
180 }
181
182 pub fn insert_node(
183 &mut self,
184 depth: u32,
185 at: usize,
186 separator: Separator,
187 right_child: Child,
188 btree: TablesRef,
189 log: &mut LogWriter,
190 ) -> Result<Option<(Separator, Child)>> {
191 debug_assert!(depth != 0);
192 let has_child = true;
193 let child = right_child;
194 if self.has_separator(ORDER - 1) {
195 let middle = ORDER / 2;
197 let insert = at;
198
199 match insert.cmp(&middle) {
200 Ordering::Equal => {
201 let (mut right, right_ix) = self.split(middle, true, None, None, has_child);
202 right.set_child(0, child);
203 let right = Self::write_split_child(right_ix, right, btree, log)?;
204 Ok(Some((separator, right)))
205 },
206 Ordering::Less => {
207 let (right, right_ix) = self.split(middle, false, None, None, has_child);
208 let sep = self.remove_separator(middle - 1);
209 self.shift_from(insert, has_child, false);
210 self.set_separator(insert, separator);
211 self.set_child(insert + 1, child);
212 let right = Self::write_split_child(right_ix, right, btree, log)?;
213 Ok(Some((sep, right)))
214 },
215 Ordering::Greater => {
216 let (right, right_ix) = self.split(
217 middle + 1,
218 false,
219 Some((insert, separator)),
220 Some((insert, child)),
221 has_child,
222 );
223 let sep = self.remove_separator(middle);
224 let right = Self::write_split_child(right_ix, right, btree, log)?;
225 Ok(Some((sep, right)))
226 },
227 }
228 } else {
229 self.shift_from(at, has_child, false);
230 self.set_separator(at, separator);
231 self.set_child(at + 1, child);
232 Ok(None)
233 }
234 }
235
236 fn on_existing(
237 &mut self,
238 depth: u32,
239 changes: &mut &[Operation<RcKey, RcValue>],
240 values: TablesRef,
241 log: &mut LogWriter,
242 ) -> Result<(Option<(Separator, Child)>, bool)> {
243 let change = &changes[0];
244 let key = change.key();
245 let has_child = depth != 0;
246 let (at, i) = self.position(key.value())?;
247 if at {
248 let existing = self.separator_address(i);
249 if let Some(existing) = existing {
250 if Column::write_existing_value_plan::<_, RcValue>(
251 &TableKey::NoHash,
252 values,
253 existing,
254 change,
255 log,
256 None,
257 values.ref_counted,
258 )?
259 .0
260 .is_none()
261 {
262 let _ = self.remove_separator(i);
263 if depth != 0 {
264 if let Some(mut child) = self.fetch_child(i, values, log)? {
266 let (need_balance, sep) = child.remove_last(depth - 1, values, log)?;
267 self.write_child(i, child, values, log)?;
268 if let Some(sep) = sep {
269 self.set_separator(i, sep);
270 }
271 if need_balance {
272 self.rebalance(depth, i, values, log)?;
273 }
274 }
275 } else {
276 self.remove_from(i, false, true);
277 }
278 }
279 }
280 } else {
281 if !has_child {
282 return Ok((None, false))
283 }
284 return if let Some(mut child) = self.fetch_child(i, values, log)? {
285 let r = child.change(Some((self, i)), depth - 1, changes, values, log)?;
286 self.write_child(i, child, values, log)?;
287 Ok(match r {
288 (Some((sep, right)), _) => {
289 (self.insert_node(depth, i, sep, right, values, log)?, false)
291 },
292 (None, true) => {
293 self.rebalance(depth, i, values, log)?;
294 (None, self.need_rebalance())
295 },
296 r => r,
297 })
298 } else {
299 Ok((None, false))
300 }
301 }
302
303 Ok((None, self.need_rebalance()))
304 }
305
306 pub fn rebalance(
307 &mut self,
308 depth: u32,
309 at: usize,
310 values: TablesRef,
311 log: &mut LogWriter,
312 ) -> Result<()> {
313 let has_child = depth - 1 != 0;
314 let middle = ORDER / 2;
315 let mut balance_from_left = false;
316 let mut left = None;
317 if at > 0 {
318 left = self.fetch_child(at - 1, values, log)?;
319 if let Some(node) = left.as_ref() {
320 if node.has_separator(middle) {
321 balance_from_left = true;
322 }
323 }
324 }
325 if balance_from_left {
326 let mut left = left.unwrap();
327 let mut child = None;
328 let last_sibling = left.last_separator_index().unwrap();
329 if has_child {
330 child = Some(left.remove_child(last_sibling + 1));
331 }
332 let separator2 = left.remove_separator(last_sibling);
333 let separator = self.remove_separator(at - 1);
334 self.set_separator(at - 1, separator2);
335 let mut right = self.fetch_child(at, values, log)?.unwrap();
336 right.shift_from(0, has_child, true);
337 if let Some(child) = child {
338 right.set_child(0, child);
339 }
340 right.set_separator(0, separator);
341 self.write_child(at - 1, left, values, log)?;
342 self.write_child(at, right, values, log)?;
343 return Ok(())
344 }
345
346 let number_child = self.number_separator() + 1;
347 let mut balance_from_right = false;
348 let mut right = None;
349 if at + 1 < number_child {
350 right = self.fetch_child(at + 1, values, log)?;
351 if let Some(node) = right.as_ref() {
352 if node.has_separator(middle) {
353 balance_from_right = true;
354 }
355 }
356 }
357
358 if balance_from_right {
359 let mut child = None;
360 let mut right = right.unwrap();
361 if has_child {
362 child = Some(right.remove_child(0));
363 }
364 let separator2 = right.remove_separator(0);
365 right.remove_from(0, has_child, true);
366 let separator = self.remove_separator(at);
367 self.set_separator(at, separator2);
368 let mut left = self.fetch_child(at, values, log)?.unwrap();
369 let last_left = left.number_separator();
370 left.set_separator(last_left, separator);
371 if let Some(child) = child {
372 left.set_child(last_left + 1, child);
373 }
374 self.write_child(at, left, values, log)?;
375 self.write_child(at + 1, right, values, log)?;
376 return Ok(())
377 }
378
379 let (at, at_right) = if at + 1 == number_child {
380 right = self.fetch_child(at, values, log)?;
381 (at - 1, at)
382 } else {
383 left = self.fetch_child(at, values, log)?;
384 if right.is_none() {
385 right = self.fetch_child(at + 1, values, log)?;
386 }
387 (at, at + 1)
388 };
389
390 let mut left = left.unwrap();
391 let separator = self.remove_separator(at);
392 let mut i = left.number_separator();
393 left.set_separator(i, separator);
394 i += 1;
395 let mut right = right.unwrap();
396 let right_len = right.number_separator();
397 let mut right_i = 0;
398 while right_i < right_len {
399 let mut child = None;
400 if has_child {
401 child = Some(right.remove_child(right_i));
402 }
403 let separator = right.remove_separator(right_i);
404 left.set_separator(i, separator);
405 if let Some(child) = child {
406 left.set_child(i, child);
407 }
408 i += 1;
409 right_i += 1;
410 }
411 if has_child {
412 let child = right.remove_child(right_i);
413 left.set_child(i, child);
414 }
415
416 let removed = self.remove_child(at_right);
417 if let Some(index) = removed.entry_index {
418 BTreeTable::write_plan_remove_node(values, log, index)?;
419 }
420 self.write_child(at, left, values, log)?;
421 let has_child = true; self.remove_from(at, has_child, false);
423 Ok(())
424 }
425
426 fn need_rebalance(&mut self) -> bool {
427 let middle = ORDER / 2;
428 !self.has_separator(middle - 1)
429 }
430
431 pub fn need_remove_root(
432 &mut self,
433 values: TablesRef,
434 log: &mut LogWriter,
435 ) -> Result<Option<(Option<Address>, Node)>> {
436 if self.number_separator() == 0 && self.fetch_child(0, values, log)?.is_some() {
437 if let Some(node) = self.fetch_child(0, values, log)? {
438 let child = self.remove_child(0);
439 return Ok(Some((child.entry_index, node)))
440 }
441 }
442 Ok(None)
443 }
444
445 pub fn remove_last(
446 &mut self,
447 depth: u32,
448 values: TablesRef,
449 log: &mut LogWriter,
450 ) -> Result<(bool, Option<Separator>)> {
451 let last = if let Some(last) = self.last_separator_index() {
452 last
453 } else {
454 return Ok((false, None))
455 };
456 let i = last;
457 if depth == 0 {
458 let result = self.remove_separator(i);
459
460 Ok((self.need_rebalance(), Some(result)))
461 } else if let Some(mut child) = self.fetch_child(i + 1, values, log)? {
462 let result = child.remove_last(depth - 1, values, log)?;
463 self.write_child(i + 1, child, values, log)?;
464 if result.0 {
465 self.rebalance(depth, i + 1, values, log)?;
466 Ok((self.need_rebalance(), result.1))
467 } else {
468 Ok(result)
469 }
470 } else {
471 Ok((false, None))
472 }
473 }
474
475 pub fn get(
476 &self,
477 key: &[u8],
478 values: TablesRef,
479 log: &impl LogQuery,
480 ) -> Result<Option<Address>> {
481 let (at, i) = self.position(key)?;
482 if at {
483 Ok(self.separator_address(i))
484 } else {
485 if let Some(child) = self.fetch_child(i, values, log)? {
486 return child.get(key, values, log)
487 }
488
489 Ok(None)
490 }
491 }
492
493 pub fn seek(
494 self,
495 seek_to: SeekTo,
496 values: TablesRef,
497 log: &impl LogQuery,
498 mut depth: u32,
499 stack: &mut Vec<(LastIndex, Self)>,
500 ) -> Result<()> {
501 let mut from = self;
502 loop {
503 let (at, i) = if let Some(key) = seek_to.key() {
504 from.position(key)?
505 } else {
506 (false, from.last_separator_index().map(|i| i + 1).unwrap_or(0))
507 };
508 if at {
509 stack.push(match seek_to {
510 SeekTo::Exclude(_) => (LastIndex::At(i), from),
511 SeekTo::Include(_) => (LastIndex::Seeked(i), from),
512 SeekTo::Last => unreachable!(),
513 });
514 return Ok(())
515 }
516 if depth == 0 {
517 stack.push((LastIndex::Before(i), from));
518 return Ok(())
519 }
520 if let Some(child) = from.fetch_child(i, values, log)? {
521 stack.push((LastIndex::Descend(i), from));
522 from = child;
523 depth -= 1
524 } else {
525 return Err(Error::Corruption(format!("A btree node is missing a child at {i:?}")))
526 }
527 }
528 }
529
530 #[cfg(test)]
531 pub fn is_balanced(
532 &self,
533 tables: TablesRef,
534 log: &impl LogQuery,
535 parent_size: usize,
536 ) -> Result<bool> {
537 let size = self.number_separator();
538 if parent_size != 0 && size < ORDER / 2 {
539 return Ok(false)
540 }
541
542 let mut i = 0;
543 while i < ORDER {
544 let child = self.fetch_child(i, tables, log)?;
545 i += 1;
546 if child.is_none() {
547 continue
548 }
549 if !child.unwrap().is_balanced(tables, log, size)? {
550 return Ok(false)
551 }
552 }
553
554 Ok(true)
555 }
556}
557
558#[derive(Clone, Debug)]
562pub struct Node {
563 pub(super) separators: [Separator; ORDER],
564 pub(super) children: [Child; ORDER_CHILD],
565 pub(super) changed: bool,
566}
567
568impl Default for Node {
569 fn default() -> Self {
570 Node { separators: Default::default(), children: Default::default(), changed: true }
571 }
572}
573
574#[derive(Clone, Default, Debug)]
575pub struct Separator {
576 pub(super) modified: bool,
577 pub(super) separator: Option<SeparatorInner>,
578}
579
580#[derive(Clone, Debug)]
581pub struct SeparatorInner {
582 pub key: Vec<u8>,
583 pub value: Address,
584}
585
586#[derive(Clone, Default, Debug)]
587pub struct Child {
588 pub(super) moved: bool,
589 pub(super) entry_index: Option<Address>,
590}
591
592impl Node {
593 pub fn from_encoded(enc: Vec<u8>) -> Result<Self> {
594 let mut entry = Entry::from_encoded(enc);
595 let mut node =
596 Node { separators: Default::default(), children: Default::default(), changed: false };
597 let mut i_children = 0;
598 let mut i_separator = 0;
599 loop {
600 if let Some(child_index) = entry.read_child_index()? {
601 node.children.as_mut()[i_children].entry_index = Some(child_index);
602 }
603 i_children += 1;
604 if i_children == ORDER_CHILD {
605 break
606 }
607 if let Some(sep) = entry.read_separator()? {
608 node.separators.as_mut()[i_separator].separator = Some(sep);
609 i_separator += 1
610 } else {
611 break
612 }
613 }
614 Ok(node)
615 }
616
617 pub fn remove_separator(&mut self, at: usize) -> Separator {
618 self.changed = true;
619 let mut separator = std::mem::replace(
620 &mut self.separators[at],
621 Separator { modified: true, separator: None },
622 );
623 separator.modified = true;
624 separator
625 }
626
627 pub fn remove_child(&mut self, at: usize) -> Child {
628 self.changed = true;
629 let mut child =
630 std::mem::replace(&mut self.children[at], Child { moved: true, entry_index: None });
631 child.moved = true;
632 child
633 }
634
635 pub fn has_separator(&self, at: usize) -> bool {
636 self.separators[at].separator.is_some()
637 }
638
639 pub fn separator_key(&self, at: usize) -> Option<Vec<u8>> {
640 self.separators[at].separator.as_ref().map(|s| s.key.clone())
641 }
642
643 pub fn separator_address(&self, at: usize) -> Option<Address> {
644 self.separators[at].separator.as_ref().map(|s| s.value)
645 }
646
647 pub fn set_separator(&mut self, at: usize, mut sep: Separator) {
648 sep.modified = true;
649 self.changed = true;
650 self.separators.as_mut()[at] = sep;
651 }
652
653 pub fn new_child(index: Option<Address>) -> Child {
654 Child { moved: true, entry_index: index }
655 }
656
657 pub fn set_child(&mut self, at: usize, mut child: Child) {
658 child.moved = true;
659 self.changed = true;
660 self.children.as_mut()[at] = child;
661 }
662
663 fn create_separator(
664 key: &[u8],
665 value: &[u8],
666 btree: TablesRef,
667 log: &mut LogWriter,
668 existing: Option<Address>,
669 ) -> Result<Separator> {
670 let key = key.to_vec();
671 let value = if let Some(address) = existing {
672 Column::write_existing_value_plan(
673 &TableKey::NoHash,
674 btree,
675 address,
676 &Operation::Set((), value),
677 log,
678 None,
679 btree.ref_counted,
680 )?
681 .1
682 .unwrap_or(address)
683 } else {
684 Column::write_new_value_plan(&TableKey::NoHash, btree, value, log, None)?
685 };
686 let modified = Some(value) != existing;
687 Ok(Separator { modified, separator: Some(SeparatorInner { key, value }) })
688 }
689
690 fn split(
691 &mut self,
692 at: usize,
693 skip_left_child: bool,
694 mut insert_right: Option<(usize, Separator)>,
695 mut insert_right_child: Option<(usize, Child)>,
696 has_child: bool,
697 ) -> (Self, Option<Address>) {
698 let (right_ix, mut right) = (None, Self::default());
699 let mut offset = 0;
700 let right_start = at;
701 for i in right_start..ORDER {
702 let sep = self.remove_separator(i);
703 if insert_right.as_ref().map(|ins| ins.0 == i).unwrap_or(false) {
704 if let Some((_, sep)) = insert_right.take() {
705 right.separators.as_mut()[i - right_start] = sep;
706 offset = 1;
707 }
708 }
709 right.separators.as_mut()[i + offset - right_start] = sep;
710 }
711 if let Some((insert, sep)) = insert_right.take() {
712 debug_assert!(insert == ORDER);
713 right.separators.as_mut()[insert - right_start] = sep;
714 }
715 let mut offset = 0;
716 if has_child {
717 let skip_offset = usize::from(skip_left_child);
718 for i in right_start + skip_offset..ORDER_CHILD {
719 let child = self.remove_child(i);
720 if insert_right_child.as_ref().map(|ins| ins.0 + 1 == i).unwrap_or(false) {
721 offset = 1;
722 if let Some((_, mut child)) = insert_right_child.take() {
723 child.moved = true;
724 right.children.as_mut()[i - right_start] = child;
725 }
726 }
727 right.children.as_mut()[i + offset - right_start] = child;
728 }
729 if let Some((insert, mut child)) = insert_right_child.take() {
730 debug_assert!(insert == ORDER);
731 child.moved = true;
732 right.children.as_mut()[insert + 1 - right_start] = child;
733 }
734 }
735 (right, right_ix)
736 }
737
738 fn shift_from(&mut self, from: usize, has_child: bool, with_left_child: bool) {
739 self.changed = true;
740 let mut i = from;
741 let mut current = self.remove_separator(i);
742 while current.separator.is_some() {
743 current.modified = true;
744 i += 1;
745 if i == ORDER {
746 break
747 }
748 current = std::mem::replace(&mut self.separators[i], current);
749 }
750 if has_child {
751 let mut i = if with_left_child { from } else { from + 1 };
752 let mut current = self.remove_child(i);
753 while current.entry_index.is_some() {
754 current.moved = true;
755 i += 1;
756 if i == ORDER_CHILD {
757 break
758 }
759 current = std::mem::replace(&mut self.children[i], current);
760 }
761 }
762 }
763
764 fn remove_from(&mut self, from: usize, has_child: bool, with_left_child: bool) {
765 let mut i = from;
766 self.changed = true;
767 while i < ORDER - 1 {
768 self.separators.as_mut()[i] = self.remove_separator(i + 1);
769 self.separators.as_mut()[i].modified = true;
770 if self.separators.as_mut()[i].separator.is_none() {
771 break
772 }
773 i += 1;
774 }
775 if has_child {
776 let mut i = if with_left_child { from } else { from + 1 };
777 while i < ORDER_CHILD - 1 {
778 self.children.as_mut()[i] = self.remove_child(i + 1);
779 if self.children.as_mut()[i].entry_index.is_none() {
780 break
781 }
782 i += 1;
783 }
784 }
785 }
786
787 fn number_separator(&self) -> usize {
788 let mut i = 0;
789 while self.separators[i].separator.is_some() {
790 i += 1;
791 if i == ORDER {
792 break
793 }
794 }
795 i
796 }
797
798 fn position(&self, key: &[u8]) -> Result<(bool, usize)> {
801 let mut i = 0;
802 while let Some(separator) = self.separators[i].separator.as_ref() {
803 match key[..].cmp(&separator.key[..]) {
804 Ordering::Greater => (),
805 Ordering::Less => return Ok((false, i)),
806 Ordering::Equal => return Ok((true, i)),
807 }
808 i += 1;
809 if i == ORDER {
810 break
811 }
812 }
813 Ok((false, i))
814 }
815
816 pub fn fetch_child(
817 &self,
818 i: usize,
819 values: TablesRef,
820 log: &impl LogQuery,
821 ) -> Result<Option<Self>> {
822 if let Some(ix) = self.children[i].entry_index {
823 let entry = BTreeTable::get_encoded_entry(ix, log, values)?;
824 return Ok(Some(Self::from_encoded(entry)?))
825 }
826 Ok(None)
827 }
828}