use crate::{
	column::ColId,
	display::hex,
	error::Result,
	log::{LogOverlays, LogQuery, LogReader, LogWriter},
	options::ColumnOptions as Options,
	parking_lot::RwLock,
	table::key::{TableKey, TableKeyQuery, PARTIAL_SIZE},
};
use std::{
	convert::TryInto,
	mem::MaybeUninit,
	sync::{
		atomic::{AtomicBool, AtomicU64, Ordering},
		Arc,
	},
};

pub const SIZE_TIERS: usize = 1usize << SIZE_TIERS_BITS;
pub const SIZE_TIERS_BITS: u8 = 8;
pub const COMPRESSED_MASK: u16 = 0x80_00;
pub const MAX_ENTRY_SIZE: usize = 0x7ff8;
pub const MIN_ENTRY_SIZE: usize = 32;
const REFS_SIZE: usize = 4;
const SIZE_SIZE: usize = 2;
const INDEX_SIZE: usize = 8;
const MAX_ENTRY_BUF_SIZE: usize = 0x8000;

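// Special two-byte markers stored where an entry size would normally be. The non-V4 markers
// decode to sizes above MAX_ENTRY_SIZE, so they cannot collide with a valid entry size (see the
// `multipart_collision` test below).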
const TOMBSTONE: &[u8] = &[0xff, 0xff];
const MULTIPART_V4: &[u8] = &[0xff, 0xfe];
const MULTIHEAD_V4: &[u8] = &[0xff, 0xfd];
const MULTIPART: &[u8] = &[0xfe, 0xff];
const MULTIHEAD: &[u8] = &[0xfd, 0xff];
const MULTIHEAD_COMPRESSED: &[u8] = &[0xfd, 0x7f];
const LOCKED_REF: u32 = u32::MAX;

const MULTIPART_ENTRY_SIZE: u16 = 4096;

pub type Value = Vec<u8>;

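/// Identifier of a value table within a column: the high byte holds the column id and the low
/// byte the size tier.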
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct TableId(u16);

impl TableId {
	pub fn new(col: ColId, size_tier: u8) -> TableId {
		TableId(((col as u16) << 8) | size_tier as u16)
	}

	pub fn from_u16(id: u16) -> TableId {
		TableId(id)
	}

	pub fn col(&self) -> ColId {
		(self.0 >> 8) as ColId
	}

	pub fn size_tier(&self) -> u8 {
		(self.0 & 0xff) as u8
	}

	pub fn file_name(&self) -> String {
		format!("table_{:02}_{}", self.col(), hex(&[self.size_tier()]))
	}

	pub fn is_file_name(col: ColId, name: &str) -> bool {
		name.starts_with(&format!("table_{col:02}_"))
	}

	pub fn as_u16(&self) -> u16 {
		self.0
	}

	pub fn log_index(&self) -> usize {
		self.col() as usize * SIZE_TIERS + self.size_tier() as usize
	}

	pub const fn max_log_tables(num_columns: usize) -> usize {
		SIZE_TIERS * num_columns
	}

	pub fn from_log_index(i: usize) -> Self {
		let col = i / SIZE_TIERS;
		let tier = i % SIZE_TIERS;
		Self::new(col as ColId, tier as u8)
	}
}

impl std::fmt::Display for TableId {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		write!(f, "t{:02}-{:02}", self.col(), hex(&[self.size_tier()]))
	}
}

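/// A value table file made of fixed-size slots. A table opened without an explicit entry size
/// uses MULTIPART_ENTRY_SIZE slots and may chain entries into multipart values.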
#[derive(Debug)]
pub struct ValueTable {
	pub id: TableId,
	pub entry_size: u16,
	file: crate::file::TableFile,
	// Number of allocated slots (including the header slot at index 0), as planned in the log.
	filled: AtomicU64,
	// Number of slots enacted to the file.
	written: AtomicU64,
	// Head of the free (tombstone) slot list.
	last_removed: AtomicU64,
	dirty_header: AtomicBool,
	multipart: bool,
	ref_counted: bool,
	db_version: u32,
}

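/// The 16-byte table header kept in slot 0: the free-list head followed by the filled slot
/// count, both little-endian u64.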
#[derive(Default, Clone, Copy)]
struct Header([u8; 16]);

impl Header {
	fn last_removed(&self) -> u64 {
		u64::from_le_bytes(self.0[0..INDEX_SIZE].try_into().unwrap())
	}
	fn set_last_removed(&mut self, last_removed: u64) {
		self.0[0..INDEX_SIZE].copy_from_slice(&last_removed.to_le_bytes());
	}
	fn filled(&self) -> u64 {
		u64::from_le_bytes(self.0[INDEX_SIZE..INDEX_SIZE * 2].try_into().unwrap())
	}
	fn set_filled(&mut self, filled: u64) {
		self.0[INDEX_SIZE..INDEX_SIZE * 2].copy_from_slice(&filled.to_le_bytes());
	}
}

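/// A cursor over an entry buffer: the `usize` is the current read/write offset into the wrapped
/// bytes.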
pub struct Entry<B: AsRef<[u8]>>(usize, B);
#[cfg(feature = "loom")]
pub type FullEntry = Entry<Vec<u8>>;
#[cfg(not(feature = "loom"))]
pub type FullEntry = Entry<[u8; MAX_ENTRY_BUF_SIZE]>;
pub type EntryRef<'a> = Entry<&'a [u8]>;
type PartialEntry = Entry<[u8; 10]>;
type PartialKeyEntry = Entry<[u8; 40]>;

impl<const C: usize> Entry<[u8; C]> {
	#[inline(always)]
	#[allow(clippy::uninit_assumed_init)]
	pub fn new_uninit() -> Self {
		Entry(0, unsafe { MaybeUninit::uninit().assume_init() })
	}
}

#[cfg(feature = "loom")]
impl Entry<Vec<u8>> {
	pub fn new_uninit_full_entry() -> Self {
		Entry(0, vec![0; MAX_ENTRY_BUF_SIZE])
	}
}

#[cfg(not(feature = "loom"))]
impl Entry<[u8; MAX_ENTRY_BUF_SIZE]> {
	pub fn new_uninit_full_entry() -> Self {
		Self::new_uninit()
	}
}

impl<B: AsRef<[u8]>> Entry<B> {
	#[inline(always)]
	pub fn check_remaining_len(
		&self,
		len: usize,
		error: impl Fn() -> crate::error::Error,
	) -> Result<()> {
		if self.0 + len > self.1.as_ref().len() {
			return Err(error())
		}
		Ok(())
	}

	#[inline(always)]
	pub fn new(data: B) -> Self {
		Entry(0, data)
	}

	pub fn set_offset(&mut self, offset: usize) {
		self.0 = offset;
	}

	pub fn offset(&self) -> usize {
		self.0
	}

	pub fn read_slice(&mut self, size: usize) -> &[u8] {
		let start = self.0;
		self.0 += size;
		&self.1.as_ref()[start..self.0]
	}

	fn is_tombstone(&self) -> bool {
		&self.1.as_ref()[0..SIZE_SIZE] == TOMBSTONE
	}

	fn is_multipart(&self) -> bool {
		&self.1.as_ref()[0..SIZE_SIZE] == MULTIPART
	}

	fn is_multipart_v4(&self) -> bool {
		&self.1.as_ref()[0..SIZE_SIZE] == MULTIPART_V4
	}

	fn is_multihead_compressed(&self) -> bool {
		&self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_COMPRESSED
	}

	fn is_multihead(&self) -> bool {
		self.is_multihead_compressed() || &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD
	}

	fn is_multihead_v4(&self) -> bool {
		&self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_V4
	}

	fn is_multi(&self, db_version: u32) -> bool {
		self.is_multipart() ||
			self.is_multihead() ||
			(db_version <= 4 && (self.is_multipart_v4() || self.is_multihead_v4()))
	}

	fn read_size(&mut self) -> (u16, bool) {
		let size = u16::from_le_bytes(self.read_slice(SIZE_SIZE).try_into().unwrap());
		let compressed = (size & COMPRESSED_MASK) > 0;
		(size & !COMPRESSED_MASK, compressed)
	}

	fn skip_size(&mut self) {
		self.0 += SIZE_SIZE;
	}

	pub fn read_u64(&mut self) -> u64 {
		u64::from_le_bytes(self.read_slice(8).try_into().unwrap())
	}

	fn read_next(&mut self) -> u64 {
		self.read_u64()
	}

	pub fn skip_u64(&mut self) {
		self.0 += 8;
	}

	pub fn skip_next(&mut self) {
		self.skip_u64()
	}

	pub fn read_u32(&mut self) -> u32 {
		u32::from_le_bytes(self.read_slice(REFS_SIZE).try_into().unwrap())
	}

	fn read_rc(&mut self) -> u32 {
		self.read_u32()
	}

	fn read_partial(&mut self) -> &[u8] {
		self.read_slice(PARTIAL_SIZE)
	}

	fn remaining_to(&self, end: usize) -> &[u8] {
		&self.1.as_ref()[self.0..end]
	}
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> Entry<B> {
	pub fn write_slice(&mut self, buf: &[u8]) {
		let start = self.0;
		self.0 += buf.len();
		self.1.as_mut()[start..self.0].copy_from_slice(buf);
	}

	fn write_tombstone(&mut self) {
		self.write_slice(TOMBSTONE);
	}

	fn write_multipart(&mut self) {
		self.write_slice(MULTIPART);
	}

	fn write_multihead(&mut self) {
		self.write_slice(MULTIHEAD);
	}

	fn write_multihead_compressed(&mut self) {
		self.write_slice(MULTIHEAD_COMPRESSED);
	}

	fn write_size(&mut self, mut size: u16, compressed: bool) {
		if compressed {
			size |= COMPRESSED_MASK;
		}
		self.write_slice(&size.to_le_bytes());
	}

	pub fn write_u64(&mut self, next_index: u64) {
		self.write_slice(&next_index.to_le_bytes());
	}

	fn write_next(&mut self, next_index: u64) {
		self.write_u64(next_index)
	}

	pub fn write_u32(&mut self, next_index: u32) {
		self.write_slice(&next_index.to_le_bytes());
	}

	fn write_rc(&mut self, rc: u32) {
		self.write_slice(&rc.to_le_bytes());
	}

	pub fn inner_mut(&mut self) -> &mut B {
		&mut self.1
	}
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> AsMut<[u8]> for Entry<B> {
	fn as_mut(&mut self) -> &mut [u8] {
		self.1.as_mut()
	}
}

impl<B: AsRef<[u8]>> AsRef<[u8]> for Entry<B> {
	fn as_ref(&self) -> &[u8] {
		self.1.as_ref()
	}
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> std::ops::Index<std::ops::Range<usize>> for Entry<B> {
	type Output = [u8];

	fn index(&self, index: std::ops::Range<usize>) -> &[u8] {
		&self.1.as_ref()[index]
	}
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> std::ops::IndexMut<std::ops::Range<usize>> for Entry<B> {
	fn index_mut(&mut self, index: std::ops::Range<usize>) -> &mut [u8] {
		&mut self.1.as_mut()[index]
	}
}

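/// An entry slice borrowed either from the log overlay or from the mapped table file.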
enum LockedSlice<O: std::ops::Deref<Target = [u8]>, F: std::ops::Deref<Target = [u8]>> {
	FromOverlay(O),
	FromFile(F),
}

impl<O: std::ops::Deref<Target = [u8]>, F: std::ops::Deref<Target = [u8]>> LockedSlice<O, F> {
	fn as_slice(&self) -> &[u8] {
		match self {
			LockedSlice::FromOverlay(slice) => &*slice,
			LockedSlice::FromFile(slice) => &*slice,
		}
	}
}

impl ValueTable {
	pub fn open(
		path: Arc<std::path::PathBuf>,
		id: TableId,
		entry_size: Option<u16>,
		options: &Options,
		db_version: u32,
	) -> Result<ValueTable> {
		let (multipart, entry_size) = match entry_size {
			Some(s) => (false, s),
			None => (true, MULTIPART_ENTRY_SIZE),
		};
		assert!(entry_size >= MIN_ENTRY_SIZE as u16);
		assert!(entry_size <= MAX_ENTRY_SIZE as u16);

		let mut filepath: std::path::PathBuf = std::path::PathBuf::clone(&*path);
		filepath.push(id.file_name());
		let file = crate::file::TableFile::open(filepath, entry_size, id)?;
		let mut filled = 1;
		let mut last_removed = 0;
		if file.map.read().is_some() {
			let mut header = Header::default();
			file.read_at(&mut header.0, 0)?;
			last_removed = header.last_removed();
			filled = header.filled();
			if filled == 0 {
				filled = 1;
			}
			if last_removed >= filled {
				return Err(crate::error::Error::Corruption(format!(
					"Bad removed ref {} out of {}",
					last_removed, filled
				)))
			}
			log::debug!(target: "parity-db", "Opened value table {} with {} entries, entry_size={}, removed={}", id, filled, entry_size, last_removed);
		}

		Ok(ValueTable {
			id,
			entry_size,
			file,
			filled: AtomicU64::new(filled),
			written: AtomicU64::new(filled),
			last_removed: AtomicU64::new(last_removed),
			dirty_header: AtomicBool::new(false),
			multipart,
			ref_counted: options.ref_counted,
			db_version,
		})
	}

	pub fn value_size(&self, key: &TableKey) -> Option<u16> {
		let base = self.entry_size - SIZE_SIZE as u16 - self.ref_size() as u16;
		let k_encoded = key.encoded_size() as u16;
		if base < k_encoded {
			None
		} else {
			Some(base - k_encoded)
		}
	}

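	// Walks the chain of parts starting at `index`, passing each part's payload to `f` until `f`
	// returns false or the chain ends. Returns the entry's reference count and whether the value
	// is compressed.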
	#[inline(always)]
	fn for_parts(
		&self,
		key: &mut TableKeyQuery,
		mut index: u64,
		log: &impl LogQuery,
		mut f: impl FnMut(&[u8]) -> bool,
	) -> Result<(u32, bool)> {
		let mut part = 0;
		let mut compressed = false;
		let mut rc = 1;
		let entry_size = self.entry_size as usize;
		loop {
			let vbuf = log.value_ref(self.id, index);
			let buf: LockedSlice<_, _> = if let Some(buf) = vbuf.as_deref() {
				log::trace!(
					target: "parity-db",
					"{}: Found in overlay {}",
					self.id,
					index,
				);
				LockedSlice::FromOverlay(buf)
			} else {
				log::trace!(
					target: "parity-db",
					"{}: Query slot {}",
					self.id,
					index,
				);
				let vbuf = self.file.slice_at(index * self.entry_size as u64, entry_size);
				LockedSlice::FromFile(vbuf)
			};
			let mut buf = EntryRef::new(buf.as_slice());

			if buf.is_tombstone() {
				return Ok((0, false))
			}

			if self.multipart && part == 0 && !buf.is_multihead() {
				return Ok((0, false))
			}

			let (entry_end, next) = if self.multipart && buf.is_multi(self.db_version) {
				if part == 0 && self.db_version > 6 && buf.is_multihead_compressed() {
					compressed = true;
				}
				buf.skip_size();
				let next = buf.read_next();
				(entry_size, next)
			} else {
				let (size, read_compressed) = buf.read_size();
				if part == 0 || self.db_version <= 6 {
					compressed = read_compressed;
				}
				(buf.offset() + size as usize, 0)
			};

			if part == 0 {
				if self.ref_counted {
					rc = buf.read_rc();
				}
				match key {
					TableKeyQuery::Fetch(Some(to_fetch)) => {
						**to_fetch = TableKey::fetch_partial(&mut buf)?;
					},
					TableKeyQuery::Fetch(None) => (),
					TableKeyQuery::Check(k) => {
						let to_fetch = k.fetch(&mut buf)?;
						if !k.compare(&to_fetch) {
							log::debug!(
								target: "parity-db",
								"{}: Key mismatch at {}. Expected {}, got {:?}, size = {}",
								self.id,
								index,
								k,
								to_fetch.as_ref().map(hex),
								self.entry_size,
							);
							return Ok((0, false))
						}
					},
				}
			}

			if buf.offset() > entry_end {
				return Err(crate::error::Error::Corruption(format!(
					"Unexpected entry size. Expected at least {} bytes",
					buf.offset() - 2
				)))
			}

			if !f(buf.remaining_to(entry_end)) {
				break
			};

			if next == 0 {
				break
			}
			part += 1;
			index = next;
		}
		Ok((rc, compressed))
	}

	pub fn get(
		&self,
		key: &TableKey,
		index: u64,
		log: &impl LogQuery,
	) -> Result<Option<(Value, bool)>> {
		if let Some((value, compressed, _)) =
			self.query(&mut TableKeyQuery::Check(key), index, log)?
		{
			Ok(Some((value, compressed)))
		} else {
			Ok(None)
		}
	}

	pub fn dump_entry(&self, index: u64) -> Result<Vec<u8>> {
		let entry_size = self.entry_size as usize;
		let mut buf = FullEntry::new_uninit_full_entry();
		self.file.read_at(&mut buf[0..entry_size], index * self.entry_size as u64)?;
		Ok(buf[0..entry_size].to_vec())
	}

	pub fn query(
		&self,
		key: &mut TableKeyQuery,
		index: u64,
		log: &impl LogQuery,
	) -> Result<Option<(Value, bool, u32)>> {
		let mut result = Vec::new();
		let (rc, compressed) = self.for_parts(key, index, log, |buf| {
			result.extend_from_slice(buf);
			true
		})?;
		if rc > 0 {
			return Ok(Some((result, compressed, rc)))
		}
		Ok(None)
	}

	#[allow(clippy::type_complexity)]
	pub fn get_with_meta(
		&self,
		index: u64,
		log: &impl LogQuery,
	) -> Result<Option<(Value, u32, [u8; PARTIAL_SIZE], bool)>> {
		let mut query_key = Default::default();
		if let Some((value, compressed, rc)) =
			self.query(&mut TableKeyQuery::Fetch(Some(&mut query_key)), index, log)?
		{
			return Ok(Some((value, rc, query_key, compressed)))
		}
		Ok(None)
	}

	pub fn size(
		&self,
		key: &TableKey,
		index: u64,
		log: &impl LogQuery,
	) -> Result<Option<(u32, bool)>> {
		let mut result = 0;
		let (rc, compressed) =
			self.for_parts(&mut TableKeyQuery::Check(key), index, log, |buf| {
				result += buf.len() as u32;
				true
			})?;
		if rc > 0 {
			return Ok(Some((result, compressed)))
		}
		Ok(None)
	}

	pub fn has_key_at(&self, index: u64, key: &TableKey, log: &LogWriter) -> Result<bool> {
		match key {
			TableKey::Partial(k) => Ok(match self.partial_key_at(index, log)? {
				Some(existing_key) => &existing_key[..] == key::partial_key(k),
				None => false,
			}),
			TableKey::NoHash => Ok(!self.is_tombstone(index, log)?),
		}
	}

	pub fn partial_key_at(
		&self,
		index: u64,
		log: &impl LogQuery,
	) -> Result<Option<[u8; PARTIAL_SIZE]>> {
		let mut query_key = Default::default();
		let (rc, _compressed) =
			self.for_parts(&mut TableKeyQuery::Fetch(Some(&mut query_key)), index, log, |_buf| {
				false
			})?;
		Ok(if rc == 0 { None } else { Some(query_key) })
	}

	pub fn is_tombstone(&self, index: u64, log: &impl LogQuery) -> Result<bool> {
		let mut buf = PartialKeyEntry::new_uninit();
		let buf = if log.value(self.id, index, buf.as_mut()) {
			&mut buf
		} else {
			self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
			&mut buf
		};
		Ok(buf.is_tombstone())
	}

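	// Reads the free-list link stored in a freed (tombstone) slot.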
	pub fn read_next_free(&self, index: u64, log: &LogWriter) -> Result<u64> {
		let mut buf = PartialEntry::new_uninit();
		let filled = self.filled.load(Ordering::Relaxed);
		if !log.value(self.id, index, buf.as_mut()) {
			self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
		}
		buf.skip_size();
		let next = buf.read_next();
		if next >= filled {
			return Err(crate::error::Error::Corruption(format!(
				"Bad removed ref {} out of {}",
				next, filled
			)))
		}
		Ok(next)
	}

	pub fn read_next_part(&self, index: u64, log: &LogWriter) -> Result<Option<u64>> {
		let mut buf = PartialEntry::new_uninit();
		if !log.value(self.id, index, buf.as_mut()) {
			self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
		}
		if self.multipart && buf.is_multi(self.db_version) {
			buf.skip_size();
			let next = buf.read_next();
			return Ok(Some(next))
		}
		Ok(None)
	}

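	// Allocates a slot for writing: pops the head of the free list if there is one, otherwise
	// grows the filled count.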
	pub fn next_free(&self, log: &mut LogWriter) -> Result<u64> {
		let filled = self.filled.load(Ordering::Relaxed);
		let last_removed = self.last_removed.load(Ordering::Relaxed);
		let index = if last_removed != 0 {
			let next_removed = self.read_next_free(last_removed, log)?;
			log::trace!(
				target: "parity-db",
				"{}: Inserting into removed slot {}",
				self.id,
				last_removed,
			);
			self.last_removed.store(next_removed, Ordering::Relaxed);
			last_removed
		} else {
			log::trace!(
				target: "parity-db",
				"{}: Inserting into new slot {}",
				self.id,
				filled,
			);
			self.filled.store(filled + 1, Ordering::Relaxed);
			filled
		};
		self.dirty_header.store(true, Ordering::Relaxed);
		Ok(index)
	}

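	// Writes `key`/`value` as a chain of entries. When `at` is given, the existing chain starting
	// there is reused (and trimmed or extended as needed); otherwise new slots are allocated.
	// Returns the index of the head entry.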
	fn overwrite_chain(
		&self,
		key: &TableKey,
		value: &[u8],
		log: &mut LogWriter,
		at: Option<u64>,
		compressed: bool,
	) -> Result<u64> {
		let mut remainder = value.len() + self.ref_size() + key.encoded_size();
		let mut offset = 0;
		let mut start = 0;
		assert!(self.multipart || value.len() <= self.value_size(key).unwrap() as usize);
		let (mut index, mut follow) = match at {
			Some(index) => (index, true),
			None => (self.next_free(log)?, false),
		};
		loop {
			let mut next_index = 0;
			if follow {
				match self.read_next_part(index, log)? {
					Some(next) => {
						next_index = next;
					},
					None => {
						follow = false;
					},
				}
			}
			log::trace!(
				target: "parity-db",
				"{}: Writing slot {}: {}",
				self.id,
				index,
				key,
			);
			let mut buf = FullEntry::new_uninit_full_entry();
			let free_space = self.entry_size as usize - SIZE_SIZE;
			let value_len = if remainder > free_space {
				if !follow {
					next_index = self.next_free(log)?
				}
				if start == 0 {
					if compressed {
						buf.write_multihead_compressed();
					} else {
						buf.write_multihead();
					}
				} else {
					buf.write_multipart();
				}
				buf.write_next(next_index);
				free_space - INDEX_SIZE
			} else {
				buf.write_size(remainder as u16, compressed);
				remainder
			};
			let init_offset = buf.offset();
			if offset == 0 {
				if self.ref_counted {
					buf.write_rc(1u32);
				}
				key.write(&mut buf);
			}
			let written = buf.offset() - init_offset;
			buf.write_slice(&value[offset..offset + value_len - written]);
			offset += value_len - written;
			log.insert_value(self.id, index, buf[0..buf.offset()].to_vec());
			remainder -= value_len;
			if start == 0 {
				start = index;
			}
			index = next_index;
			if remainder == 0 {
				if index != 0 {
					self.clear_chain(index, log)?;
				}
				break
			}
		}

		Ok(start)
	}

	fn clear_chain(&self, mut index: u64, log: &mut LogWriter) -> Result<()> {
		loop {
			match self.read_next_part(index, log)? {
				Some(next) => {
					self.clear_slot(index, log)?;
					index = next;
				},
				None => {
					self.clear_slot(index, log)?;
					return Ok(())
				},
			}
		}
	}

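	// Replaces the slot with a tombstone and makes it the new head of the free list.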
	fn clear_slot(&self, index: u64, log: &mut LogWriter) -> Result<()> {
		let last_removed = self.last_removed.load(Ordering::Relaxed);
		log::trace!(
			target: "parity-db",
			"{}: Freeing slot {}",
			self.id,
			index,
		);

		let mut buf = PartialEntry::new_uninit();
		buf.write_tombstone();
		buf.write_next(last_removed);

		log.insert_value(self.id, index, buf[0..buf.offset()].to_vec());
		self.last_removed.store(index, Ordering::Relaxed);
		self.dirty_header.store(true, Ordering::Relaxed);
		Ok(())
	}

	pub fn write_insert_plan(
		&self,
		key: &TableKey,
		value: &[u8],
		log: &mut LogWriter,
		compressed: bool,
	) -> Result<u64> {
		self.overwrite_chain(key, value, log, None, compressed)
	}

	pub fn write_replace_plan(
		&self,
		index: u64,
		key: &TableKey,
		value: &[u8],
		log: &mut LogWriter,
		compressed: bool,
	) -> Result<()> {
		self.overwrite_chain(key, value, log, Some(index), compressed)?;
		Ok(())
	}

	pub fn write_remove_plan(&self, index: u64, log: &mut LogWriter) -> Result<()> {
		if self.multipart {
			self.clear_chain(index, log)?;
		} else {
			self.clear_slot(index, log)?;
		}
		Ok(())
	}

	pub fn write_inc_ref(&self, index: u64, log: &mut LogWriter) -> Result<()> {
		self.change_ref(index, 1, log)?;
		Ok(())
	}

	pub fn write_dec_ref(&self, index: u64, log: &mut LogWriter) -> Result<bool> {
		if self.change_ref(index, -1, log)? {
			return Ok(true)
		}
		self.write_remove_plan(index, log)?;
		Ok(false)
	}

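	// Adjusts the reference counter of the entry at `index` by `delta`. Returns `false` if the
	// entry is a tombstone or the counter drops to zero; a counter of LOCKED_REF is never
	// decremented.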
	pub fn change_ref(&self, index: u64, delta: i32, log: &mut LogWriter) -> Result<bool> {
		let mut buf = FullEntry::new_uninit_full_entry();
		let buf = if log.value(self.id, index, buf.as_mut()) {
			&mut buf
		} else {
			self.file
				.read_at(&mut buf[0..self.entry_size as usize], index * self.entry_size as u64)?;
			&mut buf
		};

		if buf.is_tombstone() {
			return Ok(false)
		}

		let size = if self.multipart && buf.is_multi(self.db_version) {
			buf.skip_size();
			buf.skip_next();
			self.entry_size as usize
		} else {
			let (size, _compressed) = buf.read_size();
			buf.offset() + size as usize
		};

		let rc_offset = buf.offset();
		let mut counter = buf.read_rc();
		if delta > 0 {
			if counter >= LOCKED_REF - delta as u32 {
				counter = LOCKED_REF
			} else {
				counter += delta as u32;
			}
		} else if counter != LOCKED_REF {
			counter = counter.saturating_sub(-delta as u32);
			if counter == 0 {
				return Ok(false)
			}
		}

		buf.set_offset(rc_offset);
		buf.write_rc(counter);
		log.insert_value(self.id, index, buf[0..size].to_vec());
		Ok(true)
	}

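	// Applies a logged value change to the table file, growing the file first if the slot lies
	// beyond the current capacity.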
	pub fn enact_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
		while index >= self.file.capacity.load(Ordering::Relaxed) {
			self.file.grow(self.entry_size)?;
		}
		if index == 0 {
			let mut header = Header::default();
			log.read(&mut header.0)?;
			self.file.write_at(&header.0, 0)?;
			self.written.store(header.filled(), Ordering::Relaxed);
			log::trace!(target: "parity-db", "{}: Enacted header, {} filled", self.id, header.filled());
			return Ok(())
		}

		let mut buf = FullEntry::new_uninit_full_entry();
		log.read(&mut buf[0..SIZE_SIZE])?;
		if buf.is_tombstone() {
			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + INDEX_SIZE])?;
			self.file
				.write_at(&buf[0..SIZE_SIZE + INDEX_SIZE], index * (self.entry_size as u64))?;
			log::trace!(target: "parity-db", "{}: Enacted tombstone in slot {}", self.id, index);
		} else if self.multipart && buf.is_multi(self.db_version) {
			let entry_size = self.entry_size as usize;
			log.read(&mut buf[SIZE_SIZE..entry_size])?;
			self.file.write_at(&buf[0..entry_size], index * (entry_size as u64))?;
			log::trace!(target: "parity-db", "{}: Enacted multipart in slot {}", self.id, index);
		} else {
			let (len, _compressed) = buf.read_size();
			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + len as usize])?;
			self.file
				.write_at(&buf[0..(SIZE_SIZE + len as usize)], index * (self.entry_size as u64))?;
			log::trace!(target: "parity-db", "{}: Enacted {}: {}, {} bytes", self.id, index, hex(&buf.1[6..32]), len);
		}
		Ok(())
	}

	pub fn validate_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
		if index == 0 {
			let mut header = Header::default();
			log.read(&mut header.0)?;
			return Ok(())
		}
		let mut buf = FullEntry::new_uninit_full_entry();
		log.read(&mut buf[0..SIZE_SIZE])?;
		if buf.is_tombstone() {
			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + INDEX_SIZE])?;
			log::trace!(target: "parity-db", "{}: Validated tombstone in slot {}", self.id, index);
		} else if self.multipart && buf.is_multi(self.db_version) {
			let entry_size = self.entry_size as usize;
			log.read(&mut buf[SIZE_SIZE..entry_size])?;
			log::trace!(target: "parity-db", "{}: Validated multipart in slot {}", self.id, index);
		} else {
			let (len, _compressed) = buf.read_size();
			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + len as usize])?;
			log::trace!(target: "parity-db", "{}: Validated {}: {}, {} bytes", self.id, index, hex(&buf[SIZE_SIZE..32]), len);
		}
		Ok(())
	}

	pub fn refresh_metadata(&self) -> Result<()> {
		if self.file.map.read().is_none() {
			return Ok(())
		}
		let mut header = Header::default();
		self.file.read_at(&mut header.0, 0)?;
		let last_removed = header.last_removed();
		let mut filled = header.filled();
		if filled == 0 {
			filled = 1;
		}
		self.last_removed.store(last_removed, Ordering::Relaxed);
		self.filled.store(filled, Ordering::Relaxed);
		self.written.store(filled, Ordering::Relaxed);
		Ok(())
	}

	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
		if let Ok(true) =
			self.dirty_header
				.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
		{
			let mut buf = Header::default();
			let last_removed = self.last_removed.load(Ordering::Relaxed);
			let filled = self.filled.load(Ordering::Relaxed);
			buf.set_last_removed(last_removed);
			buf.set_filled(filled);
			log.insert_value(self.id, 0, buf.0.to_vec());
		}
		Ok(())
	}

	pub fn flush(&self) -> Result<()> {
		self.file.flush()
	}

	fn ref_size(&self) -> usize {
		if self.ref_counted {
			REFS_SIZE
		} else {
			0
		}
	}

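	// Iterates over live entries in slot order, calling `f` with the slot index, reference count,
	// value and compression flag until `f` returns false.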
	pub fn iter_while(
		&self,
		log: &impl LogQuery,
		mut f: impl FnMut(u64, u32, Vec<u8>, bool) -> bool,
	) -> Result<()> {
		let written = self.written.load(Ordering::Relaxed);
		for index in 1..written {
			let mut result = Vec::new();
			let mut _fetch_key = Default::default();
			match self.for_parts(
				&mut TableKeyQuery::Fetch(Some(&mut _fetch_key)),
				index,
				log,
				|buf| {
					result.extend_from_slice(buf);
					true
				},
			) {
				Ok((rc, compressed)) =>
					if rc > 0 && !f(index, rc, result, compressed) {
						break
					},
				Err(crate::error::Error::InvalidValueData) => (),
				Err(e) => return Err(e),
			}
		}
		Ok(())
	}

	pub fn is_init(&self) -> bool {
		self.file.map.read().is_some()
	}

	pub fn init_with_entry(&self, entry: &[u8]) -> Result<()> {
		if let Err(e) = self.do_init_with_entry(entry) {
			log::error!(target: "parity-db", "Failure to initialize file {}", self.file.path.display());
			let _ = self.file.remove();
			return Err(e)
		}
		Ok(())
	}

	fn do_init_with_entry(&self, entry: &[u8]) -> Result<()> {
		self.file.grow(self.entry_size)?;

		let empty_overlays = RwLock::new(LogOverlays::with_columns(0));
		let mut log = LogWriter::new(&empty_overlays, 0);
		let at = self.overwrite_chain(&TableKey::NoHash, entry, &mut log, None, false)?;
		self.complete_plan(&mut log)?;
		assert_eq!(at, 1);
		let log = log.drain();
		let change = log.local_values_changes(self.id).expect("entry written above");
		for (at, (_rec_id, entry)) in change.map.iter() {
			self.file.write_at(entry.as_slice(), *at * (self.entry_size as u64))?;
		}
		Ok(())
	}

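	// Walks the free list and returns its length, verifying that every link points below the
	// written range.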
	pub fn check_free_refs(&self) -> Result<u64> {
		let written = self.written.load(Ordering::Relaxed);
		let mut next = self.last_removed.load(Ordering::Relaxed);
		let mut len = 0;
		while next != 0 {
			if next >= written {
				return Err(crate::error::Error::Corruption(format!(
					"Bad removed ref {} out of {}",
					next, written
				)))
			}
			let mut buf = PartialEntry::new_uninit();
			self.file.read_at(buf.as_mut(), next * self.entry_size as u64)?;
			buf.skip_size();
			next = buf.read_next();
			len += 1;
		}
		Ok(len)
	}
}

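/// Helpers for the partial key (the hash minus its index prefix) stored with each value entry.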
pub mod key {
	use super::{EntryRef, FullEntry};
	use crate::{Key, Result};

	pub const PARTIAL_SIZE: usize = 26;

	pub fn partial_key(hash: &Key) -> &[u8] {
		&hash[6..]
	}

	pub enum TableKey {
		Partial(Key),
		NoHash,
	}

	impl TableKey {
		pub fn encoded_size(&self) -> usize {
			match self {
				TableKey::Partial(_) => PARTIAL_SIZE,
				TableKey::NoHash => 0,
			}
		}

		pub fn index_from_partial(partial: &[u8]) -> u64 {
			u64::from_be_bytes((partial[0..8]).try_into().unwrap())
		}

		pub fn compare(&self, fetch: &Option<[u8; PARTIAL_SIZE]>) -> bool {
			match (self, fetch) {
				(TableKey::Partial(k), Some(fetch)) => partial_key(k) == fetch,
				(TableKey::NoHash, _) => true,
				_ => false,
			}
		}

		pub fn fetch_partial<'a>(buf: &mut EntryRef<'a>) -> Result<[u8; PARTIAL_SIZE]> {
			let mut result = [0u8; PARTIAL_SIZE];
			if buf.1.len() >= PARTIAL_SIZE {
				let pks = buf.read_partial();
				result.copy_from_slice(pks);
				return Ok(result)
			}
			Err(crate::error::Error::InvalidValueData)
		}

		pub fn fetch<'a>(&self, buf: &mut EntryRef<'a>) -> Result<Option<[u8; PARTIAL_SIZE]>> {
			match self {
				TableKey::Partial(_k) => Ok(Some(Self::fetch_partial(buf)?)),
				TableKey::NoHash => Ok(None),
			}
		}

		pub fn write(&self, buf: &mut FullEntry) {
			match self {
				TableKey::Partial(k) => {
					buf.write_slice(partial_key(k));
				},
				TableKey::NoHash => (),
			}
		}
	}

	impl std::fmt::Display for TableKey {
		fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
			match self {
				TableKey::Partial(k) => write!(f, "{}", crate::display::hex(k)),
				TableKey::NoHash => write!(f, "no_hash"),
			}
		}
	}

	pub enum TableKeyQuery<'a> {
		Check(&'a TableKey),
		Fetch(Option<&'a mut [u8; PARTIAL_SIZE]>),
	}
}

#[cfg(test)]
mod test {
	const ENTRY_SIZE: u16 = 64;

	use super::{TableId, Value, ValueTable, MULTIPART_ENTRY_SIZE};
	use crate::{
		log::{Log, LogAction, LogWriter},
		options::{ColumnOptions, Options, CURRENT_VERSION},
		table::key::TableKey,
		Key,
	};
	use std::sync::{atomic::Ordering, Arc};
	use tempfile::{tempdir, TempDir};

	fn new_table(dir: &TempDir, size: Option<u16>, options: &ColumnOptions) -> ValueTable {
		let id = TableId::new(0, 0);
		ValueTable::open(Arc::new(dir.path().to_path_buf()), id, size, options, CURRENT_VERSION)
			.unwrap()
	}

	fn new_log(dir: &TempDir) -> Log {
		let options = Options::with_columns(dir.path(), 1);
		Log::open(&options).unwrap()
	}

	fn write_ops<F: FnOnce(&mut LogWriter)>(table: &ValueTable, log: &Log, f: F) {
		let mut writer = log.begin_record();
		f(&mut writer);
		let bytes_written = log.end_record(writer.drain()).unwrap();
		let _ = log.read_next(false);
		log.flush_one(0).unwrap();
		let mut reader = log.read_next(false).unwrap().unwrap();
		loop {
			match reader.next().unwrap() {
				LogAction::BeginRecord |
				LogAction::InsertIndex { .. } |
				LogAction::DropTable { .. } => {
					panic!("Unexpected log entry");
				},
				LogAction::EndRecord => {
					let bytes_read = reader.read_bytes();
					assert_eq!(bytes_written, bytes_read);
					break
				},
				LogAction::InsertValue(insertion) => {
					table.enact_plan(insertion.index, &mut reader).unwrap();
				},
			}
		}
	}

	fn key(k: u32) -> Key {
		use blake2::{digest::typenum::U32, Blake2b, Digest};
		let mut key = Key::default();
		let hash = Blake2b::<U32>::digest(k.to_le_bytes());
		key.copy_from_slice(&hash);
		key
	}

	fn simple_key(k: Key) -> TableKey {
		TableKey::Partial(k)
	}

	fn no_hash(_: Key) -> TableKey {
		TableKey::NoHash
	}

	fn value(size: usize) -> Value {
		use rand::RngCore;
		let mut result = vec![0; size];
		rand::thread_rng().fill_bytes(&mut result);
		result
	}

	fn rc_options() -> ColumnOptions {
		ColumnOptions { ref_counted: true, ..Default::default() }
	}

	#[test]
	fn insert_simple() {
		insert_simple_inner(&Default::default());
		insert_simple_inner(&rc_options());
	}
	fn insert_simple_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), options);
		let log = new_log(&dir);

		let key = key(1);
		let key = TableKey::Partial(key);
		let key = &key;
		let val = value(19);
		let compressed = true;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
			assert_eq!(table.get(key, 1, writer).unwrap(), Some((val.clone(), compressed)));
		});

		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
		assert_eq!(table.filled.load(Ordering::Relaxed), 2);
	}

	#[test]
	#[should_panic(expected = "assertion failed: entry_size <= MAX_ENTRY_SIZE as u16")]
	fn oversized_into_fixed_panics() {
		let dir = tempdir().unwrap();
		let _table = new_table(&dir, Some(65534), &Default::default());
	}

	#[test]
	fn remove_simple() {
		remove_simple_inner(&Default::default());
		remove_simple_inner(&rc_options());
	}
	fn remove_simple_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &TableKey::Partial(key1);
		let key2 = key(2);
		let key2 = &TableKey::Partial(key2);
		let val1 = value(11);
		let val2 = value(21);
		let compressed = false;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		write_ops(&table, &log, |writer| {
			table.write_remove_plan(1, writer).unwrap();
		});

		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), None);
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 1);

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
	}

	#[test]
	fn replace_simple() {
		replace_simple_inner(&Default::default(), simple_key);
		replace_simple_inner(&rc_options(), simple_key);
		replace_simple_inner(&Default::default(), no_hash);
		replace_simple_inner(&rc_options(), no_hash);
	}
	fn replace_simple_inner(options: &ColumnOptions, table_key: fn(Key) -> TableKey) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &table_key(key1);
		let key2 = key(2);
		let key2 = &table_key(key2);
		let val1 = value(11);
		let val2 = value(21);
		let val3 = value(26);
		let compressed = true;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		write_ops(&table, &log, |writer| {
			table.write_replace_plan(1, key2, &val3, writer, false).unwrap();
		});

		assert_eq!(table.get(key2, 1, log.overlays()).unwrap(), Some((val3, false)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
	}

	#[test]
	fn replace_multipart_shorter() {
		replace_multipart_shorter_inner(&Default::default());
		replace_multipart_shorter_inner(&rc_options());
	}
	fn replace_multipart_shorter_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, None, options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &TableKey::Partial(key1);
		let key2 = key(2);
		let key2 = &TableKey::Partial(key2);
		let val1 = value(20000);
		let val2 = value(30);
		let val1s = value(5000);
		let compressed = false;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
		assert_eq!(table.filled.load(Ordering::Relaxed), 7);

		write_ops(&table, &log, |writer| {
			table.write_replace_plan(1, key1, &val1s, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1s, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 5);
		write_ops(&table, &log, |writer| {
			assert_eq!(table.read_next_free(5, writer).unwrap(), 4);
			assert_eq!(table.read_next_free(4, writer).unwrap(), 3);
			assert_eq!(table.read_next_free(3, writer).unwrap(), 0);
		});
	}

	#[test]
	fn replace_multipart_longer() {
		replace_multipart_longer_inner(&Default::default());
		replace_multipart_longer_inner(&rc_options());
	}
	fn replace_multipart_longer_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, None, options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &TableKey::Partial(key1);
		let key2 = key(2);
		let key2 = &TableKey::Partial(key2);
		let val1 = value(5000);
		let val2 = value(30);
		let val1l = value(20000);
		let compressed = false;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
		assert_eq!(table.filled.load(Ordering::Relaxed), 4);

		write_ops(&table, &log, |writer| {
			table.write_replace_plan(1, key1, &val1l, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1l, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
		assert_eq!(table.filled.load(Ordering::Relaxed), 7);
	}

	#[test]
	fn ref_counting() {
		for compressed in [false, true] {
			let dir = tempdir().unwrap();
			let table = new_table(&dir, None, &rc_options());
			let log = new_log(&dir);

			let key = key(1);
			let key = &TableKey::Partial(key);
			let val = value(5000);

			write_ops(&table, &log, |writer| {
				table.write_insert_plan(key, &val, writer, compressed).unwrap();
				table.write_inc_ref(1, writer).unwrap();
			});
			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val.clone(), compressed)));
			write_ops(&table, &log, |writer| {
				table.write_dec_ref(1, writer).unwrap();
			});
			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
			write_ops(&table, &log, |writer| {
				table.write_dec_ref(1, writer).unwrap();
			});
			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), None);
		}
	}

	#[test]
	fn iteration() {
		for multipart in [false, true] {
			for compressed in [false, true] {
				let (entry_size, size_mul) =
					if multipart { (None, 100) } else { (Some(MULTIPART_ENTRY_SIZE / 2), 1) };

				let dir = tempdir().unwrap();
				let table = new_table(&dir, entry_size, &rc_options());
				let log = new_log(&dir);

				let (v1, v2, v3) = (
					value(MULTIPART_ENTRY_SIZE as usize / 8 * size_mul),
					value(MULTIPART_ENTRY_SIZE as usize / 4 * size_mul),
					value(MULTIPART_ENTRY_SIZE as usize * 3 / 8 * size_mul),
				);
				let entries = [
					(TableKey::Partial(key(1)), &v1),
					(TableKey::Partial(key(2)), &v2),
					(TableKey::Partial(key(3)), &v3),
				];

				write_ops(&table, &log, |writer| {
					for (k, v) in &entries {
						table.write_insert_plan(k, &v, writer, compressed).unwrap();
					}
					table.complete_plan(writer).unwrap();
				});

				let mut res = Vec::new();
				table
					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
						res.push((v.len(), cmpr));
						true
					})
					.unwrap();
				assert_eq!(
					res,
					vec![(v1.len(), compressed), (v2.len(), compressed), (v3.len(), compressed)]
				);

				let v2_index = 2 + v1.len() as u64 / super::MULTIPART_ENTRY_SIZE as u64;
				write_ops(&table, &log, |writer| {
					table.write_remove_plan(v2_index, writer).unwrap();
				});

				let mut res = Vec::new();
				table
					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
						res.push((v.len(), cmpr));
						true
					})
					.unwrap();
				assert_eq!(res, vec![(v1.len(), compressed), (v3.len(), compressed)]);
			}
		}
	}

	#[test]
	fn ref_underflow() {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), &rc_options());
		let log = new_log(&dir);

		let key = key(1);
		let key = &TableKey::Partial(key);
		let val = value(10);

		let compressed = false;
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
			table.write_inc_ref(1, writer).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
		write_ops(&table, &log, |writer| {
			table.write_dec_ref(1, writer).unwrap();
			table.write_dec_ref(1, writer).unwrap();
			table.write_dec_ref(1, writer).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), None);
	}

	#[test]
	fn multipart_collision() {
		use super::MAX_ENTRY_SIZE;
		let mut entry = super::Entry::new(super::MULTIPART.to_vec());
		let size = entry.read_size().0 as usize;
		assert!(size > MAX_ENTRY_SIZE);
		let mut entry = super::Entry::new(super::MULTIHEAD.to_vec());
		let size = entry.read_size().0 as usize;
		assert!(size > MAX_ENTRY_SIZE);
		let mut entry = super::Entry::new(super::MULTIHEAD_COMPRESSED.to_vec());
		let size = entry.read_size().0 as usize;
		assert!(size > MAX_ENTRY_SIZE);
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(MAX_ENTRY_SIZE as u16), &rc_options());
		let log = new_log(&dir);

		let key = key(1);
		let key = &TableKey::Partial(key);
		let val = value(32225);
		let compressed = true;
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
		write_ops(&table, &log, |writer| {
			table.write_dec_ref(1, writer).unwrap();
		});
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 1);

		let value_size = table.value_size(key).unwrap();
		assert_eq!(0x7fd8, table.value_size(key).unwrap());
		let val = value(value_size as usize);
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
	}

	#[test]
	fn bad_size_header() {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(36), &rc_options());
		let log = new_log(&dir);

		let key = &TableKey::Partial(key(1));
		let val = value(4);

		let compressed = false;
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
		});
		let zeroes = [0u8, 0u8];
		table.file.write_at(&zeroes, table.entry_size as u64).unwrap();
		let log = new_log(&dir);
		assert!(matches!(
			table.get(key, 1, log.overlays()),
			Err(crate::error::Error::Corruption(_))
		));
	}
}