parity_db/
table.rs

1// Copyright 2021-2022 Parity Technologies (UK) Ltd.
2// This file is dual-licensed as Apache-2.0 or MIT.
3
4// On disk data layout for value tables.
5// All numerical values are little endian.
6//
7// Entry 0 (metadata)
8// [LAST_REMOVED: 8][FILLED: 8]
9// LAST_REMOVED - 64-bit index of the head of the removed-entries linked list.
10// FILLED - highest index filled with live data.
11//
12// Complete entry:
13// [SIZE: 2][REFS: 4][KEY: 26][VALUE]
14// SIZE: 15-bit value size. Sizes up to 0x7ffc are allowed.
15// This includes the size of REFS and KEY.
16// The highest bit is reserved to indicate if compression is applied.
17// REFS: 32-bit reference counter (optional).
18// KEY: lower 26 bytes of the key (optional for btree nodes).
19// VALUE: payload bytes.
20//
21// Partial entry (first part):
22// [MULTIHEAD: 2][NEXT: 8][REFS: 4][KEY: 26][VALUE]
23// MULTIHEAD - Split entry head marker. 0xfffd.
24// NEXT - 64-bit index of the entry that holds the next part.
25// The first part takes all available space in this entry.
26// REFS: 32-bit reference counter (optional).
27// KEY: lower 26 bytes of the key (optional for btree nodes).
28// VALUE: The rest of the entry is filled with payload bytes.
29//
30// Partial entry (continuation):
31// [MULTIPART: 2][NEXT: 8][VALUE]
32// MULTIPART - Split entry marker. 0xfffe.
33// NEXT - 64-bit index of the entry that holds the next part.
34// VALUE: The rest of the entry is filled with payload bytes.
35//
36// Partial entry (last part):
37// [SIZE: 2][VALUE: SIZE]
38// SIZE: 15-bit size of the remaining payload.
39// The highest bit is reserved to indicate if compression is applied.
40// VALUE: SIZE payload bytes.
41//
42// Deleted entry:
43// [TOMBSTONE: 2][NEXT: 8]
44// TOMBSTONE - Deleted entry marker. 0xffff.
45// NEXT - 64-bit index of the next deleted entry.
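//
// Illustrative example (not part of the original layout notes): in a ref-counted
// column, a complete entry holding a 100-byte value stores
// SIZE = 4 (REFS) + 26 (KEY) + 100 = 130 = 0x0082, written little endian as
// [0x82, 0x00], followed by the 4-byte reference counter, the 26-byte partial
// key and the 100 payload bytes. A compressed value of the same length sets the
// high bit: 0x8082, stored as [0x82, 0x80].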
46
47use crate::{
48	column::ColId,
49	display::hex,
50	error::Result,
51	log::{LogOverlays, LogQuery, LogReader, LogWriter},
52	options::ColumnOptions as Options,
53	parking_lot::RwLock,
54	table::key::{TableKey, TableKeyQuery, PARTIAL_SIZE},
55};
56use std::{
57	convert::TryInto,
58	mem::MaybeUninit,
59	sync::{
60		atomic::{AtomicBool, AtomicU64, Ordering},
61		Arc,
62	},
63};
64
65pub const SIZE_TIERS: usize = 1usize << SIZE_TIERS_BITS;
66pub const SIZE_TIERS_BITS: u8 = 8;
67pub const COMPRESSED_MASK: u16 = 0x80_00;
68pub const MAX_ENTRY_SIZE: usize = 0x7ff8; // Actual max size in V4 was 0x7dfe
69pub const MIN_ENTRY_SIZE: usize = 32;
70const REFS_SIZE: usize = 4;
71const SIZE_SIZE: usize = 2;
72const INDEX_SIZE: usize = 8;
73const MAX_ENTRY_BUF_SIZE: usize = 0x8000;
74
75const TOMBSTONE: &[u8] = &[0xff, 0xff];
76const MULTIPART_V4: &[u8] = &[0xff, 0xfe];
77const MULTIHEAD_V4: &[u8] = &[0xff, 0xfd];
78const MULTIPART: &[u8] = &[0xfe, 0xff];
79const MULTIHEAD: &[u8] = &[0xfd, 0xff];
80const MULTIHEAD_COMPRESSED: &[u8] = &[0xfd, 0x7f];
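// Note: read as little-endian u16 sizes, MULTIPART (0xfffe), MULTIHEAD (0xfffd) and
// MULTIHEAD_COMPRESSED (0x7ffd) all exceed MAX_ENTRY_SIZE even after the compression bit
// is masked off, so they can never be mistaken for a valid size field (see the
// `multipart_collision` test below).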
81// When a reference count reaches LOCKED_REF, the entry is locked in the db and its count is no longer changed.
82const LOCKED_REF: u32 = u32::MAX;
83
84const MULTIPART_ENTRY_SIZE: u16 = 4096;
85
86pub type Value = Vec<u8>;
87
88#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
89pub struct TableId(u16);
90
91impl TableId {
92	pub fn new(col: ColId, size_tier: u8) -> TableId {
93		TableId(((col as u16) << 8) | size_tier as u16)
94	}
95
96	pub fn from_u16(id: u16) -> TableId {
97		TableId(id)
98	}
99
100	pub fn col(&self) -> ColId {
101		(self.0 >> 8) as ColId
102	}
103
104	pub fn size_tier(&self) -> u8 {
105		(self.0 & 0xff) as u8
106	}
107
108	pub fn file_name(&self) -> String {
109		format!("table_{:02}_{}", self.col(), hex(&[self.size_tier()]))
110	}
111
112	pub fn is_file_name(col: ColId, name: &str) -> bool {
113		name.starts_with(&format!("table_{col:02}_"))
114	}
115
116	pub fn as_u16(&self) -> u16 {
117		self.0
118	}
119
120	pub fn log_index(&self) -> usize {
121		self.col() as usize * SIZE_TIERS + self.size_tier() as usize
122	}
123
124	pub const fn max_log_tables(num_columns: usize) -> usize {
125		SIZE_TIERS * num_columns
126	}
127
128	pub fn from_log_index(i: usize) -> Self {
129		let col = i / SIZE_TIERS;
130		let tier = i % SIZE_TIERS;
131		Self::new(col as ColId, tier as u8)
132	}
133}
134
135impl std::fmt::Display for TableId {
136	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137		write!(f, "t{:02}-{:02}", self.col(), hex(&[self.size_tier()]))
138	}
139}
140
141#[derive(Debug)]
142pub struct ValueTable {
143	pub id: TableId,
144	pub entry_size: u16,
145	file: crate::file::TableFile,
146	filled: AtomicU64,  // Number of entries from the POV of the log overlay.
147	written: AtomicU64, // Actual number of entries on disk.
148	last_removed: AtomicU64,
149	dirty_header: AtomicBool,
150	multipart: bool,
151	ref_counted: bool,
152	db_version: u32,
153}
154
155#[derive(Default, Clone, Copy)]
156struct Header([u8; 16]);
157
158impl Header {
159	fn last_removed(&self) -> u64 {
160		u64::from_le_bytes(self.0[0..INDEX_SIZE].try_into().unwrap())
161	}
162	fn set_last_removed(&mut self, last_removed: u64) {
163		self.0[0..INDEX_SIZE].copy_from_slice(&last_removed.to_le_bytes());
164	}
165	fn filled(&self) -> u64 {
166		u64::from_le_bytes(self.0[INDEX_SIZE..INDEX_SIZE * 2].try_into().unwrap())
167	}
168	fn set_filled(&mut self, filled: u64) {
169		self.0[INDEX_SIZE..INDEX_SIZE * 2].copy_from_slice(&filled.to_le_bytes());
170	}
171}
172
173pub struct Entry<B: AsRef<[u8]>>(usize, B);
174#[cfg(feature = "loom")]
175pub type FullEntry = Entry<Vec<u8>>;
176#[cfg(not(feature = "loom"))]
177pub type FullEntry = Entry<[u8; MAX_ENTRY_BUF_SIZE]>;
178pub type EntryRef<'a> = Entry<&'a [u8]>;
179type PartialEntry = Entry<[u8; 10]>;
180type PartialKeyEntry = Entry<[u8; 40]>; // 2 + 4 + 26 + 8
181
182impl<const C: usize> Entry<[u8; C]> {
183	#[inline(always)]
184	#[allow(clippy::uninit_assumed_init)]
185	pub fn new_uninit() -> Self {
186		Entry(0, unsafe { MaybeUninit::uninit().assume_init() })
187	}
188}
189
190#[cfg(feature = "loom")]
191impl Entry<Vec<u8>> {
192	pub fn new_uninit_full_entry() -> Self {
193		Entry(0, vec![0; MAX_ENTRY_BUF_SIZE])
194	}
195}
196
197#[cfg(not(feature = "loom"))]
198impl Entry<[u8; MAX_ENTRY_BUF_SIZE]> {
199	pub fn new_uninit_full_entry() -> Self {
200		Self::new_uninit()
201	}
202}
203
204impl<B: AsRef<[u8]>> Entry<B> {
205	#[inline(always)]
206	pub fn check_remaining_len(
207		&self,
208		len: usize,
209		error: impl Fn() -> crate::error::Error,
210	) -> Result<()> {
211		if self.0 + len > self.1.as_ref().len() {
212			return Err(error())
213		}
214		Ok(())
215	}
216
217	#[inline(always)]
218	pub fn new(data: B) -> Self {
219		Entry(0, data)
220	}
221
222	pub fn set_offset(&mut self, offset: usize) {
223		self.0 = offset;
224	}
225
226	pub fn offset(&self) -> usize {
227		self.0
228	}
229
230	pub fn read_slice(&mut self, size: usize) -> &[u8] {
231		let start = self.0;
232		self.0 += size;
233		&self.1.as_ref()[start..self.0]
234	}
235
236	fn is_tombstone(&self) -> bool {
237		&self.1.as_ref()[0..SIZE_SIZE] == TOMBSTONE
238	}
239
240	fn is_multipart(&self) -> bool {
241		&self.1.as_ref()[0..SIZE_SIZE] == MULTIPART
242	}
243
244	fn is_multipart_v4(&self) -> bool {
245		&self.1.as_ref()[0..SIZE_SIZE] == MULTIPART_V4
246	}
247
248	fn is_multihead_compressed(&self) -> bool {
249		&self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_COMPRESSED
250	}
251
252	fn is_multihead(&self) -> bool {
253		self.is_multihead_compressed() || &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD
254	}
255
256	fn is_multihead_v4(&self) -> bool {
257		&self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_V4
258	}
259
260	fn is_multi(&self, db_version: u32) -> bool {
261		self.is_multipart() ||
262			self.is_multihead() ||
263			(db_version <= 4 && (self.is_multipart_v4() || self.is_multihead_v4()))
264	}
265
266	fn read_size(&mut self) -> (u16, bool) {
267		let size = u16::from_le_bytes(self.read_slice(SIZE_SIZE).try_into().unwrap());
268		let compressed = (size & COMPRESSED_MASK) > 0;
269		(size & !COMPRESSED_MASK, compressed)
270	}
271
272	fn skip_size(&mut self) {
273		self.0 += SIZE_SIZE;
274	}
275
276	pub fn read_u64(&mut self) -> u64 {
277		u64::from_le_bytes(self.read_slice(8).try_into().unwrap())
278	}
279
280	fn read_next(&mut self) -> u64 {
281		self.read_u64()
282	}
283
284	pub fn skip_u64(&mut self) {
285		self.0 += 8;
286	}
287
288	pub fn skip_next(&mut self) {
289		self.skip_u64()
290	}
291
292	pub fn read_u32(&mut self) -> u32 {
293		u32::from_le_bytes(self.read_slice(REFS_SIZE).try_into().unwrap())
294	}
295
296	fn read_rc(&mut self) -> u32 {
297		self.read_u32()
298	}
299
300	fn read_partial(&mut self) -> &[u8] {
301		self.read_slice(PARTIAL_SIZE)
302	}
303
304	fn remaining_to(&self, end: usize) -> &[u8] {
305		&self.1.as_ref()[self.0..end]
306	}
307}
308
309impl<B: AsRef<[u8]> + AsMut<[u8]>> Entry<B> {
310	pub fn write_slice(&mut self, buf: &[u8]) {
311		let start = self.0;
312		self.0 += buf.len();
313		self.1.as_mut()[start..self.0].copy_from_slice(buf);
314	}
315
316	fn write_tombstone(&mut self) {
317		self.write_slice(TOMBSTONE);
318	}
319
320	fn write_multipart(&mut self) {
321		self.write_slice(MULTIPART);
322	}
323
324	fn write_multihead(&mut self) {
325		self.write_slice(MULTIHEAD);
326	}
327
328	fn write_multihead_compressed(&mut self) {
329		self.write_slice(MULTIHEAD_COMPRESSED);
330	}
331
332	fn write_size(&mut self, mut size: u16, compressed: bool) {
333		if compressed {
334			size |= COMPRESSED_MASK;
335		}
336		self.write_slice(&size.to_le_bytes());
337	}
338	pub fn write_u64(&mut self, next_index: u64) {
339		self.write_slice(&next_index.to_le_bytes());
340	}
341
342	fn write_next(&mut self, next_index: u64) {
343		self.write_u64(next_index)
344	}
345
346	pub fn write_u32(&mut self, next_index: u32) {
347		self.write_slice(&next_index.to_le_bytes());
348	}
349
350	fn write_rc(&mut self, rc: u32) {
351		self.write_slice(&rc.to_le_bytes());
352	}
353
354	pub fn inner_mut(&mut self) -> &mut B {
355		&mut self.1
356	}
357}
358
359impl<B: AsRef<[u8]> + AsMut<[u8]>> AsMut<[u8]> for Entry<B> {
360	fn as_mut(&mut self) -> &mut [u8] {
361		self.1.as_mut()
362	}
363}
364
365impl<B: AsRef<[u8]>> AsRef<[u8]> for Entry<B> {
366	fn as_ref(&self) -> &[u8] {
367		self.1.as_ref()
368	}
369}
370
371impl<B: AsRef<[u8]> + AsMut<[u8]>> std::ops::Index<std::ops::Range<usize>> for Entry<B> {
372	type Output = [u8];
373
374	fn index(&self, index: std::ops::Range<usize>) -> &[u8] {
375		&self.1.as_ref()[index]
376	}
377}
378
379impl<B: AsRef<[u8]> + AsMut<[u8]>> std::ops::IndexMut<std::ops::Range<usize>> for Entry<B> {
380	fn index_mut(&mut self, index: std::ops::Range<usize>) -> &mut [u8] {
381		&mut self.1.as_mut()[index]
382	}
383}
384
385enum LockedSlice<O: std::ops::Deref<Target = [u8]>, F: std::ops::Deref<Target = [u8]>> {
386	FromOverlay(O),
387	FromFile(F),
388}
389
390impl<O: std::ops::Deref<Target = [u8]>, F: std::ops::Deref<Target = [u8]>> LockedSlice<O, F> {
391	fn as_slice(&self) -> &[u8] {
392		match self {
393			LockedSlice::FromOverlay(slice) => &*slice,
394			LockedSlice::FromFile(slice) => &*slice,
395		}
396	}
397}
398
399impl ValueTable {
400	pub fn open(
401		path: Arc<std::path::PathBuf>,
402		id: TableId,
403		entry_size: Option<u16>,
404		options: &Options,
405		db_version: u32,
406	) -> Result<ValueTable> {
407		let (multipart, entry_size) = match entry_size {
408			Some(s) => (false, s),
409			None => (true, MULTIPART_ENTRY_SIZE),
410		};
411		assert!(entry_size >= MIN_ENTRY_SIZE as u16);
412		assert!(entry_size <= MAX_ENTRY_SIZE as u16);
413
414		let mut filepath: std::path::PathBuf = std::path::PathBuf::clone(&*path);
415		filepath.push(id.file_name());
416		let file = crate::file::TableFile::open(filepath, entry_size, id)?;
417		let mut filled = 1;
418		let mut last_removed = 0;
419		if file.map.read().is_some() {
420			let mut header = Header::default();
421			file.read_at(&mut header.0, 0)?;
422			last_removed = header.last_removed();
423			filled = header.filled();
424			if filled == 0 {
425				filled = 1;
426			}
427			if last_removed >= filled {
428				return Err(crate::error::Error::Corruption(format!(
429					"Bad removed ref {} out of {}",
430					last_removed, filled
431				)))
432			}
433			log::debug!(target: "parity-db", "Opened value table {} with {} entries, entry_size={}, removed={}", id, filled, entry_size, last_removed);
434		}
435
436		Ok(ValueTable {
437			id,
438			entry_size,
439			file,
440			filled: AtomicU64::new(filled),
441			written: AtomicU64::new(filled),
442			last_removed: AtomicU64::new(last_removed),
443			dirty_header: AtomicBool::new(false),
444			multipart,
445			ref_counted: options.ref_counted,
446			db_version,
447		})
448	}
449
450	pub fn value_size(&self, key: &TableKey) -> Option<u16> {
451		let base = self.entry_size - SIZE_SIZE as u16 - self.ref_size() as u16;
452		let k_encoded = key.encoded_size() as u16;
453		if base < k_encoded {
454			None
455		} else {
456			Some(base - k_encoded)
457		}
458	}
459
460	// Visits each part of the value, returning the ref counter and whether it is compressed; the partial key is fetched or checked via `key`.
461	#[inline(always)]
462	fn for_parts(
463		&self,
464		key: &mut TableKeyQuery,
465		mut index: u64,
466		log: &impl LogQuery,
467		mut f: impl FnMut(&[u8]) -> bool,
468	) -> Result<(u32, bool)> {
469		let mut part = 0;
470		let mut compressed = false;
471		let mut rc = 1;
472		let entry_size = self.entry_size as usize;
473		loop {
474			let vbuf = log.value_ref(self.id, index);
475			let buf: LockedSlice<_, _> = if let Some(buf) = vbuf.as_deref() {
476				log::trace!(
477					target: "parity-db",
478					"{}: Found in overlay {}",
479					self.id,
480					index,
481				);
482				LockedSlice::FromOverlay(buf)
483			} else {
484				log::trace!(
485					target: "parity-db",
486					"{}: Query slot {}",
487					self.id,
488					index,
489				);
490				let vbuf = self.file.slice_at(index * self.entry_size as u64, entry_size);
491				LockedSlice::FromFile(vbuf)
492			};
493			let mut buf = EntryRef::new(buf.as_slice());
494
495			if buf.is_tombstone() {
496				return Ok((0, false))
497			}
498
499			if self.multipart && part == 0 && !buf.is_multihead() {
500				// This may only happen during value iteration.
501				return Ok((0, false))
502			}
503
504			let (entry_end, next) = if self.multipart && buf.is_multi(self.db_version) {
505				if part == 0 && self.db_version > 6 && buf.is_multihead_compressed() {
506					compressed = true;
507				}
508				buf.skip_size();
509				let next = buf.read_next();
510				(entry_size, next)
511			} else {
512				let (size, read_compressed) = buf.read_size();
513				if part == 0 || self.db_version <= 6 {
514					compressed = read_compressed;
515				}
516				(buf.offset() + size as usize, 0)
517			};
518
519			if part == 0 {
520				if self.ref_counted {
521					rc = buf.read_rc();
522				}
523				match key {
524					TableKeyQuery::Fetch(Some(to_fetch)) => {
525						**to_fetch = TableKey::fetch_partial(&mut buf)?;
526					},
527					TableKeyQuery::Fetch(None) => (),
528					TableKeyQuery::Check(k) => {
529						let to_fetch = k.fetch(&mut buf)?;
530						if !k.compare(&to_fetch) {
531							log::debug!(
532								target: "parity-db",
533								"{}: Key mismatch at {}. Expected {}, got {:?}, size = {}",
534								self.id,
535								index,
536								k,
537								to_fetch.as_ref().map(hex),
538								self.entry_size,
539							);
540							return Ok((0, false))
541						}
542					},
543				}
544			}
545
546			if buf.offset() > entry_end {
547				return Err(crate::error::Error::Corruption(format!(
548					"Unexpected entry size. Expected at least {} bytes",
549					buf.offset() - 2
550				)))
551			}
552
553			if !f(buf.remaining_to(entry_end)) {
554				break
555			};
556
557			if next == 0 {
558				break
559			}
560			part += 1;
561			index = next;
562		}
563		Ok((rc, compressed))
564	}
565
566	pub fn get(
567		&self,
568		key: &TableKey,
569		index: u64,
570		log: &impl LogQuery,
571	) -> Result<Option<(Value, bool)>> {
572		if let Some((value, compressed, _)) =
573			self.query(&mut TableKeyQuery::Check(key), index, log)?
574		{
575			Ok(Some((value, compressed)))
576		} else {
577			Ok(None)
578		}
579	}
580
581	pub fn dump_entry(&self, index: u64) -> Result<Vec<u8>> {
582		let entry_size = self.entry_size as usize;
583		let mut buf = FullEntry::new_uninit_full_entry();
584		self.file.read_at(&mut buf[0..entry_size], index * self.entry_size as u64)?;
585		Ok(buf[0..entry_size].to_vec())
586	}
587
588	pub fn query(
589		&self,
590		key: &mut TableKeyQuery,
591		index: u64,
592		log: &impl LogQuery,
593	) -> Result<Option<(Value, bool, u32)>> {
594		let mut result = Vec::new();
595		let (rc, compressed) = self.for_parts(key, index, log, |buf| {
596			result.extend_from_slice(buf);
597			true
598		})?;
599		if rc > 0 {
600			return Ok(Some((result, compressed, rc)))
601		}
602		Ok(None)
603	}
604
605	#[allow(clippy::type_complexity)]
606	pub fn get_with_meta(
607		&self,
608		index: u64,
609		log: &impl LogQuery,
610	) -> Result<Option<(Value, u32, [u8; PARTIAL_SIZE], bool)>> {
611		let mut query_key = Default::default();
612		if let Some((value, compressed, rc)) =
613			self.query(&mut TableKeyQuery::Fetch(Some(&mut query_key)), index, log)?
614		{
615			return Ok(Some((value, rc, query_key, compressed)))
616		}
617		Ok(None)
618	}
619
620	pub fn size(
621		&self,
622		key: &TableKey,
623		index: u64,
624		log: &impl LogQuery,
625	) -> Result<Option<(u32, bool)>> {
626		let mut result = 0;
627		let (rc, compressed) =
628			self.for_parts(&mut TableKeyQuery::Check(key), index, log, |buf| {
629				result += buf.len() as u32;
630				true
631			})?;
632		if rc > 0 {
633			return Ok(Some((result, compressed)))
634		}
635		Ok(None)
636	}
637
638	pub fn has_key_at(&self, index: u64, key: &TableKey, log: &LogWriter) -> Result<bool> {
639		match key {
640			TableKey::Partial(k) => Ok(match self.partial_key_at(index, log)? {
641				Some(existing_key) => &existing_key[..] == key::partial_key(k),
642				None => false,
643			}),
644			TableKey::NoHash => Ok(!self.is_tombstone(index, log)?),
645		}
646	}
647
648	pub fn partial_key_at(
649		&self,
650		index: u64,
651		log: &impl LogQuery,
652	) -> Result<Option<[u8; PARTIAL_SIZE]>> {
653		let mut query_key = Default::default();
654		let (rc, _compressed) =
655			self.for_parts(&mut TableKeyQuery::Fetch(Some(&mut query_key)), index, log, |_buf| {
656				false
657			})?;
658		Ok(if rc == 0 { None } else { Some(query_key) })
659	}
660
661	pub fn is_tombstone(&self, index: u64, log: &impl LogQuery) -> Result<bool> {
662		let mut buf = PartialKeyEntry::new_uninit();
663		let buf = if log.value(self.id, index, buf.as_mut()) {
664			&mut buf
665		} else {
666			self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
667			&mut buf
668		};
669		Ok(buf.is_tombstone())
670	}
671
672	pub fn read_next_free(&self, index: u64, log: &LogWriter) -> Result<u64> {
673		let mut buf = PartialEntry::new_uninit();
674		let filled = self.filled.load(Ordering::Relaxed);
675		if !log.value(self.id, index, buf.as_mut()) {
676			self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
677		}
678		buf.skip_size();
679		let next = buf.read_next();
680		if next >= filled {
681			return Err(crate::error::Error::Corruption(format!(
682				"Bad removed ref {} out of {}",
683				next, filled
684			)))
685		}
686		Ok(next)
687	}
688
689	pub fn read_next_part(&self, index: u64, log: &LogWriter) -> Result<Option<u64>> {
690		let mut buf = PartialEntry::new_uninit();
691		if !log.value(self.id, index, buf.as_mut()) {
692			self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
693		}
694		if self.multipart && buf.is_multi(self.db_version) {
695			buf.skip_size();
696			let next = buf.read_next();
697			return Ok(Some(next))
698		}
699		Ok(None)
700	}
701
702	pub fn next_free(&self, log: &mut LogWriter) -> Result<u64> {
703		let filled = self.filled.load(Ordering::Relaxed);
704		let last_removed = self.last_removed.load(Ordering::Relaxed);
705		let index = if last_removed != 0 {
706			let next_removed = self.read_next_free(last_removed, log)?;
707			log::trace!(
708				target: "parity-db",
709				"{}: Inserting into removed slot {}",
710				self.id,
711				last_removed,
712			);
713			self.last_removed.store(next_removed, Ordering::Relaxed);
714			last_removed
715		} else {
716			log::trace!(
717				target: "parity-db",
718				"{}: Inserting into new slot {}",
719				self.id,
720				filled,
721			);
722			self.filled.store(filled + 1, Ordering::Relaxed);
723			filled
724		};
725		self.dirty_header.store(true, Ordering::Relaxed);
726		Ok(index)
727	}
728
729	fn overwrite_chain(
730		&self,
731		key: &TableKey,
732		value: &[u8],
733		log: &mut LogWriter,
734		at: Option<u64>,
735		compressed: bool,
736	) -> Result<u64> {
737		let mut remainder = value.len() + self.ref_size() + key.encoded_size();
738		let mut offset = 0;
739		let mut start = 0;
740		assert!(self.multipart || value.len() <= self.value_size(key).unwrap() as usize);
741		let (mut index, mut follow) = match at {
742			Some(index) => (index, true),
743			None => (self.next_free(log)?, false),
744		};
745		loop {
746			let mut next_index = 0;
747			if follow {
748				// check existing link
749				match self.read_next_part(index, log)? {
750					Some(next) => {
751						next_index = next;
752					},
753					None => {
754						follow = false;
755					},
756				}
757			}
758			log::trace!(
759				target: "parity-db",
760				"{}: Writing slot {}: {}",
761				self.id,
762				index,
763				key,
764			);
765			let mut buf = FullEntry::new_uninit_full_entry();
766			let free_space = self.entry_size as usize - SIZE_SIZE;
767			let value_len = if remainder > free_space {
768				if !follow {
769					next_index = self.next_free(log)?
770				}
771				if start == 0 {
772					if compressed {
773						buf.write_multihead_compressed();
774					} else {
775						buf.write_multihead();
776					}
777				} else {
778					buf.write_multipart();
779				}
780				buf.write_next(next_index);
781				free_space - INDEX_SIZE
782			} else {
783				buf.write_size(remainder as u16, compressed);
784				remainder
785			};
786			let init_offset = buf.offset();
787			if offset == 0 {
788				if self.ref_counted {
789					// First reference.
790					buf.write_rc(1u32);
791				}
792				key.write(&mut buf);
793			}
794			let written = buf.offset() - init_offset;
795			buf.write_slice(&value[offset..offset + value_len - written]);
796			offset += value_len - written;
797			log.insert_value(self.id, index, buf[0..buf.offset()].to_vec());
798			remainder -= value_len;
799			if start == 0 {
800				start = index;
801			}
802			index = next_index;
803			if remainder == 0 {
804				if index != 0 {
805					// End of new entry. Clear the remaining tail and exit
806					self.clear_chain(index, log)?;
807				}
808				break
809			}
810		}
811
812		Ok(start)
813	}
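	// Illustrative sketch of the chain arithmetic in `overwrite_chain` above (assuming a
	// ref-counted column with a partial key and the default 4096-byte multipart entries):
	// a 10_000-byte value needs remainder = 10_000 + 4 + 26 = 10_030 bytes. The head entry
	// stores MULTIHEAD + NEXT + REFS + KEY and 4_056 payload bytes, one continuation stores
	// MULTIPART + NEXT and 4_086 payload bytes, and the final entry stores SIZE and the
	// remaining 1_858 bytes (4_056 + 4_086 + 1_858 = 10_000).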
814
815	fn clear_chain(&self, mut index: u64, log: &mut LogWriter) -> Result<()> {
816		loop {
817			match self.read_next_part(index, log)? {
818				Some(next) => {
819					self.clear_slot(index, log)?;
820					index = next;
821				},
822				None => {
823					self.clear_slot(index, log)?;
824					return Ok(())
825				},
826			}
827		}
828	}
829
830	fn clear_slot(&self, index: u64, log: &mut LogWriter) -> Result<()> {
831		let last_removed = self.last_removed.load(Ordering::Relaxed);
832		log::trace!(
833			target: "parity-db",
834			"{}: Freeing slot {}",
835			self.id,
836			index,
837		);
838
839		let mut buf = PartialEntry::new_uninit();
840		buf.write_tombstone();
841		buf.write_next(last_removed);
842
843		log.insert_value(self.id, index, buf[0..buf.offset()].to_vec());
844		self.last_removed.store(index, Ordering::Relaxed);
845		self.dirty_header.store(true, Ordering::Relaxed);
846		Ok(())
847	}
848
849	pub fn write_insert_plan(
850		&self,
851		key: &TableKey,
852		value: &[u8],
853		log: &mut LogWriter,
854		compressed: bool,
855	) -> Result<u64> {
856		self.overwrite_chain(key, value, log, None, compressed)
857	}
858
859	pub fn write_replace_plan(
860		&self,
861		index: u64,
862		key: &TableKey,
863		value: &[u8],
864		log: &mut LogWriter,
865		compressed: bool,
866	) -> Result<()> {
867		self.overwrite_chain(key, value, log, Some(index), compressed)?;
868		Ok(())
869	}
870
871	pub fn write_remove_plan(&self, index: u64, log: &mut LogWriter) -> Result<()> {
872		if self.multipart {
873			self.clear_chain(index, log)?;
874		} else {
875			self.clear_slot(index, log)?;
876		}
877		Ok(())
878	}
879
880	pub fn write_inc_ref(&self, index: u64, log: &mut LogWriter) -> Result<()> {
881		self.change_ref(index, 1, log)?;
882		Ok(())
883	}
884
885	pub fn write_dec_ref(&self, index: u64, log: &mut LogWriter) -> Result<bool> {
886		if self.change_ref(index, -1, log)? {
887			return Ok(true)
888		}
889		self.write_remove_plan(index, log)?;
890		Ok(false)
891	}
892
893	pub fn change_ref(&self, index: u64, delta: i32, log: &mut LogWriter) -> Result<bool> {
894		let mut buf = FullEntry::new_uninit_full_entry();
895		let buf = if log.value(self.id, index, buf.as_mut()) {
896			&mut buf
897		} else {
898			self.file
899				.read_at(&mut buf[0..self.entry_size as usize], index * self.entry_size as u64)?;
900			&mut buf
901		};
902
903		if buf.is_tombstone() {
904			return Ok(false)
905		}
906
907		let size = if self.multipart && buf.is_multi(self.db_version) {
908			buf.skip_size();
909			buf.skip_next();
910			self.entry_size as usize
911		} else {
912			let (size, _compressed) = buf.read_size();
913			buf.offset() + size as usize
914		};
915
916		let rc_offset = buf.offset();
917		let mut counter = buf.read_rc();
918		if delta > 0 {
919			if counter >= LOCKED_REF - delta as u32 {
920				counter = LOCKED_REF
921			} else {
922				counter += delta as u32;
923			}
924		} else if counter != LOCKED_REF {
925			counter = counter.saturating_sub(-delta as u32);
926			if counter == 0 {
927				return Ok(false)
928			}
929		}
930
931		buf.set_offset(rc_offset);
932		buf.write_rc(counter);
933		// TODO: optimize actual buf size
934		log.insert_value(self.id, index, buf[0..size].to_vec());
935		Ok(true)
936	}
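	// Illustrative note on `change_ref` above: with delta = +1 a counter of LOCKED_REF - 1
	// saturates to LOCKED_REF and the entry is pinned from then on; with delta = -1 a counter
	// of 1 drops to 0, the function returns false and the caller (`write_dec_ref`) removes
	// the entry.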
937
938	pub fn enact_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
939		while index >= self.file.capacity.load(Ordering::Relaxed) {
940			self.file.grow(self.entry_size)?;
941		}
942		if index == 0 {
943			let mut header = Header::default();
944			log.read(&mut header.0)?;
945			self.file.write_at(&header.0, 0)?;
946			self.written.store(header.filled(), Ordering::Relaxed);
947			log::trace!(target: "parity-db", "{}: Enacted header, {} filled", self.id, header.filled());
948			return Ok(())
949		}
950
951		let mut buf = FullEntry::new_uninit_full_entry();
952		log.read(&mut buf[0..SIZE_SIZE])?;
953		if buf.is_tombstone() {
954			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + INDEX_SIZE])?;
955			self.file
956				.write_at(&buf[0..SIZE_SIZE + INDEX_SIZE], index * (self.entry_size as u64))?;
957			log::trace!(target: "parity-db", "{}: Enacted tombstone in slot {}", self.id, index);
958		} else if self.multipart && buf.is_multi(self.db_version) {
959			let entry_size = self.entry_size as usize;
960			log.read(&mut buf[SIZE_SIZE..entry_size])?;
961			self.file.write_at(&buf[0..entry_size], index * (entry_size as u64))?;
962			log::trace!(target: "parity-db", "{}: Enacted multipart in slot {}", self.id, index);
963		} else {
964			let (len, _compressed) = buf.read_size();
965			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + len as usize])?;
966			self.file
967				.write_at(&buf[0..(SIZE_SIZE + len as usize)], index * (self.entry_size as u64))?;
968			log::trace!(target: "parity-db", "{}: Enacted {}: {}, {} bytes", self.id, index, hex(&buf.1[6..32]), len);
969		}
970		Ok(())
971	}
972
973	pub fn validate_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
974		if index == 0 {
975			let mut header = Header::default();
976			log.read(&mut header.0)?;
977			// TODO: sanity check last_removed and filled
978			return Ok(())
979		}
980		let mut buf = FullEntry::new_uninit_full_entry();
981		log.read(&mut buf[0..SIZE_SIZE])?;
982		if buf.is_tombstone() {
983			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + INDEX_SIZE])?;
984			log::trace!(target: "parity-db", "{}: Validated tombstone in slot {}", self.id, index);
985		} else if self.multipart && buf.is_multi(self.db_version) {
986			let entry_size = self.entry_size as usize;
987			log.read(&mut buf[SIZE_SIZE..entry_size])?;
988			log::trace!(target: "parity-db", "{}: Validated multipart in slot {}", self.id, index);
989		} else {
990			// TODO: check len
991			let (len, _compressed) = buf.read_size();
992			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + len as usize])?;
993			log::trace!(target: "parity-db", "{}: Validated {}: {}, {} bytes", self.id, index, hex(&buf[SIZE_SIZE..32]), len);
994		}
995		Ok(())
996	}
997
998	pub fn refresh_metadata(&self) -> Result<()> {
999		if self.file.map.read().is_none() {
1000			return Ok(())
1001		}
1002		let mut header = Header::default();
1003		self.file.read_at(&mut header.0, 0)?;
1004		let last_removed = header.last_removed();
1005		let mut filled = header.filled();
1006		if filled == 0 {
1007			filled = 1;
1008		}
1009		self.last_removed.store(last_removed, Ordering::Relaxed);
1010		self.filled.store(filled, Ordering::Relaxed);
1011		self.written.store(filled, Ordering::Relaxed);
1012		Ok(())
1013	}
1014
1015	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
1016		if let Ok(true) =
1017			self.dirty_header
1018				.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
1019		{
1020			// last_removed or filled pointers were modified. Add them to the log
1021			let mut buf = Header::default();
1022			let last_removed = self.last_removed.load(Ordering::Relaxed);
1023			let filled = self.filled.load(Ordering::Relaxed);
1024			buf.set_last_removed(last_removed);
1025			buf.set_filled(filled);
1026			log.insert_value(self.id, 0, buf.0.to_vec());
1027		}
1028		Ok(())
1029	}
1030
1031	pub fn flush(&self) -> Result<()> {
1032		self.file.flush()
1033	}
1034
1035	fn ref_size(&self) -> usize {
1036		if self.ref_counted {
1037			REFS_SIZE
1038		} else {
1039			0
1040		}
1041	}
1042
1043	pub fn iter_while(
1044		&self,
1045		log: &impl LogQuery,
1046		mut f: impl FnMut(u64, u32, Vec<u8>, bool) -> bool,
1047	) -> Result<()> {
1048		let written = self.written.load(Ordering::Relaxed);
1049		for index in 1..written {
1050			let mut result = Vec::new();
1051			// Only entries with an indexed (partial) key are expected here.
1052			let mut _fetch_key = Default::default();
1053			match self.for_parts(
1054				&mut TableKeyQuery::Fetch(Some(&mut _fetch_key)),
1055				index,
1056				log,
1057				|buf| {
1058					result.extend_from_slice(buf);
1059					true
1060				},
1061			) {
1062				Ok((rc, compressed)) =>
1063					if rc > 0 && !f(index, rc, result, compressed) {
1064						break
1065					},
1066				Err(crate::error::Error::InvalidValueData) => (), // Ignore: the entry may have no stored partial key (e.g. a btree node).
1067				Err(e) => return Err(e),
1068			}
1069		}
1070		Ok(())
1071	}
1072
1073	pub fn is_init(&self) -> bool {
1074		self.file.map.read().is_some()
1075	}
1076
1077	pub fn init_with_entry(&self, entry: &[u8]) -> Result<()> {
1078		if let Err(e) = self.do_init_with_entry(entry) {
1079			log::error!(target: "parity-db", "Failure to initialize file {}", self.file.path.display());
1080			let _ = self.file.remove(); // We ignore error here
1081			return Err(e)
1082		}
1083		Ok(())
1084	}
1085
1086	fn do_init_with_entry(&self, entry: &[u8]) -> Result<()> {
1087		self.file.grow(self.entry_size)?;
1088
1089		let empty_overlays = RwLock::new(LogOverlays::with_columns(0));
1090		let mut log = LogWriter::new(&empty_overlays, 0);
1091		let at = self.overwrite_chain(&TableKey::NoHash, entry, &mut log, None, false)?;
1092		self.complete_plan(&mut log)?;
1093		assert_eq!(at, 1);
1094		let log = log.drain();
1095		let change = log.local_values_changes(self.id).expect("entry written above");
1096		for (at, (_rec_id, entry)) in change.map.iter() {
1097			self.file.write_at(entry.as_slice(), *at * (self.entry_size as u64))?;
1098		}
1099		Ok(())
1100	}
1101
1102	/// Validate free records sequence.
1103	pub fn check_free_refs(&self) -> Result<u64> {
1104		let written = self.written.load(Ordering::Relaxed);
1105		let mut next = self.last_removed.load(Ordering::Relaxed);
1106		let mut len = 0;
1107		while next != 0 {
1108			if next >= written {
1109				return Err(crate::error::Error::Corruption(format!(
1110					"Bad removed ref {} out of {}",
1111					next, written
1112				)))
1113			}
1114			let mut buf = PartialEntry::new_uninit();
1115			self.file.read_at(buf.as_mut(), next * self.entry_size as u64)?;
1116			buf.skip_size();
1117			next = buf.read_next();
1118			len += 1;
1119		}
1120		Ok(len)
1121	}
1122}
1123
1124pub mod key {
1125	use super::{EntryRef, FullEntry};
1126	use crate::{Key, Result};
1127
1128	pub const PARTIAL_SIZE: usize = 26;
1129
1130	pub fn partial_key(hash: &Key) -> &[u8] {
1131		&hash[6..]
1132	}
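	// For the 32-byte hashed keys this keeps bytes 6..32 (26 bytes), matching the
	// 26-byte KEY field in the entry layout.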
1133
1134	pub enum TableKey {
1135		Partial(Key),
1136		NoHash,
1137	}
1138
1139	impl TableKey {
1140		pub fn encoded_size(&self) -> usize {
1141			match self {
1142				TableKey::Partial(_) => PARTIAL_SIZE,
1143				TableKey::NoHash => 0,
1144			}
1145		}
1146
1147		pub fn index_from_partial(partial: &[u8]) -> u64 {
1148			u64::from_be_bytes((partial[0..8]).try_into().unwrap())
1149		}
1150
1151		pub fn compare(&self, fetch: &Option<[u8; PARTIAL_SIZE]>) -> bool {
1152			match (self, fetch) {
1153				(TableKey::Partial(k), Some(fetch)) => partial_key(k) == fetch,
1154				(TableKey::NoHash, _) => true,
1155				_ => false,
1156			}
1157		}
1158
1159		pub fn fetch_partial<'a>(buf: &mut EntryRef<'a>) -> Result<[u8; PARTIAL_SIZE]> {
1160			let mut result = [0u8; PARTIAL_SIZE];
1161			if buf.1.len() >= PARTIAL_SIZE {
1162				let pks = buf.read_partial();
1163				result.copy_from_slice(pks);
1164				return Ok(result)
1165			}
1166			Err(crate::error::Error::InvalidValueData)
1167		}
1168
1169		pub fn fetch<'a>(&self, buf: &mut EntryRef<'a>) -> Result<Option<[u8; PARTIAL_SIZE]>> {
1170			match self {
1171				TableKey::Partial(_k) => Ok(Some(Self::fetch_partial(buf)?)),
1172				TableKey::NoHash => Ok(None),
1173			}
1174		}
1175
1176		pub fn write(&self, buf: &mut FullEntry) {
1177			match self {
1178				TableKey::Partial(k) => {
1179					buf.write_slice(partial_key(k));
1180				},
1181				TableKey::NoHash => (),
1182			}
1183		}
1184	}
1185
1186	impl std::fmt::Display for TableKey {
1187		fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1188			match self {
1189				TableKey::Partial(k) => write!(f, "{}", crate::display::hex(k)),
1190				TableKey::NoHash => write!(f, "no_hash"),
1191			}
1192		}
1193	}
1194
1195	pub enum TableKeyQuery<'a> {
1196		Check(&'a TableKey),
1197		Fetch(Option<&'a mut [u8; PARTIAL_SIZE]>),
1198	}
1199}
1200
1201#[cfg(test)]
1202mod test {
1203	const ENTRY_SIZE: u16 = 64;
1204
1205	use super::{TableId, Value, ValueTable, MULTIPART_ENTRY_SIZE};
1206	use crate::{
1207		log::{Log, LogAction, LogWriter},
1208		options::{ColumnOptions, Options, CURRENT_VERSION},
1209		table::key::TableKey,
1210		Key,
1211	};
1212	use std::sync::{atomic::Ordering, Arc};
1213	use tempfile::{tempdir, TempDir};
1214
1215	fn new_table(dir: &TempDir, size: Option<u16>, options: &ColumnOptions) -> ValueTable {
1216		let id = TableId::new(0, 0);
1217		ValueTable::open(Arc::new(dir.path().to_path_buf()), id, size, options, CURRENT_VERSION)
1218			.unwrap()
1219	}
1220
1221	fn new_log(dir: &TempDir) -> Log {
1222		let options = Options::with_columns(dir.path(), 1);
1223		Log::open(&options).unwrap()
1224	}
1225
1226	fn write_ops<F: FnOnce(&mut LogWriter)>(table: &ValueTable, log: &Log, f: F) {
1227		let mut writer = log.begin_record();
1228		f(&mut writer);
1229		let bytes_written = log.end_record(writer.drain()).unwrap();
1230		let _ = log.read_next(false);
1231		log.flush_one(0).unwrap();
1232		let mut reader = log.read_next(false).unwrap().unwrap();
1233		loop {
1234			match reader.next().unwrap() {
1235				LogAction::BeginRecord |
1236				LogAction::InsertIndex { .. } |
1237				LogAction::DropTable { .. } => {
1238					panic!("Unexpected log entry");
1239				},
1240				LogAction::EndRecord => {
1241					let bytes_read = reader.read_bytes();
1242					assert_eq!(bytes_written, bytes_read);
1243					break
1244				},
1245				LogAction::InsertValue(insertion) => {
1246					table.enact_plan(insertion.index, &mut reader).unwrap();
1247				},
1248			}
1249		}
1250	}
1251
1252	fn key(k: u32) -> Key {
1253		use blake2::{digest::typenum::U32, Blake2b, Digest};
1254		let mut key = Key::default();
1255		let hash = Blake2b::<U32>::digest(k.to_le_bytes());
1256		key.copy_from_slice(&hash);
1257		key
1258	}
1259
1260	fn simple_key(k: Key) -> TableKey {
1261		TableKey::Partial(k)
1262	}
1263
1264	fn no_hash(_: Key) -> TableKey {
1265		TableKey::NoHash
1266	}
1267
1268	fn value(size: usize) -> Value {
1269		use rand::RngCore;
1270		let mut result = vec![0; size];
1271		rand::thread_rng().fill_bytes(&mut result);
1272		result
1273	}
1274
1275	fn rc_options() -> ColumnOptions {
1276		ColumnOptions { ref_counted: true, ..Default::default() }
1277	}
1278
1279	#[test]
1280	fn insert_simple() {
1281		insert_simple_inner(&Default::default());
1282		insert_simple_inner(&rc_options());
1283	}
1284	fn insert_simple_inner(options: &ColumnOptions) {
1285		let dir = tempdir().unwrap();
1286		let table = new_table(&dir, Some(ENTRY_SIZE), options);
1287		let log = new_log(&dir);
1288
1289		let key = key(1);
1290		let key = TableKey::Partial(key);
1291		let key = &key;
1292		let val = value(19);
1293		let compressed = true;
1294
1295		write_ops(&table, &log, |writer| {
1296			table.write_insert_plan(key, &val, writer, compressed).unwrap();
1297			assert_eq!(table.get(key, 1, writer).unwrap(), Some((val.clone(), compressed)));
1298		});
1299
1300		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
1301		assert_eq!(table.filled.load(Ordering::Relaxed), 2);
1302	}
1303
1304	#[test]
1305	#[should_panic(expected = "assertion failed: entry_size <= MAX_ENTRY_SIZE as u16")]
1306	fn oversized_into_fixed_panics() {
1307		let dir = tempdir().unwrap();
1308		let _table = new_table(&dir, Some(65534), &Default::default());
1309	}
1310
1311	#[test]
1312	fn remove_simple() {
1313		remove_simple_inner(&Default::default());
1314		remove_simple_inner(&rc_options());
1315	}
1316	fn remove_simple_inner(options: &ColumnOptions) {
1317		let dir = tempdir().unwrap();
1318		let table = new_table(&dir, Some(ENTRY_SIZE), options);
1319		let log = new_log(&dir);
1320
1321		let key1 = key(1);
1322		let key1 = &TableKey::Partial(key1);
1323		let key2 = key(2);
1324		let key2 = &TableKey::Partial(key2);
1325		let val1 = value(11);
1326		let val2 = value(21);
1327		let compressed = false;
1328
1329		write_ops(&table, &log, |writer| {
1330			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
1331			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
1332		});
1333
1334		write_ops(&table, &log, |writer| {
1335			table.write_remove_plan(1, writer).unwrap();
1336		});
1337
1338		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), None);
1339		assert_eq!(table.last_removed.load(Ordering::Relaxed), 1);
1340
1341		write_ops(&table, &log, |writer| {
1342			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
1343		});
1344		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
1345		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
1346	}
1347
1348	#[test]
1349	fn replace_simple() {
1350		replace_simple_inner(&Default::default(), simple_key);
1351		replace_simple_inner(&rc_options(), simple_key);
1352		replace_simple_inner(&Default::default(), no_hash);
1353		replace_simple_inner(&rc_options(), no_hash);
1354	}
1355	fn replace_simple_inner(options: &ColumnOptions, table_key: fn(Key) -> TableKey) {
1356		let dir = tempdir().unwrap();
1357		let table = new_table(&dir, Some(ENTRY_SIZE), options);
1358		let log = new_log(&dir);
1359
1360		let key1 = key(1);
1361		let key1 = &table_key(key1);
1362		let key2 = key(2);
1363		let key2 = &table_key(key2);
1364		let val1 = value(11);
1365		let val2 = value(21);
1366		let val3 = value(26); // max size for full hash and rc
1367		let compressed = true;
1368
1369		write_ops(&table, &log, |writer| {
1370			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
1371			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
1372		});
1373
1374		write_ops(&table, &log, |writer| {
1375			table.write_replace_plan(1, key2, &val3, writer, false).unwrap();
1376		});
1377
1378		assert_eq!(table.get(key2, 1, log.overlays()).unwrap(), Some((val3, false)));
1379		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
1380	}
1381
1382	#[test]
1383	fn replace_multipart_shorter() {
1384		replace_multipart_shorter_inner(&Default::default());
1385		replace_multipart_shorter_inner(&rc_options());
1386	}
1387	fn replace_multipart_shorter_inner(options: &ColumnOptions) {
1388		let dir = tempdir().unwrap();
1389		let table = new_table(&dir, None, options);
1390		let log = new_log(&dir);
1391
1392		let key1 = key(1);
1393		let key1 = &TableKey::Partial(key1);
1394		let key2 = key(2);
1395		let key2 = &TableKey::Partial(key2);
1396		let val1 = value(20000);
1397		let val2 = value(30);
1398		let val1s = value(5000);
1399		let compressed = false;
1400
1401		write_ops(&table, &log, |writer| {
1402			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
1403			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
1404		});
1405
1406		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
1407		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
1408		assert_eq!(table.filled.load(Ordering::Relaxed), 7);
1409
1410		write_ops(&table, &log, |writer| {
1411			table.write_replace_plan(1, key1, &val1s, writer, compressed).unwrap();
1412		});
1413		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1s, compressed)));
1414		assert_eq!(table.last_removed.load(Ordering::Relaxed), 5);
1415		write_ops(&table, &log, |writer| {
1416			assert_eq!(table.read_next_free(5, writer).unwrap(), 4);
1417			assert_eq!(table.read_next_free(4, writer).unwrap(), 3);
1418			assert_eq!(table.read_next_free(3, writer).unwrap(), 0);
1419		});
1420	}
1421
1422	#[test]
1423	fn replace_multipart_longer() {
1424		replace_multipart_longer_inner(&Default::default());
1425		replace_multipart_longer_inner(&rc_options());
1426	}
1427	fn replace_multipart_longer_inner(options: &ColumnOptions) {
1428		let dir = tempdir().unwrap();
1429		let table = new_table(&dir, None, options);
1430		let log = new_log(&dir);
1431
1432		let key1 = key(1);
1433		let key1 = &TableKey::Partial(key1);
1434		let key2 = key(2);
1435		let key2 = &TableKey::Partial(key2);
1436		let val1 = value(5000);
1437		let val2 = value(30);
1438		let val1l = value(20000);
1439		let compressed = false;
1440
1441		write_ops(&table, &log, |writer| {
1442			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
1443			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
1444		});
1445
1446		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
1447		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
1448		assert_eq!(table.filled.load(Ordering::Relaxed), 4);
1449
1450		write_ops(&table, &log, |writer| {
1451			table.write_replace_plan(1, key1, &val1l, writer, compressed).unwrap();
1452		});
1453		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1l, compressed)));
1454		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
1455		assert_eq!(table.filled.load(Ordering::Relaxed), 7);
1456	}
1457
1458	#[test]
1459	fn ref_counting() {
1460		for compressed in [false, true] {
1461			let dir = tempdir().unwrap();
1462			let table = new_table(&dir, None, &rc_options());
1463			let log = new_log(&dir);
1464
1465			let key = key(1);
1466			let key = &TableKey::Partial(key);
1467			let val = value(5000);
1468
1469			write_ops(&table, &log, |writer| {
1470				table.write_insert_plan(key, &val, writer, compressed).unwrap();
1471				table.write_inc_ref(1, writer).unwrap();
1472			});
1473			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val.clone(), compressed)));
1474			write_ops(&table, &log, |writer| {
1475				table.write_dec_ref(1, writer).unwrap();
1476			});
1477			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
1478			write_ops(&table, &log, |writer| {
1479				table.write_dec_ref(1, writer).unwrap();
1480			});
1481			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), None);
1482		}
1483	}
1484
1485	#[test]
1486	fn iteration() {
1487		for multipart in [false, true] {
1488			for compressed in [false, true] {
1489				let (entry_size, size_mul) =
1490					if multipart { (None, 100) } else { (Some(MULTIPART_ENTRY_SIZE / 2), 1) };
1491
1492				let dir = tempdir().unwrap();
1493				let table = new_table(&dir, entry_size, &rc_options());
1494				let log = new_log(&dir);
1495
1496				let (v1, v2, v3) = (
1497					value(MULTIPART_ENTRY_SIZE as usize / 8 * size_mul),
1498					value(MULTIPART_ENTRY_SIZE as usize / 4 * size_mul),
1499					value(MULTIPART_ENTRY_SIZE as usize * 3 / 8 * size_mul),
1500				);
1501				let entries = [
1502					(TableKey::Partial(key(1)), &v1),
1503					(TableKey::Partial(key(2)), &v2),
1504					(TableKey::Partial(key(3)), &v3),
1505				];
1506
1507				write_ops(&table, &log, |writer| {
1508					for (k, v) in &entries {
1509						table.write_insert_plan(k, &v, writer, compressed).unwrap();
1510					}
1511					table.complete_plan(writer).unwrap();
1512				});
1513
1514				let mut res = Vec::new();
1515				table
1516					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
1517						res.push((v.len(), cmpr));
1518						true
1519					})
1520					.unwrap();
1521				assert_eq!(
1522					res,
1523					vec![(v1.len(), compressed), (v2.len(), compressed), (v3.len(), compressed)]
1524				);
1525
1526				let v2_index = 2 + v1.len() as u64 / super::MULTIPART_ENTRY_SIZE as u64;
1527				write_ops(&table, &log, |writer| {
1528					table.write_remove_plan(v2_index, writer).unwrap();
1529				});
1530
1531				let mut res = Vec::new();
1532				table
1533					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
1534						res.push((v.len(), cmpr));
1535						true
1536					})
1537					.unwrap();
1538				assert_eq!(res, vec![(v1.len(), compressed), (v3.len(), compressed)]);
1539			}
1540		}
1541	}
1542
1543	#[test]
1544	fn ref_underflow() {
1545		let dir = tempdir().unwrap();
1546		let table = new_table(&dir, Some(ENTRY_SIZE), &rc_options());
1547		let log = new_log(&dir);
1548
1549		let key = key(1);
1550		let key = &TableKey::Partial(key);
1551		let val = value(10);
1552
1553		let compressed = false;
1554		write_ops(&table, &log, |writer| {
1555			table.write_insert_plan(key, &val, writer, compressed).unwrap();
1556			table.write_inc_ref(1, writer).unwrap();
1557		});
1558		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
1559		write_ops(&table, &log, |writer| {
1560			table.write_dec_ref(1, writer).unwrap();
1561			table.write_dec_ref(1, writer).unwrap();
1562			table.write_dec_ref(1, writer).unwrap();
1563		});
1564		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), None);
1565	}
1566
1567	#[test]
1568	fn multipart_collision() {
1569		use super::MAX_ENTRY_SIZE;
1570		let mut entry = super::Entry::new(super::MULTIPART.to_vec());
1571		let size = entry.read_size().0 as usize;
1572		assert!(size > MAX_ENTRY_SIZE);
1573		let mut entry = super::Entry::new(super::MULTIHEAD.to_vec());
1574		let size = entry.read_size().0 as usize;
1575		assert!(size > MAX_ENTRY_SIZE);
1576		let mut entry = super::Entry::new(super::MULTIHEAD_COMPRESSED.to_vec());
1577		let size = entry.read_size().0 as usize;
1578		assert!(size > MAX_ENTRY_SIZE);
1579		let dir = tempdir().unwrap();
1580		let table = new_table(&dir, Some(MAX_ENTRY_SIZE as u16), &rc_options());
1581		let log = new_log(&dir);
1582
1583		let key = key(1);
1584		let key = &TableKey::Partial(key);
1585		let val = value(32225); // This results in a 0x7dff entry size, which collides with the v4 multipart markers.
1586
1587		let compressed = true;
1588		write_ops(&table, &log, |writer| {
1589			table.write_insert_plan(key, &val, writer, compressed).unwrap();
1590		});
1591		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
1592		write_ops(&table, &log, |writer| {
1593			table.write_dec_ref(1, writer).unwrap();
1594		});
1595		assert_eq!(table.last_removed.load(Ordering::Relaxed), 1);
1596
1597		// Check that max entry size values are OK.
1598		let value_size = table.value_size(key).unwrap();
1599		assert_eq!(0x7fd8, table.value_size(key).unwrap()); // Max value size for this configuration.
1600		let val = value(value_size as usize); // This results in a 0x7ff8 entry size.
1601		write_ops(&table, &log, |writer| {
1602			table.write_insert_plan(key, &val, writer, compressed).unwrap();
1603		});
1604		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
1605	}
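
	// Illustrative sketch (not part of the original test suite): shows the Entry cursor
	// writing a size with the compression bit plus a next-part index, then reading them
	// back with the same little-endian layout used above.
	#[test]
	fn entry_cursor_roundtrip_example() {
		let mut entry = super::Entry::new(vec![0u8; 16]);
		entry.write_size(300, true); // 0x012c with the high (compression) bit set
		entry.write_next(42); // 64-bit index of a hypothetical next part
		entry.set_offset(0);
		assert_eq!(entry.read_size(), (300, true));
		assert_eq!(entry.read_next(), 42);
	}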
1606
1607	#[test]
1608	fn bad_size_header() {
1609		let dir = tempdir().unwrap();
1610		let table = new_table(&dir, Some(36), &rc_options());
1611		let log = new_log(&dir);
1612
1613		let key = &TableKey::Partial(key(1));
1614		let val = value(4);
1615
1616		let compressed = false;
1617		write_ops(&table, &log, |writer| {
1618			table.write_insert_plan(key, &val, writer, compressed).unwrap();
1619		});
1620		// Corrupt entry 1
1621		let zeroes = [0u8, 0u8];
1622		table.file.write_at(&zeroes, table.entry_size as u64).unwrap();
1623		let log = new_log(&dir);
1624		assert!(matches!(
1625			table.get(key, 1, log.overlays()),
1626			Err(crate::error::Error::Corruption(_))
1627		));
1628	}
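
	// Illustrative sketch (not part of the original test suite): checks the TableId bit
	// packing used above (column in the high byte, size tier in the low byte).
	#[test]
	fn table_id_packing_example() {
		let id = TableId::new(3, 5);
		assert_eq!(id.as_u16(), 0x0305);
		assert_eq!(id.col(), 3);
		assert_eq!(id.size_tier(), 5);
		assert_eq!(id.log_index(), 3 * super::SIZE_TIERS + 5);
		assert_eq!(TableId::from_log_index(id.log_index()), id);
	}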
1629}