// Copyright 2021-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or MIT.

//! The database object is split into `Db` and `DbInner`.
//! `Db` creates a shared `DbInner` instance and manages background
//! worker threads that all use the inner object.
//!
//! There are 4 worker threads:
//! log_worker: Processes the commit queue and reindexing. For each commit
//! in the queue, the log worker creates a write-ahead record using `Log`.
//! Additionally, if there is an active reindex, it creates log records
//! for batches of relocated index entries.
//! flush_worker: Flushes log records to disk by calling `fsync` on the
//! log files.
//! commit_worker: Reads flushed log records and applies operations to the
//! index and value tables.
//! cleanup_worker: Flushes tables by calling `fsync`, and cleans up the log.
//! Each background worker is signalled with a condition variable once
//! there is some work to be done.
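//!
//! A minimal usage sketch of this pipeline from the public API side
//! (a hedged example: it assumes a writable `./db` path and one default
//! column; error handling is elided):
//!
//! ```no_run
//! use parity_db::{Db, Options};
//!
//! let options = Options::with_columns(std::path::Path::new("./db"), 1);
//! let db = Db::open_or_create(&options).unwrap();
//! // `commit` only queues the change and returns; the workers described
//! // above log, flush, enact and clean it up asynchronously.
//! db.commit(vec![(0, b"key".to_vec(), Some(b"value".to_vec()))]).unwrap();
//! assert_eq!(db.get(0, b"key").unwrap(), Some(b"value".to_vec()));
//! ```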

use crate::{
	btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
	column::{hash_key, ColId, Column, IterState, ReindexBatch, ValueIterState},
	error::{try_io, Error, Result},
	hash::IdentityBuildHasher,
	index::PlanOutcome,
	log::{Log, LogAction},
	options::{Options, CURRENT_VERSION},
	parking_lot::{Condvar, Mutex, RwLock},
	stats::StatSummary,
	ColumnOptions, Key,
};
use fs2::FileExt;
use std::{
	borrow::Borrow,
	collections::{BTreeMap, HashMap, VecDeque},
	ops::Bound,
	sync::{
		atomic::{AtomicBool, AtomicU64, Ordering},
		Arc,
	},
	thread,
};

// Max size of the commit queue (keys + values). If the queue is
// full, `commit` will block.
// These are in memory, so we use usize.
const MAX_COMMIT_QUEUE_BYTES: usize = 16 * 1024 * 1024;
// Max size of the log overlay. If the overlay is full, processing
// of the commit queue is blocked.
const MAX_LOG_QUEUE_BYTES: i64 = 128 * 1024 * 1024;
// Minimum size of the log file before it is considered full.
const MIN_LOG_SIZE_BYTES: u64 = 64 * 1024 * 1024;
// Number of log files to keep after flush when sync mode is disabled. This gives the database
// some chance to recover in case of a crash.
const KEEP_LOGS: usize = 16;
// Hard limit on the number of log files in sync mode. The number of log files may grow while
// existing logs are waiting on fsync. Commits will be throttled if the total number of log files
// exceeds this number.
const MAX_LOG_FILES: usize = 4;

/// Value is just a vector of bytes. Value sizes up to 4 GB are allowed.
pub type Value = Vec<u8>;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RcValue(Arc<Value>);

pub type RcKey = RcValue;

impl RcValue {
	pub fn value(&self) -> &Value {
		&self.0
	}
}

impl AsRef<[u8]> for RcValue {
	fn as_ref(&self) -> &[u8] {
		self.0.as_ref()
	}
}

impl Borrow<[u8]> for RcValue {
	fn borrow(&self) -> &[u8] {
		self.value().borrow()
	}
}

impl Borrow<Vec<u8>> for RcValue {
	fn borrow(&self) -> &Vec<u8> {
		self.value()
	}
}

impl From<Value> for RcValue {
	fn from(value: Value) -> Self {
		Self(value.into())
	}
}

#[cfg(test)]
impl<const N: usize> TryFrom<RcValue> for [u8; N] {
	type Error = <[u8; N] as TryFrom<Vec<u8>>>::Error;

	fn try_from(value: RcValue) -> std::result::Result<Self, Self::Error> {
		value.value().clone().try_into()
	}
}

// Commit data passed to `commit`.
#[derive(Debug, Default)]
struct Commit {
	// Commit ID. This is not the same as the log record id, as some records
	// originate within the DB, e.g. reindex.
	id: u64,
	// Size of user data pending insertion (keys + values) or
	// removal (keys).
	bytes: usize,
	// Operations.
	changeset: CommitChangeSet,
}

// Pending commits. This may not grow beyond `MAX_COMMIT_QUEUE_BYTES` bytes.
#[derive(Debug, Default)]
struct CommitQueue {
	// Log record.
	record_id: u64,
	// Total size of all commits in the queue.
	bytes: usize,
	// FIFO queue.
	commits: VecDeque<Commit>,
}

#[derive(Debug)]
struct DbInner {
	columns: Vec<Column>,
	options: Options,
	shutdown: AtomicBool,
	log: Log,
	commit_queue: Mutex<CommitQueue>,
	commit_queue_full_cv: Condvar,
	log_worker_wait: WaitCondvar<bool>,
	commit_worker_wait: Arc<WaitCondvar<bool>>,
	// Overlay of most recent values in the commit queue.
	commit_overlay: RwLock<Vec<CommitOverlay>>,
	// This may underflow occasionally, but is bound to reach 0 eventually.
	log_queue_wait: WaitCondvar<i64>,
	flush_worker_wait: Arc<WaitCondvar<bool>>,
	cleanup_worker_wait: WaitCondvar<bool>,
	cleanup_queue_wait: WaitCondvar<bool>,
	iteration_lock: Mutex<()>,
	last_enacted: AtomicU64,
	next_reindex: AtomicU64,
	bg_err: Mutex<Option<Arc<Error>>>,
	db_version: u32,
	lock_file: std::fs::File,
}

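// A small helper pairing a condition variable with a guarded work flag or
// counter. Background workers block in `wait` until another thread `signal`s
// that there is work to do.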
#[derive(Debug)]
struct WaitCondvar<S> {
	cv: Condvar,
	work: Mutex<S>,
}

impl<S: Default> WaitCondvar<S> {
	fn new() -> Self {
		WaitCondvar { cv: Condvar::new(), work: Mutex::new(S::default()) }
	}
}

impl WaitCondvar<bool> {
	fn signal(&self) {
		let mut work = self.work.lock();
		*work = true;
		self.cv.notify_one();
	}

	pub fn wait(&self) {
		let mut work = self.work.lock();
		while !*work {
			self.cv.wait(&mut work)
		}
		*work = false;
	}
}

impl DbInner {
	fn open(options: &Options, opening_mode: OpeningMode) -> Result<DbInner> {
		if opening_mode == OpeningMode::Create {
			try_io!(std::fs::create_dir_all(&options.path));
		} else if !options.path.is_dir() {
			return Err(Error::DatabaseNotFound)
		}

		let mut lock_path: std::path::PathBuf = options.path.clone();
		lock_path.push("lock");
		let lock_file = try_io!(std::fs::OpenOptions::new()
			.create(true)
			.read(true)
			.write(true)
			.open(lock_path.as_path()));
		lock_file.try_lock_exclusive().map_err(Error::Locked)?;

		let metadata = options.load_and_validate_metadata(opening_mode == OpeningMode::Create)?;
		let mut columns = Vec::with_capacity(metadata.columns.len());
		let mut commit_overlay = Vec::with_capacity(metadata.columns.len());
		let log = Log::open(options)?;
		let last_enacted = log.replay_record_id().unwrap_or(2) - 1;
		for c in 0..metadata.columns.len() {
			let column = Column::open(c as ColId, options, &metadata)?;
			commit_overlay.push(CommitOverlay::new());
			columns.push(column);
		}
		log::debug!(target: "parity-db", "Opened db {:?}, metadata={:?}", options, metadata);
		let mut options = options.clone();
		if options.salt.is_none() {
			options.salt = Some(metadata.salt);
		}

		Ok(DbInner {
			columns,
			options,
			shutdown: AtomicBool::new(false),
			log,
			commit_queue: Mutex::new(Default::default()),
			commit_queue_full_cv: Condvar::new(),
			log_worker_wait: WaitCondvar::new(),
			commit_worker_wait: Arc::new(WaitCondvar::new()),
			commit_overlay: RwLock::new(commit_overlay),
			log_queue_wait: WaitCondvar::new(),
			flush_worker_wait: Arc::new(WaitCondvar::new()),
			cleanup_worker_wait: WaitCondvar::new(),
			cleanup_queue_wait: WaitCondvar::new(),
			iteration_lock: Mutex::new(()),
			next_reindex: AtomicU64::new(1),
			last_enacted: AtomicU64::new(last_enacted),
			bg_err: Mutex::new(None),
			db_version: metadata.version,
			lock_file,
		})
	}

	fn get(&self, col: ColId, key: &[u8]) -> Result<Option<Value>> {
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let key = column.hash_key(key);
				let overlay = self.commit_overlay.read();
				// Check the commit overlay first.
				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get(&key)) {
					return Ok(v.map(|i| i.value().clone()))
				}
				// Go into the tables and log overlay.
				let log = self.log.overlays();
				column.get(&key, log)
			},
			Column::Tree(column) => {
				let overlay = self.commit_overlay.read();
				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
					return Ok(l.map(|i| i.value().clone()))
				}
				// We lock the log; if the btree structure changed while reading, that would be an
				// issue.
				let log = self.log.overlays().read();
				column.with_locked(|btree| BTreeTable::get(key, &*log, btree))
			},
		}
	}

	fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let key = column.hash_key(key);
				let overlay = self.commit_overlay.read();
				// Check the commit overlay first.
				if let Some(l) = overlay.get(col as usize).and_then(|o| o.get_size(&key)) {
					return Ok(l)
				}
				// Go into the tables and log overlay.
				let log = self.log.overlays();
				column.get_size(&key, log)
			},
			Column::Tree(column) => {
				let overlay = self.commit_overlay.read();
				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
					return Ok(l.map(|v| v.value().len() as u32))
				}
				let log = self.log.overlays().read();
				let l = column.with_locked(|btree| BTreeTable::get(key, &*log, btree))?;
				Ok(l.map(|v| v.len() as u32))
			},
		}
	}

	fn btree_iter(&self, col: ColId) -> Result<BTreeIterator> {
		match &self.columns[col as usize] {
			Column::Hash(_column) =>
				Err(Error::InvalidConfiguration("Not an indexed column.".to_string())),
			Column::Tree(column) => {
				let log = self.log.overlays();
				BTreeIterator::new(column, col, log, &self.commit_overlay)
			},
		}
	}

	// Commit simply adds the data to the queue and to the overlay and
	// exits as early as possible.
	fn commit<I, K>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, K, Option<Value>)>,
		K: AsRef<[u8]>,
	{
		self.commit_changes(tx.into_iter().map(|(c, k, v)| {
			(
				c,
				match v {
					Some(v) => Operation::Set(k.as_ref().to_vec(), v),
					None => Operation::Dereference(k.as_ref().to_vec()),
				},
			)
		}))
	}

	fn commit_changes<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Vec<u8>>)>,
	{
		let mut commit: CommitChangeSet = Default::default();
		for (col, change) in tx.into_iter() {
			if self.options.columns[col as usize].btree_index {
				commit
					.btree_indexed
					.entry(col)
					.or_insert_with(|| BTreeChangeSet::new(col))
					.push(change)
			} else {
				commit.indexed.entry(col).or_insert_with(|| IndexedChangeSet::new(col)).push(
					change,
					&self.options,
					self.db_version,
				)
			}
		}

		self.commit_raw(commit)
	}

	fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
		let mut queue = self.commit_queue.lock();

		#[cfg(any(test, feature = "instrumentation"))]
		let might_wait_because_the_queue_is_full = self.options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let might_wait_because_the_queue_is_full = true;
		if might_wait_because_the_queue_is_full && queue.bytes > MAX_COMMIT_QUEUE_BYTES {
			log::debug!(target: "parity-db", "Waiting, queue size={}", queue.bytes);
			self.commit_queue_full_cv.wait(&mut queue);
		}

		{
			let bg_err = self.bg_err.lock();
			if let Some(err) = &*bg_err {
				return Err(Error::Background(err.clone()))
			}
		}

		let mut overlay = self.commit_overlay.write();

		queue.record_id += 1;
		let record_id = queue.record_id;

		let mut bytes = 0;
		for (c, indexed) in &commit.indexed {
			indexed.copy_to_overlay(
				&mut overlay[*c as usize],
				record_id,
				&mut bytes,
				&self.options,
			)?;
		}

		for (c, iterset) in &commit.btree_indexed {
			iterset.copy_to_overlay(
				&mut overlay[*c as usize].btree_indexed,
				record_id,
				&mut bytes,
				&self.options,
			)?;
		}

		let commit = Commit { id: record_id, changeset: commit, bytes };

		log::debug!(
			target: "parity-db",
			"Queued commit {}, {} bytes",
			commit.id,
			bytes,
		);
		queue.commits.push_back(commit);
		queue.bytes += bytes;
		self.log_worker_wait.signal();
		Ok(())
	}

	fn process_commits(&self) -> Result<bool> {
		#[cfg(any(test, feature = "instrumentation"))]
		let might_wait_because_the_queue_is_full = self.options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let might_wait_because_the_queue_is_full = true;
		if might_wait_because_the_queue_is_full {
			// Wait if the queue is full.
			let mut queue = self.log_queue_wait.work.lock();
			if !self.shutdown.load(Ordering::Relaxed) && *queue > MAX_LOG_QUEUE_BYTES {
				log::debug!(target: "parity-db", "Waiting, log_bytes={}", queue);
				self.log_queue_wait.cv.wait(&mut queue);
			}
		}
		let commit = {
			let mut queue = self.commit_queue.lock();
			if let Some(commit) = queue.commits.pop_front() {
				queue.bytes -= commit.bytes;
				log::debug!(
					target: "parity-db",
					"Removed {}. Still queued commits {} bytes",
					commit.bytes,
					queue.bytes,
				);
				if queue.bytes <= MAX_COMMIT_QUEUE_BYTES &&
					(queue.bytes + commit.bytes) > MAX_COMMIT_QUEUE_BYTES
				{
					// Past the waiting threshold.
					log::debug!(
						target: "parity-db",
						"Waking up commit queue worker",
					);
					self.commit_queue_full_cv.notify_all();
				}
				Some(commit)
			} else {
				None
			}
		};

		if let Some(mut commit) = commit {
			let mut reindex = false;
			let mut writer = self.log.begin_record();
			log::debug!(
				target: "parity-db",
				"Processing commit {}, record {}, {} bytes",
				commit.id,
				writer.record_id(),
				commit.bytes,
			);
			let mut ops: u64 = 0;
			for (c, key_values) in commit.changeset.indexed.iter() {
				key_values.write_plan(
					&self.columns[*c as usize],
					&mut writer,
					&mut ops,
					&mut reindex,
				)?;
			}

			for (c, btree) in commit.changeset.btree_indexed.iter_mut() {
				match &self.columns[*c as usize] {
					Column::Hash(_column) =>
						return Err(Error::InvalidConfiguration(
							"Not an indexed column.".to_string(),
						)),
					Column::Tree(column) => {
						btree.write_plan(column, &mut writer, &mut ops)?;
					},
				}
			}

			// Collect final changes to value tables
			for c in self.columns.iter() {
				c.complete_plan(&mut writer)?;
			}
			let record_id = writer.record_id();
			let l = writer.drain();

			let bytes = {
				let bytes = self.log.end_record(l)?;
				let mut logged_bytes = self.log_queue_wait.work.lock();
				*logged_bytes += bytes as i64;
				self.flush_worker_wait.signal();
				bytes
			};

			{
				// Cleanup the commit overlay.
				let mut overlay = self.commit_overlay.write();
				for (c, key_values) in commit.changeset.indexed.iter() {
					key_values.clean_overlay(&mut overlay[*c as usize], commit.id);
				}
				for (c, iterset) in commit.changeset.btree_indexed.iter_mut() {
					iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, commit.id);
				}
			}

			if reindex {
				self.start_reindex(record_id);
			}

			log::debug!(
				target: "parity-db",
				"Processed commit {} (record {}), {} ops, {} bytes written",
				commit.id,
				record_id,
				ops,
				bytes,
			);
			Ok(true)
		} else {
			Ok(false)
		}
	}

	fn start_reindex(&self, record_id: u64) {
		log::trace!(target: "parity-db", "Scheduled reindex at record {}", record_id);
		self.next_reindex.store(record_id, Ordering::SeqCst);
	}

	fn process_reindex(&self) -> Result<bool> {
		let next_reindex = self.next_reindex.load(Ordering::SeqCst);
		if next_reindex == 0 || next_reindex > self.last_enacted.load(Ordering::SeqCst) {
			return Ok(false)
		}
		// Process any pending reindexes
		for column in self.columns.iter() {
			let column = if let Column::Hash(c) = column { c } else { continue };
			let ReindexBatch { drop_index, batch } = column.reindex(&self.log)?;
			if !batch.is_empty() || drop_index.is_some() {
				let mut next_reindex = false;
				let mut writer = self.log.begin_record();
				log::debug!(
					target: "parity-db",
					"Creating reindex record {}",
					writer.record_id(),
				);
				for (key, address) in batch.into_iter() {
					if let PlanOutcome::NeedReindex =
						column.write_reindex_plan(&key, address, &mut writer)?
					{
						next_reindex = true
					}
				}
				if let Some(table) = drop_index {
					writer.drop_table(table);
				}
				let record_id = writer.record_id();
				let l = writer.drain();

				let mut logged_bytes = self.log_queue_wait.work.lock();
				let bytes = self.log.end_record(l)?;
				log::debug!(
					target: "parity-db",
					"Created reindex record {}, {} bytes",
					record_id,
					bytes,
				);
				*logged_bytes += bytes as i64;
				if next_reindex {
					self.start_reindex(record_id);
				}
				self.flush_worker_wait.signal();
				return Ok(true)
			}
		}
		self.next_reindex.store(0, Ordering::SeqCst);
		Ok(false)
	}

	fn enact_logs(&self, validation_mode: bool) -> Result<bool> {
		let _iteration_lock = self.iteration_lock.lock();
		let cleared = {
			let reader = match self.log.read_next(validation_mode) {
				Ok(reader) => reader,
				Err(Error::Corruption(_)) if validation_mode => {
					log::debug!(target: "parity-db", "Bad log header");
					self.log.clear_replay_logs();
					return Ok(false)
				},
				Err(e) => return Err(e),
			};
			if let Some(mut reader) = reader {
				log::debug!(
					target: "parity-db",
					"Enacting log record {}",
					reader.record_id(),
				);
				if validation_mode {
					if reader.record_id() != self.last_enacted.load(Ordering::Relaxed) + 1 {
						log::warn!(
							target: "parity-db",
							"Log sequence error. Expected record {}, got {}",
							self.last_enacted.load(Ordering::Relaxed) + 1,
							reader.record_id(),
						);
						drop(reader);
						self.log.clear_replay_logs();
						return Ok(false)
					}
					// Validate all records before applying anything
					loop {
						let next = match reader.next() {
							Ok(next) => next,
							Err(e) => {
								log::debug!(target: "parity-db", "Error reading log: {:?}", e);
								return Ok(false)
							},
						};
						match next {
							LogAction::BeginRecord => {
								log::debug!(target: "parity-db", "Unexpected log header");
								drop(reader);
								self.log.clear_replay_logs();
								return Ok(false)
							},
							LogAction::EndRecord => break,
							LogAction::InsertIndex(insertion) => {
								let col = insertion.table.col() as usize;
								if let Err(e) = self.columns.get(col).map_or_else(
									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
									|col| {
										col.validate_plan(
											LogAction::InsertIndex(insertion),
											&mut reader,
										)
									},
								) {
									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
									drop(reader);
									self.log.clear_replay_logs();
									return Ok(false)
								}
							},
							LogAction::InsertValue(insertion) => {
								let col = insertion.table.col() as usize;
								if let Err(e) = self.columns.get(col).map_or_else(
									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
									|col| {
										col.validate_plan(
											LogAction::InsertValue(insertion),
											&mut reader,
										)
									},
								) {
									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
									drop(reader);
									self.log.clear_replay_logs();
									return Ok(false)
								}
							},
							LogAction::DropTable(_) => continue,
						}
					}
					reader.reset()?;
					reader.next()?;
				}
				loop {
					match reader.next()? {
						LogAction::BeginRecord =>
							return Err(Error::Corruption("Bad log record".into())),
						LogAction::EndRecord => break,
						LogAction::InsertIndex(insertion) => {
							self.columns[insertion.table.col() as usize]
								.enact_plan(LogAction::InsertIndex(insertion), &mut reader)?;
						},
						LogAction::InsertValue(insertion) => {
							self.columns[insertion.table.col() as usize]
								.enact_plan(LogAction::InsertValue(insertion), &mut reader)?;
						},
						LogAction::DropTable(id) => {
							log::debug!(
								target: "parity-db",
								"Dropping index {}",
								id,
							);
							match &self.columns[id.col() as usize] {
								Column::Hash(col) => {
									col.drop_index(id)?;
									// Check if there's another reindex on the next iteration
									self.start_reindex(reader.record_id());
								},
								Column::Tree(_) => (),
							}
						},
					}
				}
				log::debug!(
					target: "parity-db",
					"Enacted log record {}, {} bytes",
					reader.record_id(),
					reader.read_bytes(),
				);
				let record_id = reader.record_id();
				let bytes = reader.read_bytes();
				let cleared = reader.drain();
				self.last_enacted.store(record_id, Ordering::SeqCst);
				Some((record_id, cleared, bytes))
			} else {
				log::debug!(target: "parity-db", "End of log");
				None
			}
		};

		if let Some((record_id, cleared, bytes)) = cleared {
			self.log.end_read(cleared, record_id);
			{
				if !validation_mode {
					let mut queue = self.log_queue_wait.work.lock();
					if *queue < bytes as i64 {
						log::warn!(
							target: "parity-db",
							"Detected log underflow record {}, {} bytes, {} queued, reindex = {}",
							record_id,
							bytes,
							*queue,
							self.next_reindex.load(Ordering::SeqCst),
						);
					}
					*queue -= bytes as i64;
					if *queue <= MAX_LOG_QUEUE_BYTES &&
						(*queue + bytes as i64) > MAX_LOG_QUEUE_BYTES
					{
						self.log_queue_wait.cv.notify_one();
					}
					log::debug!(target: "parity-db", "Log queue size: {} bytes", *queue);
				}

				let max_logs = if self.options.sync_data { MAX_LOG_FILES } else { KEEP_LOGS };
				let dirty_logs = self.log.num_dirty_logs();
				if !validation_mode {
					while self.log.num_dirty_logs() > max_logs {
						log::debug!(target: "parity-db", "Waiting for log cleanup. Queued: {}", dirty_logs);
						self.cleanup_queue_wait.wait();
					}
				}
			}
			Ok(true)
		} else {
			Ok(false)
		}
	}

	fn flush_logs(&self, min_log_size: u64) -> Result<bool> {
		let has_flushed = self.log.flush_one(min_log_size)?;
		if has_flushed {
			self.commit_worker_wait.signal();
		}
		Ok(has_flushed)
	}

	fn clean_logs(&self) -> Result<bool> {
		let keep_logs = if self.options.sync_data { 0 } else { KEEP_LOGS };
		let num_cleanup = self.log.num_dirty_logs();
		let result = if num_cleanup > keep_logs {
			if self.options.sync_data {
				for c in self.columns.iter() {
					c.flush()?;
				}
			}
			self.log.clean_logs(num_cleanup - keep_logs)?
		} else {
			false
		};
		self.cleanup_queue_wait.signal();
		Ok(result)
	}

	fn clean_all_logs(&self) -> Result<()> {
		for c in self.columns.iter() {
			c.flush()?;
		}
		let num_cleanup = self.log.num_dirty_logs();
		self.log.clean_logs(num_cleanup)?;
		Ok(())
	}

	fn replay_all_logs(&self) -> Result<()> {
		while let Some(id) = self.log.replay_next()? {
			log::debug!(target: "parity-db", "Replaying database log {}", id);
			while self.enact_logs(true)? {}
		}

		// Re-read any cached metadata
		for c in self.columns.iter() {
			c.refresh_metadata()?;
		}
		log::debug!(target: "parity-db", "Replay is complete.");
		Ok(())
	}

	fn shutdown(&self) {
		self.shutdown.store(true, Ordering::SeqCst);
		self.log_queue_wait.cv.notify_one();
		self.flush_worker_wait.signal();
		self.log_worker_wait.signal();
		self.commit_worker_wait.signal();
		self.cleanup_worker_wait.signal();
	}

	fn kill_logs(&self) -> Result<()> {
		{
			if let Some(err) = self.bg_err.lock().as_ref() {
				// On error the log reader may be left in an inconsistent state, so it is
				// important not to attempt any further log enactment.
				log::debug!(target: "parity-db", "Shutdown with error state {}", err);
				self.log.clean_logs(self.log.num_dirty_logs())?;
				return Ok(())
			}
		}
		log::debug!(target: "parity-db", "Processing leftover commits");
		// Finish logged records and proceed to log and enact queued commits.
		while self.enact_logs(false)? {}
		self.flush_logs(0)?;
		while self.process_commits()? {}
		while self.enact_logs(false)? {}
		self.flush_logs(0)?;
		while self.enact_logs(false)? {}
		self.clean_all_logs()?;
		self.log.kill_logs()?;
		if self.options.stats {
			let mut path = self.options.path.clone();
			path.push("stats.txt");
			match std::fs::File::create(path) {
				Ok(file) => {
					let mut writer = std::io::BufWriter::new(file);
					if let Err(e) = self.write_stats_text(&mut writer, None) {
						log::warn!(target: "parity-db", "Error writing stats file: {:?}", e)
					}
				},
				Err(e) => log::warn!(target: "parity-db", "Error creating stats file: {:?}", e),
			}
		}
		Ok(())
	}

	fn write_stats_text(&self, writer: &mut impl std::io::Write, column: Option<u8>) -> Result<()> {
		if let Some(col) = column {
			self.columns[col as usize].write_stats_text(writer)
		} else {
			for c in self.columns.iter() {
				c.write_stats_text(writer)?;
			}
			Ok(())
		}
	}

	fn clear_stats(&self, column: Option<u8>) -> Result<()> {
		if let Some(col) = column {
			self.columns[col as usize].clear_stats()
		} else {
			for c in self.columns.iter() {
				c.clear_stats()?;
			}
			Ok(())
		}
	}

	fn stats(&self) -> StatSummary {
		StatSummary { columns: self.columns.iter().map(|c| c.stats()).collect() }
	}

	fn store_err(&self, result: Result<()>) {
		if let Err(e) = result {
			log::warn!(target: "parity-db", "Background worker error: {}", e);
			let mut err = self.bg_err.lock();
			if err.is_none() {
				*err = Some(Arc::new(e));
				self.shutdown();
			}
			self.commit_queue_full_cv.notify_all();
		}
	}

	fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
		let _lock = self.iteration_lock.lock();
		match &self.columns[c as usize] {
			Column::Hash(column) => column.iter_values(&self.log, f),
			Column::Tree(_) => unimplemented!(),
		}
	}

	fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
		let _lock = self.iteration_lock.lock();
		match &self.columns[c as usize] {
			Column::Hash(column) => column.iter_index(&self.log, f),
			Column::Tree(_) => unimplemented!(),
		}
	}
}

/// Database instance.
pub struct Db {
	inner: Arc<DbInner>,
	commit_thread: Option<thread::JoinHandle<()>>,
	flush_thread: Option<thread::JoinHandle<()>>,
	log_thread: Option<thread::JoinHandle<()>>,
	cleanup_thread: Option<thread::JoinHandle<()>>,
}

impl Db {
	#[cfg(test)]
	pub(crate) fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
		let options = Options::with_columns(path, num_columns);
		Self::open_inner(&options, OpeningMode::Create)
	}

	/// Open the database with the given options. An error is returned if the database does not
	/// exist.
	pub fn open(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::Write)
	}

	/// Open the database using the given options. If the database does not exist it is created
	/// empty.
	pub fn open_or_create(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::Create)
	}

	/// Open an existing database in read-only mode.
	pub fn open_read_only(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::ReadOnly)
	}

	fn open_inner(options: &Options, opening_mode: OpeningMode) -> Result<Db> {
		assert!(options.is_valid());
		let db = DbInner::open(options, opening_mode)?;
		// This needs to be called before the log thread is started, so that the first
		// reindexing runs in the correct state.
		if let Err(e) = db.replay_all_logs() {
			log::debug!(target: "parity-db", "Error during log replay.");
			return Err(e)
		} else {
			db.log.clear_replay_logs();
			db.clean_all_logs()?;
			db.log.kill_logs()?;
		}
		let db = Arc::new(db);
		#[cfg(any(test, feature = "instrumentation"))]
		let start_threads = opening_mode != OpeningMode::ReadOnly && options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let start_threads = opening_mode != OpeningMode::ReadOnly;
		let commit_thread = if start_threads {
			let commit_worker_db = db.clone();
			Some(thread::spawn(move || {
				commit_worker_db.store_err(Self::commit_worker(commit_worker_db.clone()))
			}))
		} else {
			None
		};
		let flush_thread = if start_threads {
			let flush_worker_db = db.clone();
			#[cfg(any(test, feature = "instrumentation"))]
			let min_log_size = if options.always_flush { 0 } else { MIN_LOG_SIZE_BYTES };
			#[cfg(not(any(test, feature = "instrumentation")))]
			let min_log_size = MIN_LOG_SIZE_BYTES;
			Some(thread::spawn(move || {
				flush_worker_db.store_err(Self::flush_worker(flush_worker_db.clone(), min_log_size))
			}))
		} else {
			None
		};
		let log_thread = if start_threads {
			let log_worker_db = db.clone();
			Some(thread::spawn(move || {
				log_worker_db.store_err(Self::log_worker(log_worker_db.clone()))
			}))
		} else {
			None
		};
		let cleanup_thread = if start_threads {
			let cleanup_worker_db = db.clone();
			Some(thread::spawn(move || {
				cleanup_worker_db.store_err(Self::cleanup_worker(cleanup_worker_db.clone()))
			}))
		} else {
			None
		};
		Ok(Db { inner: db, commit_thread, flush_thread, log_thread, cleanup_thread })
	}

	/// Get a value in a specified column by key. Returns `None` if the key does not exist.
	pub fn get(&self, col: ColId, key: &[u8]) -> Result<Option<Value>> {
		self.inner.get(col, key)
	}

	/// Get value size by key. Returns `None` if the key does not exist.
	pub fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
		self.inner.get_size(col, key)
	}

	/// Iterate over all ordered key-value pairs. Only supported for columns with
	/// `btree_index` enabled.
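	///
	/// A hedged sketch of driving the iterator (it assumes `BTreeIterator::next`
	/// yields `Result<Option<(Vec<u8>, Value)>>` and that column 0 is btree-indexed):
	///
	/// ```no_run
	/// # use parity_db::Db;
	/// # let db: Db = unimplemented!();
	/// let mut iter = db.iter(0).unwrap();
	/// while let Some((_key, _value)) = iter.next().unwrap() {
	/// 	// Keys are visited in ascending byte order.
	/// }
	/// ```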
	pub fn iter(&self, col: ColId) -> Result<BTreeIterator> {
		self.inner.btree_iter(col)
	}

	/// Commit a set of changes to the database.
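	///
	/// This only queues the transaction and returns; the background workers make it
	/// durable later. A minimal sketch (assuming column 0 exists; `None` as the value
	/// dereferences, i.e. removes, the key):
	///
	/// ```no_run
	/// # use parity_db::{Db, Options};
	/// # let db = Db::open_or_create(&Options::with_columns(std::path::Path::new("./db"), 1)).unwrap();
	/// db.commit(vec![
	/// 	(0, b"insert".to_vec(), Some(b"value".to_vec())),
	/// 	(0, b"remove".to_vec(), None),
	/// ])
	/// .unwrap();
	/// ```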
	pub fn commit<I, K>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, K, Option<Value>)>,
		K: AsRef<[u8]>,
	{
		self.inner.commit(tx)
	}

	/// Commit a set of changes to the database, expressed as `Operation`s.
	pub fn commit_changes<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Vec<u8>>)>,
	{
		self.inner.commit_changes(tx)
	}

	pub(crate) fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
		self.inner.commit_raw(commit)
	}

	/// Returns the number of columns in the database.
	pub fn num_columns(&self) -> u8 {
		self.inner.columns.len() as u8
	}

	/// Iterate a column and call a function for each value. This is only supported for columns
	/// with `btree_index` set to `false`. Iteration order is unspecified.
	/// Unlike `get`, the iteration may not include changes made in recent `commit` calls.
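	///
	/// A hedged sketch (it assumes `ValueIterState` exposes a `value: Vec<u8>` field
	/// and is re-exported at the crate root):
	///
	/// ```no_run
	/// # use parity_db::Db;
	/// # let db: Db = unimplemented!();
	/// let mut total_bytes = 0u64;
	/// db.iter_column_while(0, |state| {
	/// 	total_bytes += state.value.len() as u64;
	/// 	true // Return `false` to stop early.
	/// })
	/// .unwrap();
	/// ```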
	pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
		self.inner.iter_column_while(c, f)
	}

	/// Iterate a column and call a function for each value. This is only supported for columns
	/// with `btree_index` set to `false`. Iteration order is unspecified. Note that the
	/// `key` field in the state is the hash of the original key.
	/// Unlike `get`, the iteration may not include changes made in recent `commit` calls.
	pub(crate) fn iter_column_index_while(
		&self,
		c: ColId,
		f: impl FnMut(IterState) -> bool,
	) -> Result<()> {
		self.inner.iter_column_index_while(c, f)
	}

	fn commit_worker(db: Arc<DbInner>) -> Result<()> {
		let mut more_work = false;
		while !db.shutdown.load(Ordering::SeqCst) || more_work {
			if !more_work {
				db.cleanup_worker_wait.signal();
				if !db.log.has_log_files_to_read() {
					db.commit_worker_wait.wait();
				}
			}

			more_work = db.enact_logs(false)?;
		}
		log::debug!(target: "parity-db", "Commit worker shutdown");
		Ok(())
	}

	fn log_worker(db: Arc<DbInner>) -> Result<()> {
		// Start with pending reindex.
		let mut more_reindex = db.process_reindex()?;
		let mut more_commits = false;
		// Process all commits but allow reindex to be interrupted.
		while !db.shutdown.load(Ordering::SeqCst) || more_commits {
			if !more_commits && !more_reindex {
				db.log_worker_wait.wait();
			}

			more_commits = db.process_commits()?;
			more_reindex = db.process_reindex()?;
		}
		log::debug!(target: "parity-db", "Log worker shutdown");
		Ok(())
	}

	fn flush_worker(db: Arc<DbInner>, min_log_size: u64) -> Result<()> {
		let mut more_work = false;
		while !db.shutdown.load(Ordering::SeqCst) {
			if !more_work {
				db.flush_worker_wait.wait();
			}
			more_work = db.flush_logs(min_log_size)?;
		}
		log::debug!(target: "parity-db", "Flush worker shutdown");
		Ok(())
	}

	fn cleanup_worker(db: Arc<DbInner>) -> Result<()> {
		let mut more_work = true;
		while !db.shutdown.load(Ordering::SeqCst) || more_work {
			if !more_work {
				db.cleanup_worker_wait.wait();
			}
			more_work = db.clean_logs()?;
		}
		log::debug!(target: "parity-db", "Cleanup worker shutdown");
		Ok(())
	}

	/// Dump full database stats to the text output.
	pub fn write_stats_text(
		&self,
		writer: &mut impl std::io::Write,
		column: Option<u8>,
	) -> Result<()> {
		self.inner.write_stats_text(writer, column)
	}

	/// Reset internal database statistics for the database or specified column.
	pub fn clear_stats(&self, column: Option<u8>) -> Result<()> {
		self.inner.clear_stats(column)
	}

	/// Print database contents in text form to stderr.
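	///
	/// A hedged sketch (it assumes `CheckOptions` is re-exported at the crate root;
	/// the arguments are: column, from, bound, display_content, truncate_value_display,
	/// fast, validate_free_refs):
	///
	/// ```no_run
	/// # use parity_db::{CheckOptions, Db};
	/// # let db: Db = unimplemented!();
	/// // Dump only column 0, truncating displayed values to 32 bytes.
	/// let options = CheckOptions::new(Some(0), None, None, true, Some(32), false, false);
	/// db.dump(options).unwrap();
	/// ```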
	pub fn dump(&self, check_param: check::CheckOptions) -> Result<()> {
		if let Some(col) = check_param.column {
			self.inner.columns[col as usize].dump(&self.inner.log, &check_param, col)?;
		} else {
			for (ix, c) in self.inner.columns.iter().enumerate() {
				c.dump(&self.inner.log, &check_param, ix as ColId)?;
			}
		}
		Ok(())
	}

	/// Get database statistics.
	pub fn stats(&self) -> StatSummary {
		self.inner.stats()
	}

	// We open the DB first to check metadata validity and make sure there are no pending WAL
	// logs.
	fn precheck_column_operation(options: &mut Options) -> Result<[u8; 32]> {
		let db = Db::open(options)?;
		let salt = db.inner.options.salt;
		drop(db);
		Ok(salt.expect("`salt` is always `Some` after opening the DB; qed"))
	}

	/// Add a new column with options specified by `new_column_options`.
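	///
	/// The database must be closed when this is called. A minimal sketch (assuming
	/// `options` already describes the existing database):
	///
	/// ```no_run
	/// # use parity_db::{ColumnOptions, Db, Options};
	/// # let mut options: Options = unimplemented!();
	/// Db::add_column(&mut options, ColumnOptions { btree_index: true, ..Default::default() })
	/// 	.unwrap();
	/// ```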
	pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;

		options.columns.push(new_column_options);
		options.write_metadata_with_version(&options.path, &salt, Some(CURRENT_VERSION))?;

		Ok(())
	}

	/// Remove the last column from the database.
	/// The Db must be closed when this is called.
	pub fn drop_last_column(options: &mut Options) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;
		let nb_column = options.columns.len();
		if nb_column == 0 {
			return Ok(())
		}
		let index = options.columns.len() - 1;
		Self::remove_column_files(options, index as u8)?;
		options.columns.pop();
		options.write_metadata(&options.path, &salt)?;
		Ok(())
	}

	/// Truncate a column from the database, optionally changing its options.
	/// The Db must be closed when this is called.
	pub fn reset_column(
		options: &mut Options,
		index: u8,
		new_options: Option<ColumnOptions>,
	) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;
		Self::remove_column_files(options, index)?;

		if let Some(new_options) = new_options {
			options.columns[index as usize] = new_options;
			options.write_metadata(&options.path, &salt)?;
		}

		Ok(())
	}

	fn remove_column_files(options: &mut Options, index: u8) -> Result<()> {
		if index as usize >= options.columns.len() {
			return Err(Error::IncompatibleColumnConfig {
				id: index,
				reason: "Column not found".to_string(),
			})
		}

		Column::drop_files(index, options.path.clone())?;
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn process_reindex(&self) -> Result<()> {
		self.inner.process_reindex()?;
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn process_commits(&self) -> Result<()> {
		self.inner.process_commits()?;
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn flush_logs(&self) -> Result<()> {
		self.inner.flush_logs(0)?;
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn enact_logs(&self) -> Result<()> {
		while self.inner.enact_logs(false)? {}
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn clean_logs(&self) -> Result<()> {
		self.inner.clean_logs()?;
		Ok(())
	}
}

impl Drop for Db {
	fn drop(&mut self) {
		self.drop_inner()
	}
}

impl Db {
	fn drop_inner(&mut self) {
		self.inner.shutdown();
		if let Some(t) = self.log_thread.take() {
			if let Err(e) = t.join() {
				log::warn!(target: "parity-db", "Log thread shutdown error: {:?}", e);
			}
		}
		if let Some(t) = self.flush_thread.take() {
			if let Err(e) = t.join() {
				log::warn!(target: "parity-db", "Flush thread shutdown error: {:?}", e);
			}
		}
		if let Some(t) = self.commit_thread.take() {
			if let Err(e) = t.join() {
				log::warn!(target: "parity-db", "Commit thread shutdown error: {:?}", e);
			}
		}
		if let Some(t) = self.cleanup_thread.take() {
			if let Err(e) = t.join() {
				log::warn!(target: "parity-db", "Cleanup thread shutdown error: {:?}", e);
			}
		}
		if let Err(e) = self.inner.kill_logs() {
			log::warn!(target: "parity-db", "Shutdown error: {:?}", e);
		}
		if let Err(e) = self.inner.lock_file.unlock() {
			log::debug!(target: "parity-db", "Error removing file lock: {:?}", e);
		}
	}
}

pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<RcValue>), IdentityBuildHasher>;
pub type BTreeCommitOverlay = BTreeMap<RcValue, (u64, Option<RcValue>)>;

#[derive(Debug)]
pub struct CommitOverlay {
	indexed: IndexedCommitOverlay,
	btree_indexed: BTreeCommitOverlay,
}

impl CommitOverlay {
	fn new() -> Self {
		CommitOverlay { indexed: Default::default(), btree_indexed: Default::default() }
	}

	#[cfg(test)]
	fn is_empty(&self) -> bool {
		self.indexed.is_empty() && self.btree_indexed.is_empty()
	}
}

impl CommitOverlay {
	fn get_ref(&self, key: &[u8]) -> Option<Option<&RcValue>> {
		self.indexed.get(key).map(|(_, v)| v.as_ref())
	}

	fn get(&self, key: &[u8]) -> Option<Option<RcValue>> {
		self.get_ref(key).map(|v| v.cloned())
	}

	fn get_size(&self, key: &[u8]) -> Option<Option<u32>> {
		self.get_ref(key).map(|res| res.as_ref().map(|b| b.value().len() as u32))
	}

	fn btree_get(&self, key: &[u8]) -> Option<Option<&RcValue>> {
		self.btree_indexed.get(key).map(|(_, v)| v.as_ref())
	}

	pub fn btree_next(
		&self,
		last_key: &crate::btree::LastKey,
	) -> Option<(RcValue, Option<RcValue>)> {
		use crate::btree::LastKey;
		match &last_key {
			LastKey::Start => self
				.btree_indexed
				.range::<Vec<u8>, _>(..)
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
			LastKey::End => None,
			LastKey::At(key) => self
				.btree_indexed
				.range::<Vec<u8>, _>((Bound::Excluded(key), Bound::Unbounded))
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
			LastKey::Seeked(key) => self
				.btree_indexed
				.range::<Value, _>(key..)
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
		}
	}

	pub fn btree_prev(
		&self,
		last_key: &crate::btree::LastKey,
	) -> Option<(RcValue, Option<RcValue>)> {
		use crate::btree::LastKey;
		match &last_key {
			LastKey::End => self
				.btree_indexed
				.range::<Vec<u8>, _>(..)
				.rev()
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
			LastKey::Start => None,
			LastKey::At(key) => self
				.btree_indexed
				.range::<Vec<u8>, _>(..key)
				.rev()
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
			LastKey::Seeked(key) => self
				.btree_indexed
				.range::<Vec<u8>, _>(..=key)
				.rev()
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
		}
	}
}

/// Different operations allowed for a commit.
/// Behavior may differ depending on column configuration.
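///
/// For example, with `Db::commit_changes` (a hedged sketch: it assumes `Operation`
/// is re-exported at the crate root; `Reference` is only valid for `ref_counted`
/// columns):
///
/// ```no_run
/// # use parity_db::{Db, Operation, Options};
/// # let db = Db::open_or_create(&Options::with_columns(std::path::Path::new("./db"), 1)).unwrap();
/// db.commit_changes(vec![
/// 	(0, Operation::Set(b"key".to_vec(), b"value".to_vec())),
/// 	(0, Operation::Dereference(b"old".to_vec())),
/// ])
/// .unwrap();
/// ```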
#[derive(Debug, PartialEq, Eq)]
pub enum Operation<Key, Value> {
	/// Insert or update the value for a given key.
	Set(Key, Value),

	/// Dereference at a given key, resulting in
	/// either removal of a key-value pair or a decrement of its
	/// reference counter.
	Dereference(Key),

	/// Increment the reference counter of an existing value for a given key.
	/// If no value exists for the key, this operation is skipped.
	Reference(Key),
}

impl<Key: Ord, Value: Eq> PartialOrd<Self> for Operation<Key, Value> {
	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
		Some(self.cmp(other))
	}
}

impl<Key: Ord, Value: Eq> Ord for Operation<Key, Value> {
	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
		self.key().cmp(other.key())
	}
}

impl<Key, Value> Operation<Key, Value> {
	pub fn key(&self) -> &Key {
		match self {
			Operation::Set(k, _) | Operation::Dereference(k) | Operation::Reference(k) => k,
		}
	}

	pub fn into_key(self) -> Key {
		match self {
			Operation::Set(k, _) | Operation::Dereference(k) | Operation::Reference(k) => k,
		}
	}
}

impl<K: AsRef<[u8]>, Value> Operation<K, Value> {
	pub fn to_key_vec(self) -> Operation<Vec<u8>, Value> {
		match self {
			Operation::Set(k, v) => Operation::Set(k.as_ref().to_vec(), v),
			Operation::Dereference(k) => Operation::Dereference(k.as_ref().to_vec()),
			Operation::Reference(k) => Operation::Reference(k.as_ref().to_vec()),
		}
	}
}

#[derive(Debug, Default)]
pub struct CommitChangeSet {
	pub indexed: HashMap<ColId, IndexedChangeSet>,
	pub btree_indexed: HashMap<ColId, BTreeChangeSet>,
}

#[derive(Debug)]
pub struct IndexedChangeSet {
	pub col: ColId,
	pub changes: Vec<Operation<Key, RcValue>>,
}

impl IndexedChangeSet {
	pub fn new(col: ColId) -> Self {
		IndexedChangeSet { col, changes: Default::default() }
	}

	fn push<K: AsRef<[u8]>>(
		&mut self,
		change: Operation<K, Vec<u8>>,
		options: &Options,
		db_version: u32,
	) {
		let salt = options.salt.unwrap_or_default();
		let hash_key = |key: &[u8]| -> Key {
			hash_key(key, &salt, options.columns[self.col as usize].uniform, db_version)
		};

		self.push_change_hashed(match change {
			Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v.into()),
			Operation::Dereference(k) => Operation::Dereference(hash_key(k.as_ref())),
			Operation::Reference(k) => Operation::Reference(hash_key(k.as_ref())),
		})
	}

	fn push_change_hashed(&mut self, change: Operation<Key, RcValue>) {
		self.changes.push(change);
	}

	fn copy_to_overlay(
		&self,
		overlay: &mut CommitOverlay,
		record_id: u64,
		bytes: &mut usize,
		options: &Options,
	) -> Result<()> {
		let ref_counted = options.columns[self.col as usize].ref_counted;
		for change in self.changes.iter() {
			match &change {
				Operation::Set(k, v) => {
					*bytes += k.len();
					*bytes += v.value().len();
					overlay.indexed.insert(*k, (record_id, Some(v.clone())));
				},
				Operation::Dereference(k) => {
					// Don't add removed ref-counted values to overlay.
					if !ref_counted {
						overlay.indexed.insert(*k, (record_id, None));
					}
				},
				Operation::Reference(..) => {
					// Don't add to the overlay (removed values are allowed in the overlay when
					// using rc: some indexing on top of it is expected).
					if !ref_counted {
						return Err(Error::InvalidInput(format!("No Rc for column {}", self.col)))
					}
				},
			}
		}
		Ok(())
	}

	fn write_plan(
		&self,
		column: &Column,
		writer: &mut crate::log::LogWriter,
		ops: &mut u64,
		reindex: &mut bool,
	) -> Result<()> {
		let column = match column {
			Column::Hash(column) => column,
			Column::Tree(_) => {
				log::warn!(target: "parity-db", "Skipping unindex commit in indexed column");
				return Ok(())
			},
		};
		for change in self.changes.iter() {
			if let PlanOutcome::NeedReindex = column.write_plan(change, writer)? {
				// Reindex has triggered another reindex.
				*reindex = true;
			}
			*ops += 1;
		}
		Ok(())
	}

	fn clean_overlay(&self, overlay: &mut CommitOverlay, record_id: u64) {
		use std::collections::hash_map::Entry;
		for change in self.changes.iter() {
			match change {
				Operation::Set(k, _) | Operation::Dereference(k) => {
					if let Entry::Occupied(e) = overlay.indexed.entry(*k) {
						if e.get().0 == record_id {
							e.remove_entry();
						}
					}
				},
				Operation::Reference(..) => (),
			}
		}
	}
}

/// Verification operation utilities.
pub mod check {
	/// Database dump verbosity.
	pub enum CheckDisplay {
		/// Don't output any data.
		None,
		/// Output full data.
		Full,
		/// Limit value output to the specified size.
		Short(u64),
	}

	/// Options for producing a database dump.
	pub struct CheckOptions {
		/// Only process this column. If this is `None` all columns will be processed.
		pub column: Option<u8>,
		/// Start with this index.
		pub from: Option<u64>,
		/// End with this index.
		pub bound: Option<u64>,
		/// Verbosity.
		pub display: CheckDisplay,
		/// Ordered validation.
		pub fast: bool,
		/// Make sure free lists are correct.
		pub validate_free_refs: bool,
	}

	impl CheckOptions {
		/// Create a new instance.
		pub fn new(
			column: Option<u8>,
			from: Option<u64>,
			bound: Option<u64>,
			display_content: bool,
			truncate_value_display: Option<u64>,
			fast: bool,
			validate_free_refs: bool,
		) -> Self {
			let display = if display_content {
				match truncate_value_display {
					Some(t) => CheckDisplay::Short(t),
					None => CheckDisplay::Full,
				}
			} else {
				CheckDisplay::None
			};
			CheckOptions { column, from, bound, display, fast, validate_free_refs }
		}
	}
}

#[derive(Eq, PartialEq, Clone, Copy)]
enum OpeningMode {
	Create,
	Write,
	ReadOnly,
}
1580
1581#[cfg(test)]
1582mod tests {
1583	use super::{Db, Options};
1584	use crate::{
1585		column::ColId,
1586		db::{DbInner, OpeningMode},
1587		ColumnOptions, Value,
1588	};
1589	use rand::Rng;
1590	use std::{
1591		collections::{BTreeMap, HashMap, HashSet},
1592		path::Path,
1593	};
1594	use tempfile::tempdir;
1595
1596	// This is used in tests to disable certain commit stages.
1597	#[derive(Eq, PartialEq, Debug, Clone, Copy)]
1598	enum EnableCommitPipelineStages {
1599		// No threads started, data stays in commit overlay.
1600		#[allow(dead_code)]
1601		CommitOverlay,
1602		// Log worker run, data processed up to the log overlay.
1603		#[allow(dead_code)]
1604		LogOverlay,
1605		// Runing all.
1606		#[allow(dead_code)]
1607		DbFile,
1608		// Default run mode.
1609		Standard,
1610	}
1611
1612	impl EnableCommitPipelineStages {
1613		fn options(&self, path: &Path, num_columns: u8) -> Options {
1614			Options {
1615				path: path.into(),
1616				sync_wal: true,
1617				sync_data: true,
1618				stats: true,
1619				salt: None,
1620				columns: (0..num_columns).map(|_| Default::default()).collect(),
1621				compression_threshold: HashMap::new(),
1622				with_background_thread: *self == Self::Standard,
1623				always_flush: *self == Self::DbFile,
1624			}
1625		}
1626
1627		fn run_stages(&self, db: &Db) {
1628			let db = &db.inner;
1629			if *self == EnableCommitPipelineStages::DbFile ||
1630				*self == EnableCommitPipelineStages::LogOverlay
1631			{
1632				while db.process_commits().unwrap() {}
1633				while db.process_reindex().unwrap() {}
1634			}
1635			if *self == EnableCommitPipelineStages::DbFile {
1636				let _ = db.log.flush_one(0).unwrap();
1637				while db.enact_logs(false).unwrap() {}
1638				let _ = db.clean_logs().unwrap();
1639			}
1640		}
1641
1642		fn check_empty_overlay(&self, db: &DbInner, col: ColId) -> bool {
1643			match self {
1644				EnableCommitPipelineStages::DbFile | EnableCommitPipelineStages::LogOverlay => {
1645					if let Some(overlay) = db.commit_overlay.read().get(col as usize) {
1646						if !overlay.is_empty() {
1647							let mut replayed = 5;
1648							while !overlay.is_empty() {
1649								if replayed > 0 {
1650									replayed -= 1;
1651									// the signal is triggered just before cleaning the overlay, so
1652									// we wait a bit.
1653									// Warning this is still rather flaky and should be ignored
1654									// or removed.
1655									std::thread::sleep(std::time::Duration::from_millis(100));
1656								} else {
1657									return false
1658								}
1659							}
1660						}
1661					}
1662				},
1663				_ => (),
1664			}
1665			true
1666		}
1667	}
1668
1669	#[test]
1670	fn test_db_open_should_fail() {
1671		let tmp = tempdir().unwrap();
1672		let options = Options::with_columns(tmp.path(), 5);
1673		assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));
1674	}
1675
1676	#[test]
1677	fn test_db_open_fail_then_recursively_create() {
1678		let tmp = tempdir().unwrap();
1679		let (db_path_first, db_path_last) = {
1680			let mut db_path_first = tmp.path().to_owned();
1681			db_path_first.push("nope");
1682
1683			let mut db_path_last = db_path_first.to_owned();
1684
1685			for p in ["does", "not", "yet", "exist"] {
1686				db_path_last.push(p);
1687			}
1688
1689			(db_path_first, db_path_last)
1690		};
1691
1692		assert!(
1693			!db_path_first.exists(),
1694			"That directory should not have existed at this point (dir: {db_path_first:?})"
1695		);
1696
1697		let options = Options::with_columns(&db_path_last, 5);
1698		assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));
1699
1700		assert!(!db_path_first.exists(), "That directory should remain non-existent. Did the `open(create: false)` nonetheless create a directory? (dir: {db_path_first:?})");
1701		assert!(Db::open_or_create(&options).is_ok(), "New database should be created");
1702
1703		assert!(
1704			db_path_first.is_dir(),
1705			"A directory should have been been created (dir: {db_path_first:?})"
1706		);
1707		assert!(
1708			db_path_last.is_dir(),
1709			"A directory should have been been created (dir: {db_path_last:?})"
1710		);
1711	}
1712
1713	#[test]
1714	fn test_db_open_or_create() {
1715		let tmp = tempdir().unwrap();
1716		let options = Options::with_columns(tmp.path(), 5);
1717		assert!(Db::open_or_create(&options).is_ok(), "New database should be created");
1718		assert!(Db::open(&options).is_ok(), "Existing database should be reopened");
1719	}
1720
1721	#[test]
1722	fn test_indexed_keyvalues() {
1723		test_indexed_keyvalues_inner(EnableCommitPipelineStages::CommitOverlay);
1724		test_indexed_keyvalues_inner(EnableCommitPipelineStages::LogOverlay);
1725		test_indexed_keyvalues_inner(EnableCommitPipelineStages::DbFile);
1726		test_indexed_keyvalues_inner(EnableCommitPipelineStages::Standard);
1727	}
1728	fn test_indexed_keyvalues_inner(db_test: EnableCommitPipelineStages) {
1729		let tmp = tempdir().unwrap();
1730		let options = db_test.options(tmp.path(), 5);
1731		let col_nb = 0;
1732
1733		let key1 = b"key1".to_vec();
1734		let key2 = b"key2".to_vec();
1735		let key3 = b"key3".to_vec();
1736
1737		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
1738		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
1739
1740		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
1741		db_test.run_stages(&db);
1742		assert!(db_test.check_empty_overlay(&db.inner, col_nb));
1743
1744		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
1745
1746		db.commit(vec![
1747			(col_nb, key1.clone(), None),
1748			(col_nb, key2.clone(), Some(b"value2".to_vec())),
1749			(col_nb, key3.clone(), Some(b"value3".to_vec())),
1750		])
1751		.unwrap();
1752		db_test.run_stages(&db);
1753		assert!(db_test.check_empty_overlay(&db.inner, col_nb));
1754
1755		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
1756		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
1757		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
1758
1759		db.commit(vec![
1760			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
1761			(col_nb, key3.clone(), None),
1762		])
1763		.unwrap();
1764		db_test.run_stages(&db);
1765		assert!(db_test.check_empty_overlay(&db.inner, col_nb));
1766
1767		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
1768		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
1769		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
1770	}
1771
1772	#[test]
1773	fn test_indexed_overlay_against_backend() {
1774		let tmp = tempdir().unwrap();
1775		let db_test = EnableCommitPipelineStages::DbFile;
1776		let options = db_test.options(tmp.path(), 5);
1777		let col_nb = 0;
1778
1779		let key1 = b"key1".to_vec();
1780		let key2 = b"key2".to_vec();
1781		let key3 = b"key3".to_vec();
1782
1783		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
1784
1785		db.commit(vec![
1786			(col_nb, key1.clone(), Some(b"value1".to_vec())),
1787			(col_nb, key2.clone(), Some(b"value2".to_vec())),
1788			(col_nb, key3.clone(), Some(b"value3".to_vec())),
1789		])
1790		.unwrap();
1791		db_test.run_stages(&db);
1792		drop(db);
1793
1794		// Reopening the database files immediately after dropping can fail without a short delay.
1795		std::thread::sleep(std::time::Duration::from_millis(100));
1796
1797		let db_test = EnableCommitPipelineStages::CommitOverlay;
1798		let options = db_test.options(tmp.path(), 5);
1799		let db = Db::open_inner(&options, OpeningMode::Write).unwrap();
1800		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
1801		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
1802		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
1803		db.commit(vec![
1804			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
1805			(col_nb, key3.clone(), None),
1806		])
1807		.unwrap();
1808		db_test.run_stages(&db);
1809
1810		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
1811		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
1812		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
1813	}
1814
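	// Grow a single-column database by two columns (one hash-indexed, one btree-indexed) and verify
	// both old and new data after a reopen.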
1815	#[test]
1816	fn test_add_column() {
1817		let tmp = tempdir().unwrap();
1818		let db_test = EnableCommitPipelineStages::DbFile;
1819		let mut options = db_test.options(tmp.path(), 1);
1820		options.salt = Some(options.salt.unwrap_or_default());
1821
1822		let old_col_id = 0;
1823		let new_col_id = 1;
1824		let new_col_indexed_id = 2;
1825
1826		let key1 = b"key1".to_vec();
1827		let key2 = b"key2".to_vec();
1828		let key3 = b"key3".to_vec();
1829
1830		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
1831
1832		db.commit(vec![
1833			(old_col_id, key1.clone(), Some(b"value1".to_vec())),
1834			(old_col_id, key2.clone(), Some(b"value2".to_vec())),
1835			(old_col_id, key3.clone(), Some(b"value3".to_vec())),
1836		])
1837		.unwrap();
1838		db_test.run_stages(&db);
1839
1840		drop(db);
1841
1842		Db::add_column(&mut options, ColumnOptions { btree_index: false, ..Default::default() })
1843			.unwrap();
1844
1845		Db::add_column(&mut options, ColumnOptions { btree_index: true, ..Default::default() })
1846			.unwrap();
1847
1848		let mut options = db_test.options(tmp.path(), 3);
1849		options.columns[new_col_indexed_id as usize].btree_index = true;
1850
1851		let db_test = EnableCommitPipelineStages::DbFile;
1852		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
1853
1854		// Expected number of columns
1855		assert_eq!(db.num_columns(), 3);
1856
1857		let new_key1 = b"abcdef".to_vec();
1858		let new_key2 = b"123456".to_vec();
1859
1860		// Write to new columns.
1861		db.commit(vec![
1862			(new_col_id, new_key1.clone(), Some(new_key1.to_vec())),
1863			(new_col_id, new_key2.clone(), Some(new_key2.to_vec())),
1864			(new_col_indexed_id, new_key1.clone(), Some(new_key1.to_vec())),
1865			(new_col_indexed_id, new_key2.clone(), Some(new_key2.to_vec())),
1866		])
1867		.unwrap();
1868		db_test.run_stages(&db);
1869
1870		drop(db);
1871
1872		// Reopen DB and fetch all keys we inserted.
1873		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
1874
1875		assert_eq!(db.get(old_col_id, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
1876		assert_eq!(db.get(old_col_id, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
1877		assert_eq!(db.get(old_col_id, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
1878
1879		// Fetch from new columns
1880		assert_eq!(db.get(new_col_id, new_key1.as_slice()).unwrap(), Some(new_key1.to_vec()));
1881		assert_eq!(db.get(new_col_id, new_key2.as_slice()).unwrap(), Some(new_key2.to_vec()));
1882		assert_eq!(
1883			db.get(new_col_indexed_id, new_key1.as_slice()).unwrap(),
1884			Some(new_key1.to_vec())
1885		);
1886		assert_eq!(
1887			db.get(new_col_indexed_id, new_key2.as_slice()).unwrap(),
1888			Some(new_key2.to_vec())
1889		);
1890	}
1891
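	// BTree iteration around a single entry and across inserts, updates and removals, with both
	// short and long keys.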
1892	#[test]
1893	fn test_indexed_btree_1() {
1894		test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, false);
1895		test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, false);
1896		test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, false);
1897		test_indexed_btree_inner(EnableCommitPipelineStages::Standard, false);
1898		test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, true);
1899		test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, true);
1900		test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, true);
1901		test_indexed_btree_inner(EnableCommitPipelineStages::Standard, true);
1902	}
1903	fn test_indexed_btree_inner(db_test: EnableCommitPipelineStages, long_key: bool) {
1904		let tmp = tempdir().unwrap();
1905		let col_nb = 0u8;
1906		let mut options = db_test.options(tmp.path(), 5);
1907		options.columns[col_nb as usize].btree_index = true;
1908
1909		let (key1, key2, key3, key4) = if !long_key {
1910			(b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec(), b"key4".to_vec())
1911		} else {
1912			let key2 = vec![2; 272];
1913			let mut key3 = key2.clone();
1914			key3[271] = 3;
1915			(vec![1; 953], key2, key3, vec![4; 79])
1916		};
1917
1918		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
1919		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
1920
1921		let mut iter = db.iter(col_nb).unwrap();
1922		assert_eq!(iter.next().unwrap(), None);
1923		assert_eq!(iter.prev().unwrap(), None);
1924
1925		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
1926		db_test.run_stages(&db);
1927
1928		assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
1929		iter.seek_to_first().unwrap();
1930		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1931		assert_eq!(iter.next().unwrap(), None);
1932		assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1933		assert_eq!(iter.prev().unwrap(), None);
1934		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1935		assert_eq!(iter.next().unwrap(), None);
1936
1937		iter.seek_to_first().unwrap();
1938		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1939		assert_eq!(iter.prev().unwrap(), None);
1940
1941		iter.seek(&[0xff]).unwrap();
1942		assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1943		assert_eq!(iter.prev().unwrap(), None);
1944
1945		db.commit(vec![
1946			(col_nb, key1.clone(), None),
1947			(col_nb, key2.clone(), Some(b"value2".to_vec())),
1948			(col_nb, key3.clone(), Some(b"value3".to_vec())),
1949		])
1950		.unwrap();
1951		db_test.run_stages(&db);
1952
1953		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
1954		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
1955		assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));
1956
1957		iter.seek(key2.as_slice()).unwrap();
1958		assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
1959		assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
1960		assert_eq!(iter.next().unwrap(), None);
1961
1962		iter.seek(key3.as_slice()).unwrap();
1963		assert_eq!(iter.prev().unwrap(), Some((key3.clone(), b"value3".to_vec())));
1964		assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
1965		assert_eq!(iter.prev().unwrap(), None);
1966
1967		db.commit(vec![
1968			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
1969			(col_nb, key4.clone(), Some(b"value4".to_vec())),
1970			(col_nb, key3.clone(), None),
1971		])
1972		.unwrap();
1973		db_test.run_stages(&db);
1974
1975		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
1976		assert_eq!(db.get(col_nb, &key3).unwrap(), None);
1977		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2b".to_vec()));
1978		assert_eq!(db.get(col_nb, &key4).unwrap(), Some(b"value4".to_vec()));
1979		let mut key22 = key2.clone();
1980		key22.push(2);
1981		iter.seek(key22.as_slice()).unwrap();
1982		assert_eq!(iter.next().unwrap(), Some((key4, b"value4".to_vec())));
1983		assert_eq!(iter.next().unwrap(), None);
1984	}
1985
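	// BTree iteration must reflect commits made after the database was persisted and reopened.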
1986	#[test]
1987	fn test_indexed_btree_2() {
1988		test_indexed_btree_inner_2(EnableCommitPipelineStages::CommitOverlay);
1989		test_indexed_btree_inner_2(EnableCommitPipelineStages::LogOverlay);
1990	}
1991	fn test_indexed_btree_inner_2(db_test: EnableCommitPipelineStages) {
1992		let tmp = tempdir().unwrap();
1993		let col_nb = 0u8;
1994		let mut options = db_test.options(tmp.path(), 5);
1995		options.columns[col_nb as usize].btree_index = true;
1996
1997		let key1 = b"key1".to_vec();
1998		let key2 = b"key2".to_vec();
1999		let key3 = b"key3".to_vec();
2000
2001		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2002		let mut iter = db.iter(col_nb).unwrap();
2003		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
2004		assert_eq!(iter.next().unwrap(), None);
2005
2006		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
2007		EnableCommitPipelineStages::DbFile.run_stages(&db);
2008		drop(db);
2009
2010		// Reopening the database files immediately after dropping can fail without a short delay.
2011		std::thread::sleep(std::time::Duration::from_millis(100));
2012
2013		let db = Db::open_inner(&options, OpeningMode::Write).unwrap();
2014
2015		let mut iter = db.iter(col_nb).unwrap();
2016		assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
2017		iter.seek_to_first().unwrap();
2018		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
2019		assert_eq!(iter.next().unwrap(), None);
2020
2021		db.commit(vec![
2022			(col_nb, key1.clone(), None),
2023			(col_nb, key2.clone(), Some(b"value2".to_vec())),
2024			(col_nb, key3.clone(), Some(b"value3".to_vec())),
2025		])
2026		.unwrap();
2027		db_test.run_stages(&db);
2028
2029		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
2030		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
2031		assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));
2032		iter.seek(key2.as_slice()).unwrap();
2033		assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
2034		assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
2035		assert_eq!(iter.next().unwrap(), None);
2036
2037		iter.seek_to_last().unwrap();
2038		assert_eq!(iter.prev().unwrap(), Some((key3, b"value3".to_vec())));
2039		assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
2040		assert_eq!(iter.prev().unwrap(), None);
2041	}
2042
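	// Randomized seek/next/prev walks over a btree column, checked against a reference BTreeSet.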
2043	#[test]
2044	fn test_indexed_btree_3() {
2045		test_indexed_btree_inner_3(EnableCommitPipelineStages::CommitOverlay);
2046		test_indexed_btree_inner_3(EnableCommitPipelineStages::LogOverlay);
2047		test_indexed_btree_inner_3(EnableCommitPipelineStages::DbFile);
2048		test_indexed_btree_inner_3(EnableCommitPipelineStages::Standard);
2049	}
2050
2051	fn test_indexed_btree_inner_3(db_test: EnableCommitPipelineStages) {
2052		use rand::SeedableRng;
2053
2054		use std::collections::BTreeSet;
2055
2056		let mut rng = rand::rngs::SmallRng::seed_from_u64(0);
2057
2058		let tmp = tempdir().unwrap();
2059		let col_nb = 0u8;
2060		let mut options = db_test.options(tmp.path(), 5);
2061		options.columns[col_nb as usize].btree_index = true;
2062
2063		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2064
2065		db.commit(
2066			(0u64..1024)
2067				.map(|i| (0, i.to_be_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
2068				.chain((0u64..1024).step_by(2).map(|i| (0, i.to_be_bytes().to_vec(), None))),
2069		)
2070		.unwrap();
2071		let expected = (0u64..1024).filter(|i| i % 2 == 1).collect::<BTreeSet<_>>();
2072		let mut iter = db.iter(0).unwrap();
2073
2074		for _ in 0..100 {
2075			let at = rng.gen_range(0u64..=1024);
2076			iter.seek(&at.to_be_bytes()).unwrap();
2077
2078			let mut prev_run: bool = rng.gen();
2079			let at = if prev_run {
2080				let take = rng.gen_range(1..100);
2081				let got = std::iter::from_fn(|| iter.next().unwrap())
2082					.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
2083					.take(take)
2084					.collect::<Vec<_>>();
2085				let expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
2086				assert_eq!(got, expected);
2087				if got.is_empty() {
2088					prev_run = false;
2089				}
2090				if got.len() < take {
2091					prev_run = false;
2092				}
2093				expected.last().copied().unwrap_or(at)
2094			} else {
2095				at
2096			};
2097
2098			let at = {
2099				let take = rng.gen_range(1..100);
2100				let got = std::iter::from_fn(|| iter.prev().unwrap())
2101					.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
2102					.take(take)
2103					.collect::<Vec<_>>();
2104				let expected = if prev_run {
2105					expected.range(..at).rev().take(take).copied().collect::<Vec<_>>()
2106				} else {
2107					expected.range(..=at).rev().take(take).copied().collect::<Vec<_>>()
2108				};
2109				assert_eq!(got, expected);
2110				prev_run = !got.is_empty();
2111				if take > got.len() {
2112					prev_run = false;
2113				}
2114				expected.last().copied().unwrap_or(at)
2115			};
2116
2117			let take = rng.gen_range(1..100);
2118			let mut got = std::iter::from_fn(|| iter.next().unwrap())
2119				.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
2120				.take(take)
2121				.collect::<Vec<_>>();
2122			let mut expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
2123			if prev_run {
2124				expected = expected.split_off(1);
2125				if got.len() == take {
2126					got.pop();
2127				}
2128			}
2129			assert_eq!(got, expected);
2130		}
2131
2132		let take = rng.gen_range(20..100);
2133		iter.seek_to_last().unwrap();
2134		let got = std::iter::from_fn(|| iter.prev().unwrap())
2135			.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
2136			.take(take)
2137			.collect::<Vec<_>>();
2138		let expected = expected.iter().rev().take(take).copied().collect::<Vec<_>>();
2139		assert_eq!(got, expected);
2140	}
2141
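	// Apply a change set under all four combinations of btree_index and ref_counted and verify the
	// resulting state.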
2142	fn test_basic(change_set: &[(Vec<u8>, Option<Vec<u8>>)]) {
2143		test_basic_inner(change_set, false, false);
2144		test_basic_inner(change_set, false, true);
2145		test_basic_inner(change_set, true, false);
2146		test_basic_inner(change_set, true, true);
2147	}
2148
2149	fn test_basic_inner(
2150		change_set: &[(Vec<u8>, Option<Vec<u8>>)],
2151		btree_index: bool,
2152		ref_counted: bool,
2153	) {
2154		let tmp = tempdir().unwrap();
2155		let col_nb = 1u8;
2156		let db_test = EnableCommitPipelineStages::DbFile;
2157		let mut options = db_test.options(tmp.path(), 2);
2158		options.columns[col_nb as usize].btree_index = btree_index;
2159		options.columns[col_nb as usize].ref_counted = ref_counted;
2160		options.columns[col_nb as usize].preimage = ref_counted;
2161		// Ref-counted columns don't currently support removal via the commit overlay alone.
2162		assert!(!(ref_counted && db_test == EnableCommitPipelineStages::CommitOverlay));
2163		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2164
2165		let iter = btree_index.then(|| db.iter(col_nb).unwrap());
2166		assert_eq!(iter.and_then(|mut i| i.next().unwrap()), None);
2167
2168		db.commit(change_set.iter().map(|(k, v)| (col_nb, k.clone(), v.clone())))
2169			.unwrap();
2170		db_test.run_stages(&db);
2171
2172		let mut keys = HashSet::new();
2173		let mut expected_count: u64 = 0;
2174		for (k, v) in change_set.iter() {
2175			if v.is_some() {
2176				if keys.insert(k) {
2177					expected_count += 1;
2178				}
2179			} else if keys.remove(k) {
2180				expected_count -= 1;
2181			}
2182		}
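		// For ref-counted columns, replay the change set against a reference model: inserts bump a
		// per-key counter, removals decrement it, and the key only disappears once the counter
		// reaches zero.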
2183		if ref_counted {
2184			let mut state: BTreeMap<Vec<u8>, Option<(Vec<u8>, usize)>> = Default::default();
2185			for (k, v) in change_set.iter() {
2186				let mut remove = false;
2187				let mut insert = false;
2188				match state.get_mut(k) {
2189					Some(Some((_, counter))) =>
2190						if v.is_some() {
2191							*counter += 1;
2192						} else if *counter == 1 {
2193							remove = true;
2194						} else {
2195							*counter -= 1;
2196						},
2197					Some(None) | None =>
2198						if v.is_some() {
2199							insert = true;
2200						},
2201				}
2202				if insert {
2203					state.insert(k.clone(), v.clone().map(|v| (v, 1)));
2204				}
2205				if remove {
2206					state.remove(k);
2207				}
2208			}
2209			for (key, value) in state {
2210				assert_eq!(db.get(col_nb, &key).unwrap(), value.map(|v| v.0));
2211			}
2212		} else {
2213			let stats = db.stats();
2214			// BTree columns do not implement stats.
2215			if let Some(stats) = stats.columns[col_nb as usize].as_ref() {
2216				assert_eq!(stats.total_values, expected_count);
2217			}
2218
2219			let state: BTreeMap<Vec<u8>, Option<Vec<u8>>> =
2220				change_set.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2221			for (key, value) in state.iter() {
2222				assert_eq!(&db.get(col_nb, key).unwrap(), value);
2223			}
2224		}
2225	}
2226
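	// Fuzz test_basic with pseudo-random change sets of varying size and key length.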
2227	#[test]
2228	fn test_random() {
2229		fdlimit::raise_fd_limit();
2230		for i in 0..100 {
2231			test_random_inner(60, 60, i);
2232		}
2233		for i in 0..50 {
2234			test_random_inner(20, 60, i);
2235		}
2236	}
2237	fn test_random_inner(size: usize, key_size: usize, seed: u64) {
2238		use rand::{RngCore, SeedableRng};
2239		let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);
2240		let mut data = Vec::<(Vec<u8>, Option<Vec<u8>>)>::new();
2241		for i in 0..size {
2242			let nb_delete: u32 = rng.next_u32(); // Ideally computed outside the loop, but doing it here alternates inserts and deletes in some cases.
2243			let nb_delete = (nb_delete as usize % size) / 2;
2244			let mut key = vec![0u8; key_size];
2245			rng.fill_bytes(&mut key[..]);
2246			let value = if i > size - nb_delete {
2247				let random_key = rng.next_u32();
2248				let random_key = (random_key % 4) > 0;
2249				if !random_key {
2250					key = data[i - size / 2].0.clone();
2251				}
2252				None
2253			} else {
2254				Some(key.clone())
2255			};
2256			let var_keysize = rng.next_u32();
2257			let var_keysize = var_keysize as usize % (key_size / 2);
2258			key.truncate(key_size - var_keysize);
2259			data.push((key, value));
2260		}
2261		test_basic(&data[..]);
2262	}
2263
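	// Hand-picked change sets covering duplicate inserts, double deletes, overwrites and keys
	// sharing long prefixes.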
2264	#[test]
2265	fn test_simple() {
2266		test_basic(&[
2267			(b"key1".to_vec(), Some(b"value1".to_vec())),
2268			(b"key1".to_vec(), Some(b"value1".to_vec())),
2269			(b"key1".to_vec(), None),
2270		]);
2271		test_basic(&[
2272			(b"key1".to_vec(), Some(b"value1".to_vec())),
2273			(b"key1".to_vec(), Some(b"value1".to_vec())),
2274			(b"key1".to_vec(), None),
2275			(b"key1".to_vec(), None),
2276		]);
2277		test_basic(&[
2278			(b"key1".to_vec(), Some(b"value1".to_vec())),
2279			(b"key1".to_vec(), Some(b"value2".to_vec())),
2280		]);
2281		test_basic(&[(b"key1".to_vec(), Some(b"value1".to_vec()))]);
2282		test_basic(&[
2283			(b"key1".to_vec(), Some(b"value1".to_vec())),
2284			(b"key2".to_vec(), Some(b"value2".to_vec())),
2285		]);
2286		test_basic(&[
2287			(b"key1".to_vec(), Some(b"value1".to_vec())),
2288			(b"key2".to_vec(), Some(b"value2".to_vec())),
2289			(b"key3".to_vec(), Some(b"value3".to_vec())),
2290		]);
2291		test_basic(&[
2292			(b"key1".to_vec(), Some(b"value1".to_vec())),
2293			(b"key3".to_vec(), Some(b"value3".to_vec())),
2294			(b"key2".to_vec(), Some(b"value2".to_vec())),
2295		]);
2296		test_basic(&[
2297			(b"key3".to_vec(), Some(b"value3".to_vec())),
2298			(b"key2".to_vec(), Some(b"value2".to_vec())),
2299			(b"key1".to_vec(), Some(b"value1".to_vec())),
2300		]);
2301		test_basic(&[
2302			(b"key1".to_vec(), Some(b"value1".to_vec())),
2303			(b"key2".to_vec(), Some(b"value2".to_vec())),
2304			(b"key3".to_vec(), Some(b"value3".to_vec())),
2305			(b"key4".to_vec(), Some(b"value4".to_vec())),
2306		]);
2307		test_basic(&[
2308			(b"key1".to_vec(), Some(b"value1".to_vec())),
2309			(b"key2".to_vec(), Some(b"value2".to_vec())),
2310			(b"key3".to_vec(), Some(b"value3".to_vec())),
2311			(b"key4".to_vec(), Some(b"value4".to_vec())),
2312			(b"key5".to_vec(), Some(b"value5".to_vec())),
2313		]);
2314		test_basic(&[
2315			(b"key5".to_vec(), Some(b"value5".to_vec())),
2316			(b"key3".to_vec(), Some(b"value3".to_vec())),
2317			(b"key4".to_vec(), Some(b"value4".to_vec())),
2318			(b"key2".to_vec(), Some(b"value2".to_vec())),
2319			(b"key1".to_vec(), Some(b"value1".to_vec())),
2320		]);
2321		test_basic(&[
2322			(b"key5".to_vec(), Some(b"value5".to_vec())),
2323			(b"key3".to_vec(), Some(b"value3".to_vec())),
2324			(b"key4".to_vec(), Some(b"value4".to_vec())),
2325			(b"key2".to_vec(), Some(b"value2".to_vec())),
2326			(b"key1".to_vec(), Some(b"value1".to_vec())),
2327			(b"key11".to_vec(), Some(b"value31".to_vec())),
2328			(b"key12".to_vec(), Some(b"value32".to_vec())),
2329		]);
2330		test_basic(&[
2331			(b"key5".to_vec(), Some(b"value5".to_vec())),
2332			(b"key3".to_vec(), Some(b"value3".to_vec())),
2333			(b"key4".to_vec(), Some(b"value4".to_vec())),
2334			(b"key2".to_vec(), Some(b"value2".to_vec())),
2335			(b"key1".to_vec(), Some(b"value1".to_vec())),
2336			(b"key51".to_vec(), Some(b"value31".to_vec())),
2337			(b"key52".to_vec(), Some(b"value32".to_vec())),
2338		]);
2339		test_basic(&[
2340			(b"key5".to_vec(), Some(b"value5".to_vec())),
2341			(b"key3".to_vec(), Some(b"value3".to_vec())),
2342			(b"key4".to_vec(), Some(b"value4".to_vec())),
2343			(b"key2".to_vec(), Some(b"value2".to_vec())),
2344			(b"key1".to_vec(), Some(b"value1".to_vec())),
2345			(b"key31".to_vec(), Some(b"value31".to_vec())),
2346			(b"key32".to_vec(), Some(b"value32".to_vec())),
2347		]);
2348		test_basic(&[
2349			(b"key1".to_vec(), Some(b"value5".to_vec())),
2350			(b"key2".to_vec(), Some(b"value3".to_vec())),
2351			(b"key3".to_vec(), Some(b"value4".to_vec())),
2352			(b"key4".to_vec(), Some(b"value7".to_vec())),
2353			(b"key5".to_vec(), Some(b"value2".to_vec())),
2354			(b"key6".to_vec(), Some(b"value1".to_vec())),
2355			(b"key3".to_vec(), None),
2356		]);
2357		test_basic(&[
2358			(b"key1".to_vec(), Some(b"value5".to_vec())),
2359			(b"key2".to_vec(), Some(b"value3".to_vec())),
2360			(b"key3".to_vec(), Some(b"value4".to_vec())),
2361			(b"key4".to_vec(), Some(b"value7".to_vec())),
2362			(b"key5".to_vec(), Some(b"value2".to_vec())),
2363			(b"key0".to_vec(), Some(b"value1".to_vec())),
2364			(b"key3".to_vec(), None),
2365		]);
2366		test_basic(&[
2367			(b"key1".to_vec(), Some(b"value5".to_vec())),
2368			(b"key2".to_vec(), Some(b"value3".to_vec())),
2369			(b"key3".to_vec(), Some(b"value4".to_vec())),
2370			(b"key4".to_vec(), Some(b"value7".to_vec())),
2371			(b"key5".to_vec(), Some(b"value2".to_vec())),
2372			(b"key3".to_vec(), None),
2373		]);
2374		test_basic(&[
2375			(b"key1".to_vec(), Some(b"value5".to_vec())),
2376			(b"key4".to_vec(), Some(b"value3".to_vec())),
2377			(b"key5".to_vec(), Some(b"value4".to_vec())),
2378			(b"key6".to_vec(), Some(b"value4".to_vec())),
2379			(b"key7".to_vec(), Some(b"value2".to_vec())),
2380			(b"key8".to_vec(), Some(b"value1".to_vec())),
2381			(b"key5".to_vec(), None),
2382		]);
2383		test_basic(&[
2384			(b"key1".to_vec(), Some(b"value5".to_vec())),
2385			(b"key4".to_vec(), Some(b"value3".to_vec())),
2386			(b"key5".to_vec(), Some(b"value4".to_vec())),
2387			(b"key7".to_vec(), Some(b"value2".to_vec())),
2388			(b"key8".to_vec(), Some(b"value1".to_vec())),
2389			(b"key3".to_vec(), None),
2390		]);
2391		test_basic(&[
2392			(b"key5".to_vec(), Some(b"value5".to_vec())),
2393			(b"key3".to_vec(), Some(b"value3".to_vec())),
2394			(b"key4".to_vec(), Some(b"value4".to_vec())),
2395			(b"key2".to_vec(), Some(b"value2".to_vec())),
2396			(b"key1".to_vec(), Some(b"value1".to_vec())),
2397			(b"key5".to_vec(), None),
2398			(b"key3".to_vec(), None),
2399		]);
2400		test_basic(&[
2401			(b"key5".to_vec(), Some(b"value5".to_vec())),
2402			(b"key3".to_vec(), Some(b"value3".to_vec())),
2403			(b"key4".to_vec(), Some(b"value4".to_vec())),
2404			(b"key2".to_vec(), Some(b"value2".to_vec())),
2405			(b"key1".to_vec(), Some(b"value1".to_vec())),
2406			(b"key5".to_vec(), None),
2407			(b"key3".to_vec(), None),
2408			(b"key2".to_vec(), None),
2409			(b"key4".to_vec(), None),
2410		]);
2411		test_basic(&[
2412			(b"key5".to_vec(), Some(b"value5".to_vec())),
2413			(b"key3".to_vec(), Some(b"value3".to_vec())),
2414			(b"key4".to_vec(), Some(b"value4".to_vec())),
2415			(b"key2".to_vec(), Some(b"value2".to_vec())),
2416			(b"key1".to_vec(), Some(b"value1".to_vec())),
2417			(b"key5".to_vec(), None),
2418			(b"key3".to_vec(), None),
2419			(b"key2".to_vec(), None),
2420			(b"key4".to_vec(), None),
2421			(b"key1".to_vec(), None),
2422		]);
2423		test_basic(&[
2424			([5u8; 250].to_vec(), Some(b"value5".to_vec())),
2425			([5u8; 200].to_vec(), Some(b"value3".to_vec())),
2426			([5u8; 100].to_vec(), Some(b"value4".to_vec())),
2427			([5u8; 150].to_vec(), Some(b"value2".to_vec())),
2428			([5u8; 101].to_vec(), Some(b"value1".to_vec())),
2429			([5u8; 250].to_vec(), None),
2430			([5u8; 101].to_vec(), None),
2431		]);
2432	}
2433
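	// Iterate a btree column while a second commit lands mid-iteration: the iterator must keep
	// returning a consistent view, forwards and backwards.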
2434	#[test]
2435	fn test_btree_iter() {
2436		let col_nb = 0;
2437		let mut data_start = Vec::new();
2438		for i in 0u8..100 {
2439			let mut key = b"key0".to_vec();
2440			key[3] = i;
2441			let mut value = b"val0".to_vec();
2442			value[3] = i;
2443			data_start.push((col_nb, key, Some(value)));
2444		}
2445		let mut data_change = Vec::new();
2446		for i in 0u8..100 {
2447			let mut key = b"key0".to_vec();
2448			if i % 2 == 0 {
2449				key[2] = i;
2450				let mut value = b"val0".to_vec();
2451				value[2] = i;
2452				data_change.push((col_nb, key, Some(value)));
2453			} else if i % 3 == 0 {
2454				key[3] = i;
2455				data_change.push((col_nb, key, None));
2456			} else {
2457				key[3] = i;
2458				let mut value = b"val0".to_vec();
2459				value[2] = i;
2460				data_change.push((col_nb, key, Some(value)));
2461			}
2462		}
2463
2464		let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
2465			data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
2466		let mut end_state = start_state.clone();
2467		for (_c, k, v) in data_change.iter() {
2468			if let Some(v) = v {
2469				end_state.insert(k.clone(), v.clone());
2470			} else {
2471				end_state.remove(k);
2472			}
2473		}
2474
2475		for stage in [
2476			EnableCommitPipelineStages::CommitOverlay,
2477			EnableCommitPipelineStages::LogOverlay,
2478			EnableCommitPipelineStages::DbFile,
2479			EnableCommitPipelineStages::Standard,
2480		] {
2481			for i in 0..10 {
2482				test_btree_iter_inner(
2483					stage,
2484					&data_start,
2485					&data_change,
2486					&start_state,
2487					&end_state,
2488					i * 5,
2489				);
2490			}
2491			let data_start = vec![
2492				(0, b"key1".to_vec(), Some(b"val1".to_vec())),
2493				(0, b"key3".to_vec(), Some(b"val3".to_vec())),
2494			];
2495			let data_change = vec![(0, b"key2".to_vec(), Some(b"val2".to_vec()))];
2496			let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
2497				data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
2498			let mut end_state = start_state.clone();
2499			for (_c, k, v) in data_change.iter() {
2500				if let Some(v) = v {
2501					end_state.insert(k.clone(), v.clone());
2502				} else {
2503					end_state.remove(k);
2504				}
2505			}
2506			test_btree_iter_inner(stage, &data_start, &data_change, &start_state, &end_state, 1);
2507		}
2508	}
2509	fn test_btree_iter_inner(
2510		db_test: EnableCommitPipelineStages,
2511		data_start: &[(u8, Vec<u8>, Option<Value>)],
2512		data_change: &[(u8, Vec<u8>, Option<Value>)],
2513		start_state: &BTreeMap<Vec<u8>, Vec<u8>>,
2514		end_state: &BTreeMap<Vec<u8>, Vec<u8>>,
2515		commit_at: usize,
2516	) {
2517		let tmp = tempdir().unwrap();
2518		let mut options = db_test.options(tmp.path(), 5);
2519		let col_nb = 0;
2520		options.columns[col_nb as usize].btree_index = true;
2521		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2522
2523		db.commit(data_start.iter().cloned()).unwrap();
2524		db_test.run_stages(&db);
2525
2526		let mut iter = db.iter(col_nb).unwrap();
2527		let mut iter_state = start_state.iter();
2528		let mut last_key = Value::new();
2529		for _ in 0..commit_at {
2530			let next = iter.next().unwrap();
2531			if let Some((k, _)) = next.as_ref() {
2532				last_key = k.clone();
2533			}
2534			assert_eq!(iter_state.next(), next.as_ref().map(|(k, v)| (k, v)));
2535		}
2536
2537		db.commit(data_change.iter().cloned()).unwrap();
2538		db_test.run_stages(&db);
2539
2540		let mut iter_state = end_state.range(last_key.clone()..);
2541		for _ in commit_at..100 {
2542			let mut state_next = iter_state.next();
2543			if let Some((k, _v)) = state_next.as_ref() {
2544				if *k == &last_key {
2545					state_next = iter_state.next();
2546				}
2547			}
2548			let iter_next = iter.next().unwrap();
2549			assert_eq!(state_next, iter_next.as_ref().map(|(k, v)| (k, v)));
2550		}
2551		let mut iter_state_rev = end_state.iter().rev();
2552		let mut iter = db.iter(col_nb).unwrap();
2553		iter.seek_to_last().unwrap();
2554		for _ in 0..100 {
2555			let next = iter.prev().unwrap();
2556			assert_eq!(iter_state_rev.next(), next.as_ref().map(|(k, v)| (k, v)));
2557		}
2558	}
2559
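	// Simulate an I/O failure while enacting a flushed log and check that both commits are
	// recovered on reopen.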
2560	#[cfg(feature = "instrumentation")]
2561	#[test]
2562	fn test_recover_from_log_on_error() {
2563		let tmp = tempdir().unwrap();
2564		let mut options = Options::with_columns(tmp.path(), 1);
2565		options.always_flush = true;
2566		options.with_background_thread = false;
2567
2568		// We do two commits and fail while enacting the second one.
2569		{
2570			let db = Db::open_or_create(&options).unwrap();
2571			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
2572			db.process_commits().unwrap();
2573			db.flush_logs().unwrap();
2574			db.enact_logs().unwrap();
2575			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
2576			db.process_commits().unwrap();
2577			db.flush_logs().unwrap();
2578			crate::set_number_of_allowed_io_operations(4);
2579
2580			// Set the background error explicitly as background threads are disabled in tests.
2581			let err = db.enact_logs();
2582			assert!(err.is_err());
2583			db.inner.store_err(err);
2584			crate::set_number_of_allowed_io_operations(usize::MAX);
2585		}
2586
2587		// Reopen the database and check that both values are there.
2588		{
2589			let db = Db::open(&options).unwrap();
2590			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
2591			assert_eq!(db.get(0, &[1]).unwrap(), Some(vec![1]));
2592		}
2593	}
2594
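	// A commit that fails half-written must be dropped on recovery, while the earlier commit
	// survives repeated reopens.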
2595	#[cfg(feature = "instrumentation")]
2596	#[test]
2597	fn test_partial_log_recovery() {
2598		let tmp = tempdir().unwrap();
2599		let mut options = Options::with_columns(tmp.path(), 1);
2600		options.columns[0].btree_index = true;
2601		options.always_flush = true;
2602		options.with_background_thread = false;
2603
2604		// We do two commits and fail while writing the second one.
2605		{
2606			let db = Db::open_or_create(&options).unwrap();
2607			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
2608			db.process_commits().unwrap();
2609			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
2610			crate::set_number_of_allowed_io_operations(4);
2611			assert!(db.process_commits().is_err());
2612			crate::set_number_of_allowed_io_operations(usize::MAX);
2613			db.flush_logs().unwrap();
2614		}
2615
2616		// On a first reopen, the first value is there.
2617		{
2618			let db = Db::open(&options).unwrap();
2619			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
2620		}
2621
2622		// On a second reopen, the first value should still be there.
2623		{
2624			let db = Db::open(&options).unwrap();
2625			assert!(db.get(0, &[0]).unwrap().is_some());
2626		}
2627	}
2628
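	// An interrupted reindex must be resumed on reopen, ending with all entries in the larger
	// (17-bit) index.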
2629	#[cfg(feature = "instrumentation")]
2630	#[test]
2631	fn test_continue_reindex() {
2632		let _ = env_logger::try_init();
2633		let tmp = tempdir().unwrap();
2634		let mut options = Options::with_columns(tmp.path(), 1);
2635		options.columns[0].preimage = true;
2636		options.columns[0].uniform = true;
2637		options.always_flush = true;
2638		options.with_background_thread = false;
2639		options.salt = Some(Default::default());
2640
2641		{
2642			// Force a reindex by committing more than 64 values with the same 16-bit prefix.
2643			let db = Db::open_or_create(&options).unwrap();
2644			let commit: Vec<_> = (0..65u32)
2645				.map(|index| {
2646					let mut key = [0u8; 32];
2647					key[2] = (index as u8) << 1;
2648					(0, key.to_vec(), Some(vec![index as u8]))
2649				})
2650				.collect();
2651			db.commit(commit).unwrap();
2652
2653			db.process_commits().unwrap();
2654			db.flush_logs().unwrap();
2655			db.enact_logs().unwrap();
2656			// The 16-bit index now contains 64 values and the 17-bit index holds the single value that did not fit.
2657
2658			// Simulate interrupted reindex by processing it first and then restoring the old index
2659			// file. Make a copy of the index file first.
2660			std::fs::copy(tmp.path().join("index_00_16"), tmp.path().join("index_00_16.bak"))
2661				.unwrap();
2662			db.process_reindex().unwrap();
2663			db.flush_logs().unwrap();
2664			db.enact_logs().unwrap();
2665			db.clean_logs().unwrap();
2666			std::fs::rename(tmp.path().join("index_00_16.bak"), tmp.path().join("index_00_16"))
2667				.unwrap();
2668		}
2669
2670		// Reopen the database, which should resume the pending reindex.
2671		{
2672			let db = Db::open(&options).unwrap();
2673			db.process_reindex().unwrap();
2674			let mut entries = 0;
2675			db.iter_column_while(0, |_| {
2676				entries += 1;
2677				true
2678			})
2679			.unwrap();
2680
2681			assert_eq!(entries, 65);
2682			assert_eq!(db.inner.columns[0].index_bits(), Some(17));
2683		}
2684	}
2685
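	// reset_column clears a column's data and allows re-creating it with different options
	// (here, switching to a btree index).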
2686	#[test]
2687	fn test_remove_column() {
2688		let tmp = tempdir().unwrap();
2689		let db_test_file = EnableCommitPipelineStages::DbFile;
2690		let mut options_db_files = db_test_file.options(tmp.path(), 2);
2691		options_db_files.salt = Some(options_db_files.salt.unwrap_or_default());
2692		let mut options_std = EnableCommitPipelineStages::Standard.options(tmp.path(), 2);
2693		options_std.salt = options_db_files.salt.clone();
2694
2695		let db = Db::open_inner(&options_db_files, OpeningMode::Create).unwrap();
2696
2697		let payload: Vec<(u8, _, _)> = (0u16..100)
2698			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
2699			.collect();
2700
2701		db.commit(payload.clone()).unwrap();
2702
2703		db_test_file.run_stages(&db);
2704		drop(db);
2705
2706		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
2707		for (col, key, value) in payload.iter() {
2708			assert_eq!(db.get(*col, key).unwrap().as_ref(), value.as_ref());
2709		}
2710		drop(db);
2711		Db::reset_column(&mut options_db_files, 1, None).unwrap();
2712
2713		let db = Db::open_inner(&options_db_files, OpeningMode::Write).unwrap();
2714		for (col, key, _value) in payload.iter() {
2715			assert_eq!(db.get(*col, key).unwrap(), None);
2716		}
2717
2718		let payload: Vec<(u8, _, _)> = (0u16..10)
2719			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
2720			.collect();
2721
2722		db.commit(payload.clone()).unwrap();
2723
2724		db_test_file.run_stages(&db);
2725		drop(db);
2726
2727		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
2728		let payload: Vec<(u8, _, _)> = (10u16..100)
2729			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
2730			.collect();
2731
2732		db.commit(payload.clone()).unwrap();
2733		assert!(db.iter(1).is_err());
2734
2735		drop(db);
2736
2737		let mut col_option = options_std.columns[1].clone();
2738		col_option.btree_index = true;
2739		Db::reset_column(&mut options_std, 1, Some(col_option)).unwrap();
2740
2741		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
2742		let payload: Vec<(u8, _, _)> = (0u16..10)
2743			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
2744			.collect();
2745
2746		db.commit(payload.clone()).unwrap();
2747		assert!(db.iter(1).is_ok());
2748	}
2749}