use crate::{
    btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
    column::{hash_key, ColId, Column, IterState, ReindexBatch, ValueIterState},
    error::{try_io, Error, Result},
    hash::IdentityBuildHasher,
    index::PlanOutcome,
    log::{Log, LogAction},
    options::{Options, CURRENT_VERSION},
    parking_lot::{Condvar, Mutex, RwLock},
    stats::StatSummary,
    ColumnOptions, Key,
};
use fs2::FileExt;
use std::{
    borrow::Borrow,
    collections::{BTreeMap, HashMap, VecDeque},
    ops::Bound,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
    thread,
};

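// Commit data queued in memory is limited to this many bytes; `commit_raw`
// blocks new commits once the queue grows past it.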
const MAX_COMMIT_QUEUE_BYTES: usize = 16 * 1024 * 1024;
const MAX_LOG_QUEUE_BYTES: i64 = 128 * 1024 * 1024;
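// Target minimum size of a log file before the flush worker rotates it.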
const MIN_LOG_SIZE_BYTES: u64 = 64 * 1024 * 1024;
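// Number of processed log files to keep around when `sync_data` is disabled.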
const KEEP_LOGS: usize = 16;
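// Maximum number of dirty (not yet cleaned) log files tolerated before log
// enactment waits for the cleanup worker, when `sync_data` is enabled.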
const MAX_LOG_FILES: usize = 4;

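/// A database value: an owned byte vector.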
pub type Value = Vec<u8>;

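/// A value wrapped in an `Arc`, making clones cheap; the payload is shared
/// between the commit overlay and readers.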
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RcValue(Arc<Value>);

pub type RcKey = RcValue;

impl RcValue {
    pub fn value(&self) -> &Value {
        &self.0
    }
}

impl AsRef<[u8]> for RcValue {
    fn as_ref(&self) -> &[u8] {
        self.0.as_ref()
    }
}

impl Borrow<[u8]> for RcValue {
    fn borrow(&self) -> &[u8] {
        self.value().borrow()
    }
}

impl Borrow<Vec<u8>> for RcValue {
    fn borrow(&self) -> &Vec<u8> {
        self.value()
    }
}

impl From<Value> for RcValue {
    fn from(value: Value) -> Self {
        Self(value.into())
    }
}

#[cfg(test)]
impl<const N: usize> TryFrom<RcValue> for [u8; N] {
    type Error = <[u8; N] as TryFrom<Vec<u8>>>::Error;

    fn try_from(value: RcValue) -> std::result::Result<Self, Self::Error> {
        value.value().clone().try_into()
    }
}

#[derive(Debug, Default)]
struct Commit {
    id: u64,
    bytes: usize,
    changeset: CommitChangeSet,
}

#[derive(Debug, Default)]
struct CommitQueue {
    record_id: u64,
    bytes: usize,
    commits: VecDeque<Commit>,
}

#[derive(Debug)]
struct DbInner {
    columns: Vec<Column>,
    options: Options,
    shutdown: AtomicBool,
    log: Log,
    commit_queue: Mutex<CommitQueue>,
    commit_queue_full_cv: Condvar,
    log_worker_wait: WaitCondvar<bool>,
    commit_worker_wait: Arc<WaitCondvar<bool>>,
    commit_overlay: RwLock<Vec<CommitOverlay>>,
    log_queue_wait: WaitCondvar<i64>,
    flush_worker_wait: Arc<WaitCondvar<bool>>,
    cleanup_worker_wait: WaitCondvar<bool>,
    cleanup_queue_wait: WaitCondvar<bool>,
    iteration_lock: Mutex<()>,
    last_enacted: AtomicU64,
    next_reindex: AtomicU64,
    bg_err: Mutex<Option<Arc<Error>>>,
    db_version: u32,
    lock_file: std::fs::File,
}

#[derive(Debug)]
struct WaitCondvar<S> {
    cv: Condvar,
    work: Mutex<S>,
}

impl<S: Default> WaitCondvar<S> {
    fn new() -> Self {
        WaitCondvar { cv: Condvar::new(), work: Mutex::new(S::default()) }
    }
}

impl WaitCondvar<bool> {
    fn signal(&self) {
        let mut work = self.work.lock();
        *work = true;
        self.cv.notify_one();
    }

    pub fn wait(&self) {
        let mut work = self.work.lock();
        while !*work {
            self.cv.wait(&mut work)
        }
        *work = false;
    }
}

impl DbInner {
    fn open(options: &Options, opening_mode: OpeningMode) -> Result<DbInner> {
        if opening_mode == OpeningMode::Create {
            try_io!(std::fs::create_dir_all(&options.path));
        } else if !options.path.is_dir() {
            return Err(Error::DatabaseNotFound)
        }

        let mut lock_path: std::path::PathBuf = options.path.clone();
        lock_path.push("lock");
        let lock_file = try_io!(std::fs::OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .open(lock_path.as_path()));
        lock_file.try_lock_exclusive().map_err(Error::Locked)?;

        let metadata = options.load_and_validate_metadata(opening_mode == OpeningMode::Create)?;
        let mut columns = Vec::with_capacity(metadata.columns.len());
        let mut commit_overlay = Vec::with_capacity(metadata.columns.len());
        let log = Log::open(options)?;
        let last_enacted = log.replay_record_id().unwrap_or(2) - 1;
        for c in 0..metadata.columns.len() {
            let column = Column::open(c as ColId, options, &metadata)?;
            commit_overlay.push(CommitOverlay::new());
            columns.push(column);
        }
        log::debug!(target: "parity-db", "Opened db {:?}, metadata={:?}", options, metadata);
        let mut options = options.clone();
        if options.salt.is_none() {
            options.salt = Some(metadata.salt);
        }

        Ok(DbInner {
            columns,
            options,
            shutdown: AtomicBool::new(false),
            log,
            commit_queue: Mutex::new(Default::default()),
            commit_queue_full_cv: Condvar::new(),
            log_worker_wait: WaitCondvar::new(),
            commit_worker_wait: Arc::new(WaitCondvar::new()),
            commit_overlay: RwLock::new(commit_overlay),
            log_queue_wait: WaitCondvar::new(),
            flush_worker_wait: Arc::new(WaitCondvar::new()),
            cleanup_worker_wait: WaitCondvar::new(),
            cleanup_queue_wait: WaitCondvar::new(),
            iteration_lock: Mutex::new(()),
            next_reindex: AtomicU64::new(1),
            last_enacted: AtomicU64::new(last_enacted),
            bg_err: Mutex::new(None),
            db_version: metadata.version,
            lock_file,
        })
    }

    fn get(&self, col: ColId, key: &[u8]) -> Result<Option<Value>> {
        match &self.columns[col as usize] {
            Column::Hash(column) => {
                let key = column.hash_key(key);
                let overlay = self.commit_overlay.read();
                if let Some(v) = overlay.get(col as usize).and_then(|o| o.get(&key)) {
                    return Ok(v.map(|i| i.value().clone()))
                }
                let log = self.log.overlays();
                column.get(&key, log)
            },
            Column::Tree(column) => {
                let overlay = self.commit_overlay.read();
                if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
                    return Ok(l.map(|i| i.value().clone()))
                }
                let log = self.log.overlays().read();
                column.with_locked(|btree| BTreeTable::get(key, &*log, btree))
            },
        }
    }

    fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
        match &self.columns[col as usize] {
            Column::Hash(column) => {
                let key = column.hash_key(key);
                let overlay = self.commit_overlay.read();
                if let Some(l) = overlay.get(col as usize).and_then(|o| o.get_size(&key)) {
                    return Ok(l)
                }
                let log = self.log.overlays();
                column.get_size(&key, log)
            },
            Column::Tree(column) => {
                let overlay = self.commit_overlay.read();
                if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
                    return Ok(l.map(|v| v.value().len() as u32))
                }
                let log = self.log.overlays().read();
                let l = column.with_locked(|btree| BTreeTable::get(key, &*log, btree))?;
                Ok(l.map(|v| v.len() as u32))
            },
        }
    }

    fn btree_iter(&self, col: ColId) -> Result<BTreeIterator> {
        match &self.columns[col as usize] {
            Column::Hash(_column) =>
                Err(Error::InvalidConfiguration("Not an indexed column.".to_string())),
            Column::Tree(column) => {
                let log = self.log.overlays();
                BTreeIterator::new(column, col, log, &self.commit_overlay)
            },
        }
    }

    fn commit<I, K>(&self, tx: I) -> Result<()>
    where
        I: IntoIterator<Item = (ColId, K, Option<Value>)>,
        K: AsRef<[u8]>,
    {
        self.commit_changes(tx.into_iter().map(|(c, k, v)| {
            (
                c,
                match v {
                    Some(v) => Operation::Set(k.as_ref().to_vec(), v),
                    None => Operation::Dereference(k.as_ref().to_vec()),
                },
            )
        }))
    }

    fn commit_changes<I>(&self, tx: I) -> Result<()>
    where
        I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Vec<u8>>)>,
    {
        let mut commit: CommitChangeSet = Default::default();
        for (col, change) in tx.into_iter() {
            if self.options.columns[col as usize].btree_index {
                commit
                    .btree_indexed
                    .entry(col)
                    .or_insert_with(|| BTreeChangeSet::new(col))
                    .push(change)
            } else {
                commit.indexed.entry(col).or_insert_with(|| IndexedChangeSet::new(col)).push(
                    change,
                    &self.options,
                    self.db_version,
                )
            }
        }

        self.commit_raw(commit)
    }

    fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
        let mut queue = self.commit_queue.lock();

        #[cfg(any(test, feature = "instrumentation"))]
        let might_wait_because_the_queue_is_full = self.options.with_background_thread;
        #[cfg(not(any(test, feature = "instrumentation")))]
        let might_wait_because_the_queue_is_full = true;
        if might_wait_because_the_queue_is_full && queue.bytes > MAX_COMMIT_QUEUE_BYTES {
            log::debug!(target: "parity-db", "Waiting, queue size={}", queue.bytes);
            self.commit_queue_full_cv.wait(&mut queue);
        }

        {
            let bg_err = self.bg_err.lock();
            if let Some(err) = &*bg_err {
                return Err(Error::Background(err.clone()))
            }
        }

        let mut overlay = self.commit_overlay.write();

        queue.record_id += 1;
        let record_id = queue.record_id;

        let mut bytes = 0;
        for (c, indexed) in &commit.indexed {
            indexed.copy_to_overlay(
                &mut overlay[*c as usize],
                record_id,
                &mut bytes,
                &self.options,
            )?;
        }

        for (c, iterset) in &commit.btree_indexed {
            iterset.copy_to_overlay(
                &mut overlay[*c as usize].btree_indexed,
                record_id,
                &mut bytes,
                &self.options,
            )?;
        }

        let commit = Commit { id: record_id, changeset: commit, bytes };

        log::debug!(
            target: "parity-db",
            "Queued commit {}, {} bytes",
            commit.id,
            bytes,
        );
        queue.commits.push_back(commit);
        queue.bytes += bytes;
        self.log_worker_wait.signal();
        Ok(())
    }

    fn process_commits(&self) -> Result<bool> {
        #[cfg(any(test, feature = "instrumentation"))]
        let might_wait_because_the_queue_is_full = self.options.with_background_thread;
        #[cfg(not(any(test, feature = "instrumentation")))]
        let might_wait_because_the_queue_is_full = true;
        if might_wait_because_the_queue_is_full {
            let mut queue = self.log_queue_wait.work.lock();
            if !self.shutdown.load(Ordering::Relaxed) && *queue > MAX_LOG_QUEUE_BYTES {
                log::debug!(target: "parity-db", "Waiting, log_bytes={}", queue);
                self.log_queue_wait.cv.wait(&mut queue);
            }
        }
        let commit = {
            let mut queue = self.commit_queue.lock();
            if let Some(commit) = queue.commits.pop_front() {
                queue.bytes -= commit.bytes;
                log::debug!(
                    target: "parity-db",
                    "Removed {}. Still queued commits {} bytes",
                    commit.bytes,
                    queue.bytes,
                );
                if queue.bytes <= MAX_COMMIT_QUEUE_BYTES &&
                    (queue.bytes + commit.bytes) > MAX_COMMIT_QUEUE_BYTES
                {
                    log::debug!(
                        target: "parity-db",
                        "Waking up commit queue worker",
                    );
                    self.commit_queue_full_cv.notify_all();
                }
                Some(commit)
            } else {
                None
            }
        };

        if let Some(mut commit) = commit {
            let mut reindex = false;
            let mut writer = self.log.begin_record();
            log::debug!(
                target: "parity-db",
                "Processing commit {}, record {}, {} bytes",
                commit.id,
                writer.record_id(),
                commit.bytes,
            );
            let mut ops: u64 = 0;
            for (c, key_values) in commit.changeset.indexed.iter() {
                key_values.write_plan(
                    &self.columns[*c as usize],
                    &mut writer,
                    &mut ops,
                    &mut reindex,
                )?;
            }

            for (c, btree) in commit.changeset.btree_indexed.iter_mut() {
                match &self.columns[*c as usize] {
                    Column::Hash(_column) =>
                        return Err(Error::InvalidConfiguration(
                            "Not an indexed column.".to_string(),
                        )),
                    Column::Tree(column) => {
                        btree.write_plan(column, &mut writer, &mut ops)?;
                    },
                }
            }

            for c in self.columns.iter() {
                c.complete_plan(&mut writer)?;
            }
            let record_id = writer.record_id();
            let l = writer.drain();

            let bytes = {
                let bytes = self.log.end_record(l)?;
                let mut logged_bytes = self.log_queue_wait.work.lock();
                *logged_bytes += bytes as i64;
                self.flush_worker_wait.signal();
                bytes
            };

            {
                let mut overlay = self.commit_overlay.write();
                for (c, key_values) in commit.changeset.indexed.iter() {
                    key_values.clean_overlay(&mut overlay[*c as usize], commit.id);
                }
                for (c, iterset) in commit.changeset.btree_indexed.iter_mut() {
                    iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, commit.id);
                }
            }

            if reindex {
                self.start_reindex(record_id);
            }

            log::debug!(
                target: "parity-db",
                "Processed commit {} (record {}), {} ops, {} bytes written",
                commit.id,
                record_id,
                ops,
                bytes,
            );
            Ok(true)
        } else {
            Ok(false)
        }
    }

    fn start_reindex(&self, record_id: u64) {
        log::trace!(target: "parity-db", "Scheduled reindex at record {}", record_id);
        self.next_reindex.store(record_id, Ordering::SeqCst);
    }

    fn process_reindex(&self) -> Result<bool> {
        let next_reindex = self.next_reindex.load(Ordering::SeqCst);
        if next_reindex == 0 || next_reindex > self.last_enacted.load(Ordering::SeqCst) {
            return Ok(false)
        }
        for column in self.columns.iter() {
            let column = if let Column::Hash(c) = column { c } else { continue };
            let ReindexBatch { drop_index, batch } = column.reindex(&self.log)?;
            if !batch.is_empty() || drop_index.is_some() {
                let mut next_reindex = false;
                let mut writer = self.log.begin_record();
                log::debug!(
                    target: "parity-db",
                    "Creating reindex record {}",
                    writer.record_id(),
                );
                for (key, address) in batch.into_iter() {
                    if let PlanOutcome::NeedReindex =
                        column.write_reindex_plan(&key, address, &mut writer)?
                    {
                        next_reindex = true
                    }
                }
                if let Some(table) = drop_index {
                    writer.drop_table(table);
                }
                let record_id = writer.record_id();
                let l = writer.drain();

                let mut logged_bytes = self.log_queue_wait.work.lock();
                let bytes = self.log.end_record(l)?;
                log::debug!(
                    target: "parity-db",
                    "Created reindex record {}, {} bytes",
                    record_id,
                    bytes,
                );
                *logged_bytes += bytes as i64;
                if next_reindex {
                    self.start_reindex(record_id);
                }
                self.flush_worker_wait.signal();
                return Ok(true)
            }
        }
        self.next_reindex.store(0, Ordering::SeqCst);
        Ok(false)
    }

    fn enact_logs(&self, validation_mode: bool) -> Result<bool> {
        let _iteration_lock = self.iteration_lock.lock();
        let cleared = {
            let reader = match self.log.read_next(validation_mode) {
                Ok(reader) => reader,
                Err(Error::Corruption(_)) if validation_mode => {
                    log::debug!(target: "parity-db", "Bad log header");
                    self.log.clear_replay_logs();
                    return Ok(false)
                },
                Err(e) => return Err(e),
            };
            if let Some(mut reader) = reader {
                log::debug!(
                    target: "parity-db",
                    "Enacting log record {}",
                    reader.record_id(),
                );
                if validation_mode {
                    if reader.record_id() != self.last_enacted.load(Ordering::Relaxed) + 1 {
                        log::warn!(
                            target: "parity-db",
                            "Log sequence error. Expected record {}, got {}",
                            self.last_enacted.load(Ordering::Relaxed) + 1,
                            reader.record_id(),
                        );
                        drop(reader);
                        self.log.clear_replay_logs();
                        return Ok(false)
                    }
                    loop {
                        let next = match reader.next() {
                            Ok(next) => next,
                            Err(e) => {
                                log::debug!(target: "parity-db", "Error reading log: {:?}", e);
                                return Ok(false)
                            },
                        };
                        match next {
                            LogAction::BeginRecord => {
                                log::debug!(target: "parity-db", "Unexpected log header");
                                drop(reader);
                                self.log.clear_replay_logs();
                                return Ok(false)
                            },
                            LogAction::EndRecord => break,
                            LogAction::InsertIndex(insertion) => {
                                let col = insertion.table.col() as usize;
                                if let Err(e) = self.columns.get(col).map_or_else(
                                    || Err(Error::Corruption(format!("Invalid column id {col}"))),
                                    |col| {
                                        col.validate_plan(
                                            LogAction::InsertIndex(insertion),
                                            &mut reader,
                                        )
                                    },
                                ) {
                                    log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
                                    drop(reader);
                                    self.log.clear_replay_logs();
                                    return Ok(false)
                                }
                            },
                            LogAction::InsertValue(insertion) => {
                                let col = insertion.table.col() as usize;
                                if let Err(e) = self.columns.get(col).map_or_else(
                                    || Err(Error::Corruption(format!("Invalid column id {col}"))),
                                    |col| {
                                        col.validate_plan(
                                            LogAction::InsertValue(insertion),
                                            &mut reader,
                                        )
                                    },
                                ) {
                                    log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
                                    drop(reader);
                                    self.log.clear_replay_logs();
                                    return Ok(false)
                                }
                            },
                            LogAction::DropTable(_) => continue,
                        }
                    }
                    reader.reset()?;
                    reader.next()?;
                }
                loop {
                    match reader.next()? {
                        LogAction::BeginRecord =>
                            return Err(Error::Corruption("Bad log record".into())),
                        LogAction::EndRecord => break,
                        LogAction::InsertIndex(insertion) => {
                            self.columns[insertion.table.col() as usize]
                                .enact_plan(LogAction::InsertIndex(insertion), &mut reader)?;
                        },
                        LogAction::InsertValue(insertion) => {
                            self.columns[insertion.table.col() as usize]
                                .enact_plan(LogAction::InsertValue(insertion), &mut reader)?;
                        },
                        LogAction::DropTable(id) => {
                            log::debug!(
                                target: "parity-db",
                                "Dropping index {}",
                                id,
                            );
                            match &self.columns[id.col() as usize] {
                                Column::Hash(col) => {
                                    col.drop_index(id)?;
                                    self.start_reindex(reader.record_id());
                                },
                                Column::Tree(_) => (),
                            }
                        },
                    }
                }
                log::debug!(
                    target: "parity-db",
                    "Enacted log record {}, {} bytes",
                    reader.record_id(),
                    reader.read_bytes(),
                );
                let record_id = reader.record_id();
                let bytes = reader.read_bytes();
                let cleared = reader.drain();
                self.last_enacted.store(record_id, Ordering::SeqCst);
                Some((record_id, cleared, bytes))
            } else {
                log::debug!(target: "parity-db", "End of log");
                None
            }
        };

        if let Some((record_id, cleared, bytes)) = cleared {
            self.log.end_read(cleared, record_id);
            {
                if !validation_mode {
                    let mut queue = self.log_queue_wait.work.lock();
                    if *queue < bytes as i64 {
                        log::warn!(
                            target: "parity-db",
                            "Detected log underflow record {}, {} bytes, {} queued, reindex = {}",
                            record_id,
                            bytes,
                            *queue,
                            self.next_reindex.load(Ordering::SeqCst),
                        );
                    }
                    *queue -= bytes as i64;
                    if *queue <= MAX_LOG_QUEUE_BYTES &&
                        (*queue + bytes as i64) > MAX_LOG_QUEUE_BYTES
                    {
                        self.log_queue_wait.cv.notify_one();
                    }
                    log::debug!(target: "parity-db", "Log queue size: {} bytes", *queue);
                }

                let max_logs = if self.options.sync_data { MAX_LOG_FILES } else { KEEP_LOGS };
                let dirty_logs = self.log.num_dirty_logs();
                if !validation_mode {
                    while self.log.num_dirty_logs() > max_logs {
                        log::debug!(target: "parity-db", "Waiting for log cleanup. Queued: {}", dirty_logs);
                        self.cleanup_queue_wait.wait();
                    }
                }
            }
            Ok(true)
        } else {
            Ok(false)
        }
    }

    fn flush_logs(&self, min_log_size: u64) -> Result<bool> {
        let has_flushed = self.log.flush_one(min_log_size)?;
        if has_flushed {
            self.commit_worker_wait.signal();
        }
        Ok(has_flushed)
    }

    fn clean_logs(&self) -> Result<bool> {
        let keep_logs = if self.options.sync_data { 0 } else { KEEP_LOGS };
        let num_cleanup = self.log.num_dirty_logs();
        let result = if num_cleanup > keep_logs {
            if self.options.sync_data {
                for c in self.columns.iter() {
                    c.flush()?;
                }
            }
            self.log.clean_logs(num_cleanup - keep_logs)?
        } else {
            false
        };
        self.cleanup_queue_wait.signal();
        Ok(result)
    }

    fn clean_all_logs(&self) -> Result<()> {
        for c in self.columns.iter() {
            c.flush()?;
        }
        let num_cleanup = self.log.num_dirty_logs();
        self.log.clean_logs(num_cleanup)?;
        Ok(())
    }

    fn replay_all_logs(&self) -> Result<()> {
        while let Some(id) = self.log.replay_next()? {
            log::debug!(target: "parity-db", "Replaying database log {}", id);
            while self.enact_logs(true)? {}
        }

        for c in self.columns.iter() {
            c.refresh_metadata()?;
        }
        log::debug!(target: "parity-db", "Replay is complete.");
        Ok(())
    }

    fn shutdown(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
        self.log_queue_wait.cv.notify_one();
        self.flush_worker_wait.signal();
        self.log_worker_wait.signal();
        self.commit_worker_wait.signal();
        self.cleanup_worker_wait.signal();
    }

    fn kill_logs(&self) -> Result<()> {
        {
            if let Some(err) = self.bg_err.lock().as_ref() {
                log::debug!(target: "parity-db", "Shutdown with error state {}", err);
                self.log.clean_logs(self.log.num_dirty_logs())?;
                return Ok(())
            }
        }
        log::debug!(target: "parity-db", "Processing leftover commits");
        while self.enact_logs(false)? {}
        self.flush_logs(0)?;
        while self.process_commits()? {}
        while self.enact_logs(false)? {}
        self.flush_logs(0)?;
        while self.enact_logs(false)? {}
        self.clean_all_logs()?;
        self.log.kill_logs()?;
        if self.options.stats {
            let mut path = self.options.path.clone();
            path.push("stats.txt");
            match std::fs::File::create(path) {
                Ok(file) => {
                    let mut writer = std::io::BufWriter::new(file);
                    if let Err(e) = self.write_stats_text(&mut writer, None) {
                        log::warn!(target: "parity-db", "Error writing stats file: {:?}", e)
                    }
                },
                Err(e) => log::warn!(target: "parity-db", "Error creating stats file: {:?}", e),
            }
        }
        Ok(())
    }

    fn write_stats_text(&self, writer: &mut impl std::io::Write, column: Option<u8>) -> Result<()> {
        if let Some(col) = column {
            self.columns[col as usize].write_stats_text(writer)
        } else {
            for c in self.columns.iter() {
                c.write_stats_text(writer)?;
            }
            Ok(())
        }
    }

    fn clear_stats(&self, column: Option<u8>) -> Result<()> {
        if let Some(col) = column {
            self.columns[col as usize].clear_stats()
        } else {
            for c in self.columns.iter() {
                c.clear_stats()?;
            }
            Ok(())
        }
    }

    fn stats(&self) -> StatSummary {
        StatSummary { columns: self.columns.iter().map(|c| c.stats()).collect() }
    }

    fn store_err(&self, result: Result<()>) {
        if let Err(e) = result {
            log::warn!(target: "parity-db", "Background worker error: {}", e);
            let mut err = self.bg_err.lock();
            if err.is_none() {
                *err = Some(Arc::new(e));
                self.shutdown();
            }
            self.commit_queue_full_cv.notify_all();
        }
    }

    fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
        let _lock = self.iteration_lock.lock();
        match &self.columns[c as usize] {
            Column::Hash(column) => column.iter_values(&self.log, f),
            Column::Tree(_) => unimplemented!(),
        }
    }

    fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
        let _lock = self.iteration_lock.lock();
        match &self.columns[c as usize] {
            Column::Hash(column) => column.iter_index(&self.log, f),
            Column::Tree(_) => unimplemented!(),
        }
    }
}

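/// An opened database instance. Commits are applied asynchronously by a set of
/// background worker threads (log, flush, commit and cleanup workers), which
/// are joined when the `Db` is dropped.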
pub struct Db {
    inner: Arc<DbInner>,
    commit_thread: Option<thread::JoinHandle<()>>,
    flush_thread: Option<thread::JoinHandle<()>>,
    log_thread: Option<thread::JoinHandle<()>>,
    cleanup_thread: Option<thread::JoinHandle<()>>,
}

impl Db {
    #[cfg(test)]
    pub(crate) fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
        let options = Options::with_columns(path, num_columns);
        Self::open_inner(&options, OpeningMode::Create)
    }

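    /// Open an existing database. Fails with `Error::DatabaseNotFound` if the
    /// database directory does not exist.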
    pub fn open(options: &Options) -> Result<Db> {
        Self::open_inner(options, OpeningMode::Write)
    }

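    /// Open a database, creating it (including any missing parent directories)
    /// if it does not exist yet.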
    pub fn open_or_create(options: &Options) -> Result<Db> {
        Self::open_inner(options, OpeningMode::Create)
    }

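    /// Open an existing database in read-only mode; no background worker
    /// threads are started.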
    pub fn open_read_only(options: &Options) -> Result<Db> {
        Self::open_inner(options, OpeningMode::ReadOnly)
    }

    fn open_inner(options: &Options, opening_mode: OpeningMode) -> Result<Db> {
        assert!(options.is_valid());
        let db = DbInner::open(options, opening_mode)?;
        if let Err(e) = db.replay_all_logs() {
            log::debug!(target: "parity-db", "Error during log replay.");
            return Err(e)
        } else {
            db.log.clear_replay_logs();
            db.clean_all_logs()?;
            db.log.kill_logs()?;
        }
        let db = Arc::new(db);
        #[cfg(any(test, feature = "instrumentation"))]
        let start_threads = opening_mode != OpeningMode::ReadOnly && options.with_background_thread;
        #[cfg(not(any(test, feature = "instrumentation")))]
        let start_threads = opening_mode != OpeningMode::ReadOnly;
        let commit_thread = if start_threads {
            let commit_worker_db = db.clone();
            Some(thread::spawn(move || {
                commit_worker_db.store_err(Self::commit_worker(commit_worker_db.clone()))
            }))
        } else {
            None
        };
        let flush_thread = if start_threads {
            let flush_worker_db = db.clone();
            #[cfg(any(test, feature = "instrumentation"))]
            let min_log_size = if options.always_flush { 0 } else { MIN_LOG_SIZE_BYTES };
            #[cfg(not(any(test, feature = "instrumentation")))]
            let min_log_size = MIN_LOG_SIZE_BYTES;
            Some(thread::spawn(move || {
                flush_worker_db.store_err(Self::flush_worker(flush_worker_db.clone(), min_log_size))
            }))
        } else {
            None
        };
        let log_thread = if start_threads {
            let log_worker_db = db.clone();
            Some(thread::spawn(move || {
                log_worker_db.store_err(Self::log_worker(log_worker_db.clone()))
            }))
        } else {
            None
        };
        let cleanup_thread = if start_threads {
            let cleanup_worker_db = db.clone();
            Some(thread::spawn(move || {
                cleanup_worker_db.store_err(Self::cleanup_worker(cleanup_worker_db.clone()))
            }))
        } else {
            None
        };
        Ok(Db { inner: db, commit_thread, flush_thread, log_thread, cleanup_thread })
    }

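    /// Get the value stored in the given column under the given key, if any.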
    pub fn get(&self, col: ColId, key: &[u8]) -> Result<Option<Value>> {
        self.inner.get(col, key)
    }

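    /// Get the size of the value stored in the given column under the given
    /// key, if any.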
    pub fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
        self.inner.get_size(col, key)
    }

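    /// Iterate over the ordered contents of a btree-indexed column. Returns an
    /// error for hash-indexed columns.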
    pub fn iter(&self, col: ColId) -> Result<BTreeIterator> {
        self.inner.btree_iter(col)
    }

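    /// Commit a set of key/value changes. A value of `None` removes
    /// (dereferences) the key. The commit is only queued here; the background
    /// workers write and enact it asynchronously.
    ///
    /// A minimal sketch (assuming an open `db` with column 0):
    /// ```ignore
    /// db.commit(vec![
    ///     (0, b"key".to_vec(), Some(b"value".to_vec())), // insert or update
    ///     (0, b"gone".to_vec(), None),                   // remove
    /// ])?;
    /// ```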
    pub fn commit<I, K>(&self, tx: I) -> Result<()>
    where
        I: IntoIterator<Item = (ColId, K, Option<Value>)>,
        K: AsRef<[u8]>,
    {
        self.inner.commit(tx)
    }

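    /// Commit a set of changes expressed as explicit `Operation`s, which also
    /// allows `Reference`/`Dereference` on reference-counted columns.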
    pub fn commit_changes<I>(&self, tx: I) -> Result<()>
    where
        I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Vec<u8>>)>,
    {
        self.inner.commit_changes(tx)
    }

    pub(crate) fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
        self.inner.commit_raw(commit)
    }

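    /// Number of columns the database was opened with.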
    pub fn num_columns(&self) -> u8 {
        self.inner.columns.len() as u8
    }

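    /// Iterate over all values in a hash-indexed column while `f` returns
    /// `true`. This takes the iteration lock, so log enactment is paused for
    /// the duration of the iteration.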
    pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
        self.inner.iter_column_while(c, f)
    }

    pub(crate) fn iter_column_index_while(
        &self,
        c: ColId,
        f: impl FnMut(IterState) -> bool,
    ) -> Result<()> {
        self.inner.iter_column_index_while(c, f)
    }

    fn commit_worker(db: Arc<DbInner>) -> Result<()> {
        let mut more_work = false;
        while !db.shutdown.load(Ordering::SeqCst) || more_work {
            if !more_work {
                db.cleanup_worker_wait.signal();
                if !db.log.has_log_files_to_read() {
                    db.commit_worker_wait.wait();
                }
            }

            more_work = db.enact_logs(false)?;
        }
        log::debug!(target: "parity-db", "Commit worker shutdown");
        Ok(())
    }

    fn log_worker(db: Arc<DbInner>) -> Result<()> {
        let mut more_reindex = db.process_reindex()?;
        let mut more_commits = false;
        while !db.shutdown.load(Ordering::SeqCst) || more_commits {
            if !more_commits && !more_reindex {
                db.log_worker_wait.wait();
            }

            more_commits = db.process_commits()?;
            more_reindex = db.process_reindex()?;
        }
        log::debug!(target: "parity-db", "Log worker shutdown");
        Ok(())
    }

    fn flush_worker(db: Arc<DbInner>, min_log_size: u64) -> Result<()> {
        let mut more_work = false;
        while !db.shutdown.load(Ordering::SeqCst) {
            if !more_work {
                db.flush_worker_wait.wait();
            }
            more_work = db.flush_logs(min_log_size)?;
        }
        log::debug!(target: "parity-db", "Flush worker shutdown");
        Ok(())
    }

    fn cleanup_worker(db: Arc<DbInner>) -> Result<()> {
        let mut more_work = true;
        while !db.shutdown.load(Ordering::SeqCst) || more_work {
            if !more_work {
                db.cleanup_worker_wait.wait();
            }
            more_work = db.clean_logs()?;
        }
        log::debug!(target: "parity-db", "Cleanup worker shutdown");
        Ok(())
    }

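    /// Write column statistics in text form to `writer`, for a single column
    /// or for all of them.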
    pub fn write_stats_text(
        &self,
        writer: &mut impl std::io::Write,
        column: Option<u8>,
    ) -> Result<()> {
        self.inner.write_stats_text(writer, column)
    }

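    /// Reset collected statistics for a single column or for all of them.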
    pub fn clear_stats(&self, column: Option<u8>) -> Result<()> {
        self.inner.clear_stats(column)
    }

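    /// Dump the database contents according to the given check options.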
    pub fn dump(&self, check_param: check::CheckOptions) -> Result<()> {
        if let Some(col) = check_param.column {
            self.inner.columns[col as usize].dump(&self.inner.log, &check_param, col)?;
        } else {
            for (ix, c) in self.inner.columns.iter().enumerate() {
                c.dump(&self.inner.log, &check_param, ix as ColId)?;
            }
        }
        Ok(())
    }

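    /// Collect a statistics summary for all columns.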
    pub fn stats(&self) -> StatSummary {
        self.inner.stats()
    }

    fn precheck_column_operation(options: &mut Options) -> Result<[u8; 32]> {
        let db = Db::open(options)?;
        let salt = db.inner.options.salt;
        drop(db);
        Ok(salt.expect("`salt` is always `Some` after opening the DB; qed"))
    }

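    /// Add a new column with the given options to the database and update its
    /// metadata. The database must not be open when this is called.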
    pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
        let salt = Self::precheck_column_operation(options)?;

        options.columns.push(new_column_options);
        options.write_metadata_with_version(&options.path, &salt, Some(CURRENT_VERSION))?;

        Ok(())
    }

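    /// Remove the last column from the database, deleting its files. The
    /// database must not be open when this is called.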
    pub fn drop_last_column(options: &mut Options) -> Result<()> {
        let salt = Self::precheck_column_operation(options)?;
        let nb_column = options.columns.len();
        if nb_column == 0 {
            return Ok(())
        }
        let index = options.columns.len() - 1;
        Self::remove_column_files(options, index as u8)?;
        options.columns.pop();
        options.write_metadata(&options.path, &salt)?;
        Ok(())
    }

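    /// Truncate a column, deleting its files, and optionally reconfigure it
    /// with new options. The database must not be open when this is called.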
    pub fn reset_column(
        options: &mut Options,
        index: u8,
        new_options: Option<ColumnOptions>,
    ) -> Result<()> {
        let salt = Self::precheck_column_operation(options)?;
        Self::remove_column_files(options, index)?;

        if let Some(new_options) = new_options {
            options.columns[index as usize] = new_options;
            options.write_metadata(&options.path, &salt)?;
        }

        Ok(())
    }

    fn remove_column_files(options: &mut Options, index: u8) -> Result<()> {
        if index as usize >= options.columns.len() {
            return Err(Error::IncompatibleColumnConfig {
                id: index,
                reason: "Column not found".to_string(),
            })
        }

        Column::drop_files(index, options.path.clone())?;
        Ok(())
    }

    #[cfg(feature = "instrumentation")]
    pub fn process_reindex(&self) -> Result<()> {
        self.inner.process_reindex()?;
        Ok(())
    }

    #[cfg(feature = "instrumentation")]
    pub fn process_commits(&self) -> Result<()> {
        self.inner.process_commits()?;
        Ok(())
    }

    #[cfg(feature = "instrumentation")]
    pub fn flush_logs(&self) -> Result<()> {
        self.inner.flush_logs(0)?;
        Ok(())
    }

    #[cfg(feature = "instrumentation")]
    pub fn enact_logs(&self) -> Result<()> {
        while self.inner.enact_logs(false)? {}
        Ok(())
    }

    #[cfg(feature = "instrumentation")]
    pub fn clean_logs(&self) -> Result<()> {
        self.inner.clean_logs()?;
        Ok(())
    }
}

impl Drop for Db {
    fn drop(&mut self) {
        self.drop_inner()
    }
}

impl Db {
    fn drop_inner(&mut self) {
        self.inner.shutdown();
        if let Some(t) = self.log_thread.take() {
            if let Err(e) = t.join() {
                log::warn!(target: "parity-db", "Log thread shutdown error: {:?}", e);
            }
        }
        if let Some(t) = self.flush_thread.take() {
            if let Err(e) = t.join() {
                log::warn!(target: "parity-db", "Flush thread shutdown error: {:?}", e);
            }
        }
        if let Some(t) = self.commit_thread.take() {
            if let Err(e) = t.join() {
                log::warn!(target: "parity-db", "Commit thread shutdown error: {:?}", e);
            }
        }
        if let Some(t) = self.cleanup_thread.take() {
            if let Err(e) = t.join() {
                log::warn!(target: "parity-db", "Cleanup thread shutdown error: {:?}", e);
            }
        }
        if let Err(e) = self.inner.kill_logs() {
            log::warn!(target: "parity-db", "Shutdown error: {:?}", e);
        }
        if let Err(e) = self.inner.lock_file.unlock() {
            log::debug!(target: "parity-db", "Error removing file lock: {:?}", e);
        }
    }
}

pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<RcValue>), IdentityBuildHasher>;
pub type BTreeCommitOverlay = BTreeMap<RcValue, (u64, Option<RcValue>)>;

#[derive(Debug)]
pub struct CommitOverlay {
    indexed: IndexedCommitOverlay,
    btree_indexed: BTreeCommitOverlay,
}

impl CommitOverlay {
    fn new() -> Self {
        CommitOverlay { indexed: Default::default(), btree_indexed: Default::default() }
    }

    #[cfg(test)]
    fn is_empty(&self) -> bool {
        self.indexed.is_empty() && self.btree_indexed.is_empty()
    }
}

impl CommitOverlay {
    fn get_ref(&self, key: &[u8]) -> Option<Option<&RcValue>> {
        self.indexed.get(key).map(|(_, v)| v.as_ref())
    }

    fn get(&self, key: &[u8]) -> Option<Option<RcValue>> {
        self.get_ref(key).map(|v| v.cloned())
    }

    fn get_size(&self, key: &[u8]) -> Option<Option<u32>> {
        self.get_ref(key).map(|res| res.as_ref().map(|b| b.value().len() as u32))
    }

    fn btree_get(&self, key: &[u8]) -> Option<Option<&RcValue>> {
        self.btree_indexed.get(key).map(|(_, v)| v.as_ref())
    }

    pub fn btree_next(
        &self,
        last_key: &crate::btree::LastKey,
    ) -> Option<(RcValue, Option<RcValue>)> {
        use crate::btree::LastKey;
        match &last_key {
            LastKey::Start => self
                .btree_indexed
                .range::<Vec<u8>, _>(..)
                .next()
                .map(|(k, (_, v))| (k.clone(), v.clone())),
            LastKey::End => None,
            LastKey::At(key) => self
                .btree_indexed
                .range::<Vec<u8>, _>((Bound::Excluded(key), Bound::Unbounded))
                .next()
                .map(|(k, (_, v))| (k.clone(), v.clone())),
            LastKey::Seeked(key) => self
                .btree_indexed
                .range::<Value, _>(key..)
                .next()
                .map(|(k, (_, v))| (k.clone(), v.clone())),
        }
    }

    pub fn btree_prev(
        &self,
        last_key: &crate::btree::LastKey,
    ) -> Option<(RcValue, Option<RcValue>)> {
        use crate::btree::LastKey;
        match &last_key {
            LastKey::End => self
                .btree_indexed
                .range::<Vec<u8>, _>(..)
                .rev()
                .next()
                .map(|(k, (_, v))| (k.clone(), v.clone())),
            LastKey::Start => None,
            LastKey::At(key) => self
                .btree_indexed
                .range::<Vec<u8>, _>(..key)
                .rev()
                .next()
                .map(|(k, (_, v))| (k.clone(), v.clone())),
            LastKey::Seeked(key) => self
                .btree_indexed
                .range::<Vec<u8>, _>(..=key)
                .rev()
                .next()
                .map(|(k, (_, v))| (k.clone(), v.clone())),
        }
    }
}

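/// A change to a single key, applied as part of a commit. `Set` inserts or
/// updates a value, `Dereference` removes it (or decrements its reference
/// count in a reference-counted column), and `Reference` increments the
/// reference count of an existing value in a reference-counted column.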
#[derive(Debug, PartialEq, Eq)]
pub enum Operation<Key, Value> {
    Set(Key, Value),

    Dereference(Key),

    Reference(Key),
}

impl<Key: Ord, Value: Eq> PartialOrd<Self> for Operation<Key, Value> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<Key: Ord, Value: Eq> Ord for Operation<Key, Value> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key().cmp(other.key())
    }
}

impl<Key, Value> Operation<Key, Value> {
    pub fn key(&self) -> &Key {
        match self {
            Operation::Set(k, _) | Operation::Dereference(k) | Operation::Reference(k) => k,
        }
    }

    pub fn into_key(self) -> Key {
        match self {
            Operation::Set(k, _) | Operation::Dereference(k) | Operation::Reference(k) => k,
        }
    }
}

impl<K: AsRef<[u8]>, Value> Operation<K, Value> {
    pub fn to_key_vec(self) -> Operation<Vec<u8>, Value> {
        match self {
            Operation::Set(k, v) => Operation::Set(k.as_ref().to_vec(), v),
            Operation::Dereference(k) => Operation::Dereference(k.as_ref().to_vec()),
            Operation::Reference(k) => Operation::Reference(k.as_ref().to_vec()),
        }
    }
}

#[derive(Debug, Default)]
pub struct CommitChangeSet {
    pub indexed: HashMap<ColId, IndexedChangeSet>,
    pub btree_indexed: HashMap<ColId, BTreeChangeSet>,
}

#[derive(Debug)]
pub struct IndexedChangeSet {
    pub col: ColId,
    pub changes: Vec<Operation<Key, RcValue>>,
}

impl IndexedChangeSet {
    pub fn new(col: ColId) -> Self {
        IndexedChangeSet { col, changes: Default::default() }
    }

    fn push<K: AsRef<[u8]>>(
        &mut self,
        change: Operation<K, Vec<u8>>,
        options: &Options,
        db_version: u32,
    ) {
        let salt = options.salt.unwrap_or_default();
        let hash_key = |key: &[u8]| -> Key {
            hash_key(key, &salt, options.columns[self.col as usize].uniform, db_version)
        };

        self.push_change_hashed(match change {
            Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v.into()),
            Operation::Dereference(k) => Operation::Dereference(hash_key(k.as_ref())),
            Operation::Reference(k) => Operation::Reference(hash_key(k.as_ref())),
        })
    }

    fn push_change_hashed(&mut self, change: Operation<Key, RcValue>) {
        self.changes.push(change);
    }

    fn copy_to_overlay(
        &self,
        overlay: &mut CommitOverlay,
        record_id: u64,
        bytes: &mut usize,
        options: &Options,
    ) -> Result<()> {
        let ref_counted = options.columns[self.col as usize].ref_counted;
        for change in self.changes.iter() {
            match &change {
                Operation::Set(k, v) => {
                    *bytes += k.len();
                    *bytes += v.value().len();
                    overlay.indexed.insert(*k, (record_id, Some(v.clone())));
                },
                Operation::Dereference(k) => {
                    if !ref_counted {
                        overlay.indexed.insert(*k, (record_id, None));
                    }
                },
                Operation::Reference(..) => {
                    if !ref_counted {
                        return Err(Error::InvalidInput(format!("No Rc for column {}", self.col)))
                    }
                },
            }
        }
        Ok(())
    }

    fn write_plan(
        &self,
        column: &Column,
        writer: &mut crate::log::LogWriter,
        ops: &mut u64,
        reindex: &mut bool,
    ) -> Result<()> {
        let column = match column {
            Column::Hash(column) => column,
            Column::Tree(_) => {
                log::warn!(target: "parity-db", "Skipping indexed commit in btree column");
                return Ok(())
            },
        };
        for change in self.changes.iter() {
            if let PlanOutcome::NeedReindex = column.write_plan(change, writer)? {
                *reindex = true;
            }
            *ops += 1;
        }
        Ok(())
    }

    fn clean_overlay(&self, overlay: &mut CommitOverlay, record_id: u64) {
        use std::collections::hash_map::Entry;
        for change in self.changes.iter() {
            match change {
                Operation::Set(k, _) | Operation::Dereference(k) => {
                    if let Entry::Occupied(e) = overlay.indexed.entry(*k) {
                        if e.get().0 == record_id {
                            e.remove_entry();
                        }
                    }
                },
                Operation::Reference(..) => (),
            }
        }
    }
}

pub mod check {
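    /// How much of each value to display when dumping: nothing, the full
    /// value, or a prefix truncated to the given number of bytes.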
    pub enum CheckDisplay {
        None,
        Full,
        Short(u64),
    }

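    /// Options controlling `Db::dump`: the column to dump (or all of them),
    /// the range of index entries to cover, how values are displayed, and
    /// which extra validations to run.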
    pub struct CheckOptions {
        pub column: Option<u8>,
        pub from: Option<u64>,
        pub bound: Option<u64>,
        pub display: CheckDisplay,
        pub fast: bool,
        pub validate_free_refs: bool,
    }

    impl CheckOptions {
        pub fn new(
            column: Option<u8>,
            from: Option<u64>,
            bound: Option<u64>,
            display_content: bool,
            truncate_value_display: Option<u64>,
            fast: bool,
            validate_free_refs: bool,
        ) -> Self {
            let display = if display_content {
                match truncate_value_display {
                    Some(t) => CheckDisplay::Short(t),
                    None => CheckDisplay::Full,
                }
            } else {
                CheckDisplay::None
            };
            CheckOptions { column, from, bound, display, fast, validate_free_refs }
        }
    }
}

#[derive(Eq, PartialEq, Clone, Copy)]
enum OpeningMode {
    Create,
    Write,
    ReadOnly,
}

#[cfg(test)]
mod tests {
    use super::{Db, Options};
    use crate::{
        column::ColId,
        db::{DbInner, OpeningMode},
        ColumnOptions, Value,
    };
    use rand::Rng;
    use std::{
        collections::{BTreeMap, HashMap, HashSet},
        path::Path,
    };
    use tempfile::tempdir;

    #[derive(Eq, PartialEq, Debug, Clone, Copy)]
    enum EnableCommitPipelineStages {
        #[allow(dead_code)]
        CommitOverlay,
        #[allow(dead_code)]
        LogOverlay,
        #[allow(dead_code)]
        DbFile,
        Standard,
    }

    impl EnableCommitPipelineStages {
        fn options(&self, path: &Path, num_columns: u8) -> Options {
            Options {
                path: path.into(),
                sync_wal: true,
                sync_data: true,
                stats: true,
                salt: None,
                columns: (0..num_columns).map(|_| Default::default()).collect(),
                compression_threshold: HashMap::new(),
                with_background_thread: *self == Self::Standard,
                always_flush: *self == Self::DbFile,
            }
        }

        fn run_stages(&self, db: &Db) {
            let db = &db.inner;
            if *self == EnableCommitPipelineStages::DbFile ||
                *self == EnableCommitPipelineStages::LogOverlay
            {
                while db.process_commits().unwrap() {}
                while db.process_reindex().unwrap() {}
            }
            if *self == EnableCommitPipelineStages::DbFile {
                let _ = db.log.flush_one(0).unwrap();
                while db.enact_logs(false).unwrap() {}
                let _ = db.clean_logs().unwrap();
            }
        }

        fn check_empty_overlay(&self, db: &DbInner, col: ColId) -> bool {
            match self {
                EnableCommitPipelineStages::DbFile | EnableCommitPipelineStages::LogOverlay => {
                    if let Some(overlay) = db.commit_overlay.read().get(col as usize) {
                        if !overlay.is_empty() {
                            let mut replayed = 5;
                            while !overlay.is_empty() {
                                if replayed > 0 {
                                    replayed -= 1;
                                    std::thread::sleep(std::time::Duration::from_millis(100));
                                } else {
                                    return false
                                }
                            }
                        }
                    }
                },
                _ => (),
            }
            true
        }
    }

    #[test]
    fn test_db_open_should_fail() {
        let tmp = tempdir().unwrap();
        let options = Options::with_columns(tmp.path(), 5);
        assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));
    }

    #[test]
    fn test_db_open_fail_then_recursively_create() {
        let tmp = tempdir().unwrap();
        let (db_path_first, db_path_last) = {
            let mut db_path_first = tmp.path().to_owned();
            db_path_first.push("nope");

            let mut db_path_last = db_path_first.to_owned();

            for p in ["does", "not", "yet", "exist"] {
                db_path_last.push(p);
            }

            (db_path_first, db_path_last)
        };

        assert!(
            !db_path_first.exists(),
            "That directory should not have existed at this point (dir: {db_path_first:?})"
        );

        let options = Options::with_columns(&db_path_last, 5);
        assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));

        assert!(!db_path_first.exists(), "That directory should remain non-existent. Did the `open(create: false)` nonetheless create a directory? (dir: {db_path_first:?})");
        assert!(Db::open_or_create(&options).is_ok(), "New database should be created");

        assert!(
            db_path_first.is_dir(),
            "A directory should have been created (dir: {db_path_first:?})"
        );
        assert!(
            db_path_last.is_dir(),
            "A directory should have been created (dir: {db_path_last:?})"
        );
    }

    #[test]
    fn test_db_open_or_create() {
        let tmp = tempdir().unwrap();
        let options = Options::with_columns(tmp.path(), 5);
        assert!(Db::open_or_create(&options).is_ok(), "New database should be created");
        assert!(Db::open(&options).is_ok(), "Existing database should be reopened");
    }

    #[test]
    fn test_indexed_keyvalues() {
        test_indexed_keyvalues_inner(EnableCommitPipelineStages::CommitOverlay);
        test_indexed_keyvalues_inner(EnableCommitPipelineStages::LogOverlay);
        test_indexed_keyvalues_inner(EnableCommitPipelineStages::DbFile);
        test_indexed_keyvalues_inner(EnableCommitPipelineStages::Standard);
    }
    fn test_indexed_keyvalues_inner(db_test: EnableCommitPipelineStages) {
        let tmp = tempdir().unwrap();
        let options = db_test.options(tmp.path(), 5);
        let col_nb = 0;

        let key1 = b"key1".to_vec();
        let key2 = b"key2".to_vec();
        let key3 = b"key3".to_vec();

        let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
        assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());

        db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
        db_test.run_stages(&db);
        assert!(db_test.check_empty_overlay(&db.inner, col_nb));

        assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));

        db.commit(vec![
            (col_nb, key1.clone(), None),
            (col_nb, key2.clone(), Some(b"value2".to_vec())),
            (col_nb, key3.clone(), Some(b"value3".to_vec())),
        ])
        .unwrap();
        db_test.run_stages(&db);
        assert!(db_test.check_empty_overlay(&db.inner, col_nb));

        assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
        assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
        assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));

        db.commit(vec![
            (col_nb, key2.clone(), Some(b"value2b".to_vec())),
            (col_nb, key3.clone(), None),
        ])
        .unwrap();
        db_test.run_stages(&db);
        assert!(db_test.check_empty_overlay(&db.inner, col_nb));

        assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
        assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
        assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
    }

    #[test]
    fn test_indexed_overlay_against_backend() {
        let tmp = tempdir().unwrap();
        let db_test = EnableCommitPipelineStages::DbFile;
        let options = db_test.options(tmp.path(), 5);
        let col_nb = 0;

        let key1 = b"key1".to_vec();
        let key2 = b"key2".to_vec();
        let key3 = b"key3".to_vec();

        let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

        db.commit(vec![
            (col_nb, key1.clone(), Some(b"value1".to_vec())),
            (col_nb, key2.clone(), Some(b"value2".to_vec())),
            (col_nb, key3.clone(), Some(b"value3".to_vec())),
        ])
        .unwrap();
        db_test.run_stages(&db);
        drop(db);

        std::thread::sleep(std::time::Duration::from_millis(100));

        let db_test = EnableCommitPipelineStages::CommitOverlay;
        let options = db_test.options(tmp.path(), 5);
        let db = Db::open_inner(&options, OpeningMode::Write).unwrap();
        assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
        assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
        assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
        db.commit(vec![
            (col_nb, key2.clone(), Some(b"value2b".to_vec())),
            (col_nb, key3.clone(), None),
        ])
        .unwrap();
        db_test.run_stages(&db);

        assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
        assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
        assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
    }

    #[test]
    fn test_add_column() {
        let tmp = tempdir().unwrap();
        let db_test = EnableCommitPipelineStages::DbFile;
        let mut options = db_test.options(tmp.path(), 1);
        options.salt = Some(options.salt.unwrap_or_default());

        let old_col_id = 0;
        let new_col_id = 1;
        let new_col_indexed_id = 2;

        let key1 = b"key1".to_vec();
        let key2 = b"key2".to_vec();
        let key3 = b"key3".to_vec();

        let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

        db.commit(vec![
            (old_col_id, key1.clone(), Some(b"value1".to_vec())),
            (old_col_id, key2.clone(), Some(b"value2".to_vec())),
            (old_col_id, key3.clone(), Some(b"value3".to_vec())),
        ])
        .unwrap();
        db_test.run_stages(&db);

        drop(db);

        Db::add_column(&mut options, ColumnOptions { btree_index: false, ..Default::default() })
            .unwrap();

        Db::add_column(&mut options, ColumnOptions { btree_index: true, ..Default::default() })
            .unwrap();

        let mut options = db_test.options(tmp.path(), 3);
        options.columns[new_col_indexed_id as usize].btree_index = true;

        let db_test = EnableCommitPipelineStages::DbFile;
        let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

        assert_eq!(db.num_columns(), 3);

        let new_key1 = b"abcdef".to_vec();
        let new_key2 = b"123456".to_vec();

        db.commit(vec![
            (new_col_id, new_key1.clone(), Some(new_key1.to_vec())),
            (new_col_id, new_key2.clone(), Some(new_key2.to_vec())),
            (new_col_indexed_id, new_key1.clone(), Some(new_key1.to_vec())),
            (new_col_indexed_id, new_key2.clone(), Some(new_key2.to_vec())),
        ])
        .unwrap();
        db_test.run_stages(&db);

        drop(db);

        let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

        assert_eq!(db.get(old_col_id, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
        assert_eq!(db.get(old_col_id, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
        assert_eq!(db.get(old_col_id, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));

        assert_eq!(db.get(new_col_id, new_key1.as_slice()).unwrap(), Some(new_key1.to_vec()));
        assert_eq!(db.get(new_col_id, new_key2.as_slice()).unwrap(), Some(new_key2.to_vec()));
        assert_eq!(
            db.get(new_col_indexed_id, new_key1.as_slice()).unwrap(),
            Some(new_key1.to_vec())
        );
        assert_eq!(
            db.get(new_col_indexed_id, new_key2.as_slice()).unwrap(),
            Some(new_key2.to_vec())
        );
    }

1892 #[test]
1893 fn test_indexed_btree_1() {
1894 test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, false);
1895 test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, false);
1896 test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, false);
1897 test_indexed_btree_inner(EnableCommitPipelineStages::Standard, false);
1898 test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, true);
1899 test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, true);
1900 test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, true);
1901 test_indexed_btree_inner(EnableCommitPipelineStages::Standard, true);
1902 }
1903 fn test_indexed_btree_inner(db_test: EnableCommitPipelineStages, long_key: bool) {
1904 let tmp = tempdir().unwrap();
1905 let col_nb = 0u8;
1906 let mut options = db_test.options(tmp.path(), 5);
1907 options.columns[col_nb as usize].btree_index = true;
1908
1909 let (key1, key2, key3, key4) = if !long_key {
1910 (b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec(), b"key4".to_vec())
1911 } else {
1912 let key2 = vec![2; 272];
1913 let mut key3 = key2.clone();
1914 key3[271] = 3;
1915 (vec![1; 953], key2, key3, vec![4; 79])
1916 };
1917
1918 let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
1919 assert_eq!(db.get(col_nb, &key1).unwrap(), None);
1920
1921 let mut iter = db.iter(col_nb).unwrap();
1922 assert_eq!(iter.next().unwrap(), None);
1923 assert_eq!(iter.prev().unwrap(), None);
1924
1925 db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
1926 db_test.run_stages(&db);
1927
1928 assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
1929 iter.seek_to_first().unwrap();
1930 assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1931 assert_eq!(iter.next().unwrap(), None);
1932 assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1933 assert_eq!(iter.prev().unwrap(), None);
1934 assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1935 assert_eq!(iter.next().unwrap(), None);
1936
1937 iter.seek_to_first().unwrap();
1938 assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1939 assert_eq!(iter.prev().unwrap(), None);
1940
1941 iter.seek(&[0xff]).unwrap();
1942 assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
1943 assert_eq!(iter.prev().unwrap(), None);
1944
1945 db.commit(vec![
1946 (col_nb, key1.clone(), None),
1947 (col_nb, key2.clone(), Some(b"value2".to_vec())),
1948 (col_nb, key3.clone(), Some(b"value3".to_vec())),
1949 ])
1950 .unwrap();
1951 db_test.run_stages(&db);
1952
1953 assert_eq!(db.get(col_nb, &key1).unwrap(), None);
1954 assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
1955 assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));
1956
1957 iter.seek(key2.as_slice()).unwrap();
1958 assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
1959 assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
1960 assert_eq!(iter.next().unwrap(), None);
1961
1962 iter.seek(key3.as_slice()).unwrap();
1963 assert_eq!(iter.prev().unwrap(), Some((key3.clone(), b"value3".to_vec())));
1964 assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
1965 assert_eq!(iter.prev().unwrap(), None);
1966
1967 db.commit(vec![
1968 (col_nb, key2.clone(), Some(b"value2b".to_vec())),
1969 (col_nb, key4.clone(), Some(b"value4".to_vec())),
1970 (col_nb, key3.clone(), None),
1971 ])
1972 .unwrap();
1973 db_test.run_stages(&db);
1974
1975 assert_eq!(db.get(col_nb, &key1).unwrap(), None);
1976 assert_eq!(db.get(col_nb, &key3).unwrap(), None);
1977 assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2b".to_vec()));
1978 assert_eq!(db.get(col_nb, &key4).unwrap(), Some(b"value4".to_vec()));
1979 let mut key22 = key2.clone();
1980 key22.push(2);
1981 iter.seek(key22.as_slice()).unwrap();
1982 assert_eq!(iter.next().unwrap(), Some((key4, b"value4".to_vec())));
1983 assert_eq!(iter.next().unwrap(), None);
1984 }
1985
    #[test]
    fn test_indexed_btree_2() {
        test_indexed_btree_inner_2(EnableCommitPipelineStages::CommitOverlay);
        test_indexed_btree_inner_2(EnableCommitPipelineStages::LogOverlay);
    }

    fn test_indexed_btree_inner_2(db_test: EnableCommitPipelineStages) {
        let tmp = tempdir().unwrap();
        let col_nb = 0u8;
        let mut options = db_test.options(tmp.path(), 5);
        options.columns[col_nb as usize].btree_index = true;

        let key1 = b"key1".to_vec();
        let key2 = b"key2".to_vec();
        let key3 = b"key3".to_vec();

        let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
        let mut iter = db.iter(col_nb).unwrap();
        assert_eq!(db.get(col_nb, &key1).unwrap(), None);
        assert_eq!(iter.next().unwrap(), None);

        db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
        EnableCommitPipelineStages::DbFile.run_stages(&db);
        drop(db);

        // Leave the closed database a moment to release its resources.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let db = Db::open_inner(&options, OpeningMode::Write).unwrap();

        let mut iter = db.iter(col_nb).unwrap();
        assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
        iter.seek_to_first().unwrap();
        assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
        assert_eq!(iter.next().unwrap(), None);

        db.commit(vec![
            (col_nb, key1.clone(), None),
            (col_nb, key2.clone(), Some(b"value2".to_vec())),
            (col_nb, key3.clone(), Some(b"value3".to_vec())),
        ])
        .unwrap();
        db_test.run_stages(&db);

        assert_eq!(db.get(col_nb, &key1).unwrap(), None);
        assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
        assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));
        iter.seek(key2.as_slice()).unwrap();
        assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
        assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
        assert_eq!(iter.next().unwrap(), None);

        iter.seek_to_last().unwrap();
        assert_eq!(iter.prev().unwrap(), Some((key3, b"value3".to_vec())));
        assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
        assert_eq!(iter.prev().unwrap(), None);
    }

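    // Randomized check of the btree iterator: seek to random positions and
    // compare forward/backward runs against a BTreeSet holding the same keys.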
    #[test]
    fn test_indexed_btree_3() {
        test_indexed_btree_inner_3(EnableCommitPipelineStages::CommitOverlay);
        test_indexed_btree_inner_3(EnableCommitPipelineStages::LogOverlay);
        test_indexed_btree_inner_3(EnableCommitPipelineStages::DbFile);
        test_indexed_btree_inner_3(EnableCommitPipelineStages::Standard);
    }

    fn test_indexed_btree_inner_3(db_test: EnableCommitPipelineStages) {
        use rand::SeedableRng;
        use std::collections::BTreeSet;

        let mut rng = rand::rngs::SmallRng::seed_from_u64(0);

        let tmp = tempdir().unwrap();
        let col_nb = 0u8;
        let mut options = db_test.options(tmp.path(), 5);
        options.columns[col_nb as usize].btree_index = true;

        let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

        // Insert keys 0..1024 and delete the even ones in the same commit,
        // so only odd keys remain.
        db.commit(
            (0u64..1024)
                .map(|i| (0, i.to_be_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
                .chain((0u64..1024).step_by(2).map(|i| (0, i.to_be_bytes().to_vec(), None))),
        )
        .unwrap();
        let expected = (0u64..1024).filter(|i| i % 2 == 1).collect::<BTreeSet<_>>();
        let mut iter = db.iter(0).unwrap();

        // Seek to random positions and compare forward and backward runs of
        // the database iterator against the model set.
        for _ in 0..100 {
            let at = rng.gen_range(0u64..=1024);
            iter.seek(&at.to_be_bytes()).unwrap();

            let mut prev_run: bool = rng.gen();
            let at = if prev_run {
                let take = rng.gen_range(1..100);
                let got = std::iter::from_fn(|| iter.next().unwrap())
                    .map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
                    .take(take)
                    .collect::<Vec<_>>();
                let expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
                assert_eq!(got, expected);
                if got.is_empty() {
                    prev_run = false;
                }
                if got.len() < take {
                    prev_run = false;
                }
                expected.last().copied().unwrap_or(at)
            } else {
                at
            };

            let at = {
                let take = rng.gen_range(1..100);
                let got = std::iter::from_fn(|| iter.prev().unwrap())
                    .map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
                    .take(take)
                    .collect::<Vec<_>>();
                // If the forward run ended on `at`, going backward must not
                // yield `at` again, hence the exclusive bound.
                let expected = if prev_run {
                    expected.range(..at).rev().take(take).copied().collect::<Vec<_>>()
                } else {
                    expected.range(..=at).rev().take(take).copied().collect::<Vec<_>>()
                };
                assert_eq!(got, expected);
                prev_run = !got.is_empty();
                if take > got.len() {
                    prev_run = false;
                }
                expected.last().copied().unwrap_or(at)
            };

            let take = rng.gen_range(1..100);
            let mut got = std::iter::from_fn(|| iter.next().unwrap())
                .map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
                .take(take)
                .collect::<Vec<_>>();
            let mut expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
            if prev_run {
                // After a full backward run, `next` resumes past the last
                // yielded key: drop it from the model and trim `got` so both
                // sides have the same length.
                expected = expected.split_off(1);
                if got.len() == take {
                    got.pop();
                }
            }
            assert_eq!(got, expected);
        }

        let take = rng.gen_range(20..100);
        iter.seek_to_last().unwrap();
        let got = std::iter::from_fn(|| iter.prev().unwrap())
            .map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
            .take(take)
            .collect::<Vec<_>>();
        let expected = expected.iter().rev().take(take).copied().collect::<Vec<_>>();
        assert_eq!(got, expected);
    }

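    // Applies a change set under every combination of btree indexing and
    // reference counting, then verifies the final database state.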
    fn test_basic(change_set: &[(Vec<u8>, Option<Vec<u8>>)]) {
        test_basic_inner(change_set, false, false);
        test_basic_inner(change_set, false, true);
        test_basic_inner(change_set, true, false);
        test_basic_inner(change_set, true, true);
    }

    fn test_basic_inner(
        change_set: &[(Vec<u8>, Option<Vec<u8>>)],
        btree_index: bool,
        ref_counted: bool,
    ) {
        let tmp = tempdir().unwrap();
        let col_nb = 1u8;
        let db_test = EnableCommitPipelineStages::DbFile;
        let mut options = db_test.options(tmp.path(), 2);
        options.columns[col_nb as usize].btree_index = btree_index;
        options.columns[col_nb as usize].ref_counted = ref_counted;
        options.columns[col_nb as usize].preimage = ref_counted;
        // Guard: ref-counted columns cannot be checked at the commit-overlay
        // stage.
        assert!(!(ref_counted && db_test == EnableCommitPipelineStages::CommitOverlay));
        let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

        let iter = btree_index.then(|| db.iter(col_nb).unwrap());
        assert_eq!(iter.and_then(|mut i| i.next().unwrap()), None);

        db.commit(change_set.iter().map(|(k, v)| (col_nb, k.clone(), v.clone())))
            .unwrap();
        db_test.run_stages(&db);

        // Replay the change set to count the distinct keys that should remain.
        let mut keys = HashSet::new();
        let mut expected_count: u64 = 0;
        for (k, v) in change_set.iter() {
            if v.is_some() {
                if keys.insert(k) {
                    expected_count += 1;
                }
            } else if keys.remove(k) {
                expected_count -= 1;
            }
        }
        if ref_counted {
            // Model reference counting: repeated inserts bump a counter,
            // removals decrement it, and the entry disappears only at zero.
            let mut state: BTreeMap<Vec<u8>, Option<(Vec<u8>, usize)>> = Default::default();
            for (k, v) in change_set.iter() {
                let mut remove = false;
                let mut insert = false;
                match state.get_mut(k) {
                    Some(Some((_, counter))) =>
                        if v.is_some() {
                            *counter += 1;
                        } else if *counter == 1 {
                            remove = true;
                        } else {
                            *counter -= 1;
                        },
                    Some(None) | None =>
                        if v.is_some() {
                            insert = true;
                        },
                }
                if insert {
                    state.insert(k.clone(), v.clone().map(|v| (v, 1)));
                }
                if remove {
                    state.remove(k);
                }
            }
            for (key, value) in state {
                assert_eq!(db.get(col_nb, &key).unwrap(), value.map(|v| v.0));
            }
        } else {
            let stats = db.stats();
            // Only hash-indexed columns report value stats.
            if let Some(stats) = stats.columns[col_nb as usize].as_ref() {
                assert_eq!(stats.total_values, expected_count);
            }

            let state: BTreeMap<Vec<u8>, Option<Vec<u8>>> =
                change_set.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
            for (key, value) in state.iter() {
                assert_eq!(&db.get(col_nb, key).unwrap(), value);
            }
        }
    }

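    // Drives test_basic with pseudo-random change sets; seeds are fixed so
    // any failure is reproducible.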
    #[test]
    fn test_random() {
        fdlimit::raise_fd_limit();
        for i in 0..100 {
            test_random_inner(60, 60, i);
        }
        for i in 0..50 {
            test_random_inner(20, 60, i);
        }
    }

    fn test_random_inner(size: usize, key_size: usize, seed: u64) {
        use rand::{RngCore, SeedableRng};
        let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);
        let mut data = Vec::<(Vec<u8>, Option<Vec<u8>>)>::new();
        for i in 0..size {
            let nb_delete: u32 = rng.next_u32();
            let nb_delete = (nb_delete as usize % size) / 2;
            let mut key = vec![0u8; key_size];
            rng.fill_bytes(&mut key[..]);
            let value = if i > size - nb_delete {
                // Towards the end of the run, emit deletions; roughly one in
                // four reuses a key that was inserted earlier.
                let random_key = rng.next_u32();
                let random_key = (random_key % 4) > 0;
                if !random_key {
                    key = data[i - size / 2].0.clone();
                }
                None
            } else {
                Some(key.clone())
            };
            // Truncate keys by a random amount so their sizes vary.
            let var_keysize = rng.next_u32();
            let var_keysize = var_keysize as usize % (key_size / 2);
            key.truncate(key_size - var_keysize);
            data.push((key, value));
        }
        test_basic(&data[..]);
    }

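    // Hand-picked change sets: duplicate inserts, overwrites, deletions in
    // various orders, and long keys.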
    #[test]
    fn test_simple() {
        test_basic(&[
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key1".to_vec(), None),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key1".to_vec(), None),
            (b"key1".to_vec(), None),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key1".to_vec(), Some(b"value2".to_vec())),
        ]);
        test_basic(&[(b"key1".to_vec(), Some(b"value1".to_vec()))]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
        ]);
        test_basic(&[
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key4".to_vec(), Some(b"value4".to_vec())),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key4".to_vec(), Some(b"value4".to_vec())),
            (b"key5".to_vec(), Some(b"value5".to_vec())),
        ]);
        test_basic(&[
            (b"key5".to_vec(), Some(b"value5".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key4".to_vec(), Some(b"value4".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
        ]);
        test_basic(&[
            (b"key5".to_vec(), Some(b"value5".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key4".to_vec(), Some(b"value4".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key11".to_vec(), Some(b"value31".to_vec())),
            (b"key12".to_vec(), Some(b"value32".to_vec())),
        ]);
        test_basic(&[
            (b"key5".to_vec(), Some(b"value5".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key4".to_vec(), Some(b"value4".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key51".to_vec(), Some(b"value31".to_vec())),
            (b"key52".to_vec(), Some(b"value32".to_vec())),
        ]);
        test_basic(&[
            (b"key5".to_vec(), Some(b"value5".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key4".to_vec(), Some(b"value4".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key31".to_vec(), Some(b"value31".to_vec())),
            (b"key32".to_vec(), Some(b"value32".to_vec())),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value5".to_vec())),
            (b"key2".to_vec(), Some(b"value3".to_vec())),
            (b"key3".to_vec(), Some(b"value4".to_vec())),
            (b"key4".to_vec(), Some(b"value7".to_vec())),
            (b"key5".to_vec(), Some(b"value2".to_vec())),
            (b"key6".to_vec(), Some(b"value1".to_vec())),
            (b"key3".to_vec(), None),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value5".to_vec())),
            (b"key2".to_vec(), Some(b"value3".to_vec())),
            (b"key3".to_vec(), Some(b"value4".to_vec())),
            (b"key4".to_vec(), Some(b"value7".to_vec())),
            (b"key5".to_vec(), Some(b"value2".to_vec())),
            (b"key0".to_vec(), Some(b"value1".to_vec())),
            (b"key3".to_vec(), None),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value5".to_vec())),
            (b"key2".to_vec(), Some(b"value3".to_vec())),
            (b"key3".to_vec(), Some(b"value4".to_vec())),
            (b"key4".to_vec(), Some(b"value7".to_vec())),
            (b"key5".to_vec(), Some(b"value2".to_vec())),
            (b"key3".to_vec(), None),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value5".to_vec())),
            (b"key4".to_vec(), Some(b"value3".to_vec())),
            (b"key5".to_vec(), Some(b"value4".to_vec())),
            (b"key6".to_vec(), Some(b"value4".to_vec())),
            (b"key7".to_vec(), Some(b"value2".to_vec())),
            (b"key8".to_vec(), Some(b"value1".to_vec())),
            (b"key5".to_vec(), None),
        ]);
        test_basic(&[
            (b"key1".to_vec(), Some(b"value5".to_vec())),
            (b"key4".to_vec(), Some(b"value3".to_vec())),
            (b"key5".to_vec(), Some(b"value4".to_vec())),
            (b"key7".to_vec(), Some(b"value2".to_vec())),
            (b"key8".to_vec(), Some(b"value1".to_vec())),
            (b"key3".to_vec(), None),
        ]);
        test_basic(&[
            (b"key5".to_vec(), Some(b"value5".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key4".to_vec(), Some(b"value4".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key5".to_vec(), None),
            (b"key3".to_vec(), None),
        ]);
        test_basic(&[
            (b"key5".to_vec(), Some(b"value5".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key4".to_vec(), Some(b"value4".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key5".to_vec(), None),
            (b"key3".to_vec(), None),
            (b"key2".to_vec(), None),
            (b"key4".to_vec(), None),
        ]);
        test_basic(&[
            (b"key5".to_vec(), Some(b"value5".to_vec())),
            (b"key3".to_vec(), Some(b"value3".to_vec())),
            (b"key4".to_vec(), Some(b"value4".to_vec())),
            (b"key2".to_vec(), Some(b"value2".to_vec())),
            (b"key1".to_vec(), Some(b"value1".to_vec())),
            (b"key5".to_vec(), None),
            (b"key3".to_vec(), None),
            (b"key2".to_vec(), None),
            (b"key4".to_vec(), None),
            (b"key1".to_vec(), None),
        ]);
        test_basic(&[
            ([5u8; 250].to_vec(), Some(b"value5".to_vec())),
            ([5u8; 200].to_vec(), Some(b"value3".to_vec())),
            ([5u8; 100].to_vec(), Some(b"value4".to_vec())),
            ([5u8; 150].to_vec(), Some(b"value2".to_vec())),
            ([5u8; 101].to_vec(), Some(b"value1".to_vec())),
            ([5u8; 250].to_vec(), None),
            ([5u8; 101].to_vec(), None),
        ]);
    }

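    // Checks iterator consistency when a commit lands while an iteration is
    // in progress, for every commit pipeline stage.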
    #[test]
    fn test_btree_iter() {
        let col_nb = 0;
        // Start state: 100 keys of the form "key{i}".
        let mut data_start = Vec::new();
        for i in 0u8..100 {
            let mut key = b"key0".to_vec();
            key[3] = i;
            let mut value = b"val0".to_vec();
            value[3] = i;
            data_start.push((col_nb, key, Some(value)));
        }
        // Change set: a mix of new keys, deletions and overwrites.
        let mut data_change = Vec::new();
        for i in 0u8..100 {
            let mut key = b"key0".to_vec();
            if i % 2 == 0 {
                key[2] = i;
                let mut value = b"val0".to_vec();
                value[2] = i;
                data_change.push((col_nb, key, Some(value)));
            } else if i % 3 == 0 {
                key[3] = i;
                data_change.push((col_nb, key, None));
            } else {
                key[3] = i;
                let mut value = b"val0".to_vec();
                value[2] = i;
                data_change.push((col_nb, key, Some(value)));
            }
        }

        let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
            data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
        let mut end_state = start_state.clone();
        for (_c, k, v) in data_change.iter() {
            if let Some(v) = v {
                end_state.insert(k.clone(), v.clone());
            } else {
                end_state.remove(k);
            }
        }

        for stage in [
            EnableCommitPipelineStages::CommitOverlay,
            EnableCommitPipelineStages::LogOverlay,
            EnableCommitPipelineStages::DbFile,
            EnableCommitPipelineStages::Standard,
        ] {
            // Interrupt the iteration at various points (every 5 entries).
            for i in 0..10 {
                test_btree_iter_inner(
                    stage,
                    &data_start,
                    &data_change,
                    &start_state,
                    &end_state,
                    i * 5,
                );
            }
            // A minimal case: "key2" is inserted between "key1" and "key3"
            // while the iterator is parked after the first entry.
            let data_start = vec![
                (0, b"key1".to_vec(), Some(b"val1".to_vec())),
                (0, b"key3".to_vec(), Some(b"val3".to_vec())),
            ];
            let data_change = vec![(0, b"key2".to_vec(), Some(b"val2".to_vec()))];
            let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
                data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
            let mut end_state = start_state.clone();
            for (_c, k, v) in data_change.iter() {
                if let Some(v) = v {
                    end_state.insert(k.clone(), v.clone());
                } else {
                    end_state.remove(k);
                }
            }
            test_btree_iter_inner(stage, &data_start, &data_change, &start_state, &end_state, 1);
        }
    }

    fn test_btree_iter_inner(
        db_test: EnableCommitPipelineStages,
        data_start: &[(u8, Vec<u8>, Option<Value>)],
        data_change: &[(u8, Vec<u8>, Option<Value>)],
        start_state: &BTreeMap<Vec<u8>, Vec<u8>>,
        end_state: &BTreeMap<Vec<u8>, Vec<u8>>,
        commit_at: usize,
    ) {
        let tmp = tempdir().unwrap();
        let mut options = db_test.options(tmp.path(), 5);
        let col_nb = 0;
        options.columns[col_nb as usize].btree_index = true;
        let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

        db.commit(data_start.iter().cloned()).unwrap();
        db_test.run_stages(&db);

        // Walk the first `commit_at` entries of the start state.
        let mut iter = db.iter(col_nb).unwrap();
        let mut iter_state = start_state.iter();
        let mut last_key = Value::new();
        for _ in 0..commit_at {
            let next = iter.next().unwrap();
            if let Some((k, _)) = next.as_ref() {
                last_key = k.clone();
            }
            assert_eq!(iter_state.next(), next.as_ref().map(|(k, v)| (k, v)));
        }

        // Commit the change set mid-iteration; the iterator should continue
        // over the new state.
        db.commit(data_change.iter().cloned()).unwrap();
        db_test.run_stages(&db);

        let mut iter_state = end_state.range(last_key.clone()..);
        for _ in commit_at..100 {
            let mut state_next = iter_state.next();
            if let Some((k, _v)) = state_next.as_ref() {
                // `last_key` was already yielded before the commit; skip it.
                if *k == &last_key {
                    state_next = iter_state.next();
                }
            }
            let iter_next = iter.next().unwrap();
            assert_eq!(state_next, iter_next.as_ref().map(|(k, v)| (k, v)));
        }
        // Finally, traverse the whole end state in reverse.
        let mut iter_state_rev = end_state.iter().rev();
        let mut iter = db.iter(col_nb).unwrap();
        iter.seek_to_last().unwrap();
        for _ in 0..100 {
            let next = iter.prev().unwrap();
            assert_eq!(iter_state_rev.next(), next.as_ref().map(|(k, v)| (k, v)));
        }
    }

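    // The tests below use the instrumentation feature to make IO fail after a
    // fixed number of operations, simulating a crash at a precise point.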
    #[cfg(feature = "instrumentation")]
    #[test]
    fn test_recover_from_log_on_error() {
        let tmp = tempdir().unwrap();
        let mut options = Options::with_columns(tmp.path(), 1);
        options.always_flush = true;
        options.with_background_thread = false;

        {
            // Enact one commit fully, then make the second commit fail
            // part-way through enacting its log.
            let db = Db::open_or_create(&options).unwrap();
            db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
            db.process_commits().unwrap();
            db.flush_logs().unwrap();
            db.enact_logs().unwrap();
            db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
            db.process_commits().unwrap();
            db.flush_logs().unwrap();
            crate::set_number_of_allowed_io_operations(4);

            let err = db.enact_logs();
            assert!(err.is_err());
            db.inner.store_err(err);
            crate::set_number_of_allowed_io_operations(usize::MAX);
        }

        {
            // On reopen the flushed log is replayed, so both commits are
            // visible.
            let db = Db::open(&options).unwrap();
            assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
            assert_eq!(db.get(0, &[1]).unwrap(), Some(vec![1]));
        }
    }

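    // A failure while writing a commit to the log must leave the database
    // openable, with the previous commit intact.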
    #[cfg(feature = "instrumentation")]
    #[test]
    fn test_partial_log_recovery() {
        let tmp = tempdir().unwrap();
        let mut options = Options::with_columns(tmp.path(), 1);
        options.columns[0].btree_index = true;
        options.always_flush = true;
        options.with_background_thread = false;

        {
            // Write one full commit, then run out of IO budget while the
            // second commit's log record is being written.
            let db = Db::open_or_create(&options).unwrap();
            db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
            db.process_commits().unwrap();
            db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
            crate::set_number_of_allowed_io_operations(4);
            assert!(db.process_commits().is_err());
            crate::set_number_of_allowed_io_operations(usize::MAX);
            // Flush whatever part of the log made it out.
            db.flush_logs().unwrap();
        }

        {
            // The partial log record must not prevent opening, and the first
            // commit must still be readable.
            let db = Db::open(&options).unwrap();
            assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
        }

        {
            // A second reopen must succeed as well.
            let db = Db::open(&options).unwrap();
            assert!(db.get(0, &[0]).unwrap().is_some());
        }
    }

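    // Simulates a crash between finishing a reindex and deleting the old
    // index file, then checks that reopening completes the reindex.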
    #[cfg(feature = "instrumentation")]
    #[test]
    fn test_continue_reindex() {
        let _ = env_logger::try_init();
        let tmp = tempdir().unwrap();
        let mut options = Options::with_columns(tmp.path(), 1);
        options.columns[0].preimage = true;
        options.columns[0].uniform = true;
        options.always_flush = true;
        options.with_background_thread = false;
        options.salt = Some(Default::default());

        {
            let db = Db::open_or_create(&options).unwrap();
            // 65 keys that all fall into the same 16-bit index chunk, forcing
            // a reindex to 17 bits.
            let commit: Vec<_> = (0..65u32)
                .map(|index| {
                    let mut key = [0u8; 32];
                    key[2] = (index as u8) << 1;
                    (0, key.to_vec(), Some(vec![index as u8]))
                })
                .collect();
            db.commit(commit).unwrap();

            db.process_commits().unwrap();
            db.flush_logs().unwrap();
            db.enact_logs().unwrap();
            // Save the 16-bit index, run the reindex, then restore the old
            // index file to simulate a crash before it was removed.
            std::fs::copy(tmp.path().join("index_00_16"), tmp.path().join("index_00_16.bak"))
                .unwrap();
            db.process_reindex().unwrap();
            db.flush_logs().unwrap();
            db.enact_logs().unwrap();
            db.clean_logs().unwrap();
            std::fs::rename(tmp.path().join("index_00_16.bak"), tmp.path().join("index_00_16"))
                .unwrap();
        }

        {
            // On reopen the leftover 16-bit index must be picked up and the
            // reindex completed.
            let db = Db::open(&options).unwrap();
            db.process_reindex().unwrap();
            let mut entries = 0;
            db.iter_column_while(0, |_| {
                entries += 1;
                true
            })
            .unwrap();

            assert_eq!(entries, 65);
            assert_eq!(db.inner.columns[0].index_bits(), Some(17));
        }
    }

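    // Exercises Db::reset_column: clearing a column's data and re-creating it
    // with different options (here, switching to a btree index).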
    #[test]
    fn test_remove_column() {
        let tmp = tempdir().unwrap();
        let db_test_file = EnableCommitPipelineStages::DbFile;
        let mut options_db_files = db_test_file.options(tmp.path(), 2);
        options_db_files.salt = Some(options_db_files.salt.unwrap_or_default());
        let mut options_std = EnableCommitPipelineStages::Standard.options(tmp.path(), 2);
        // Both option sets must share the same salt to address the same data.
        options_std.salt = options_db_files.salt.clone();

        let db = Db::open_inner(&options_db_files, OpeningMode::Create).unwrap();

        let payload: Vec<(u8, _, _)> = (0u16..100)
            .map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
            .collect();

        db.commit(payload.clone()).unwrap();

        db_test_file.run_stages(&db);
        drop(db);

        let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
        for (col, key, value) in payload.iter() {
            assert_eq!(db.get(*col, key).unwrap().as_ref(), value.as_ref());
        }
        drop(db);
        // Resetting the column drops all of its data.
        Db::reset_column(&mut options_db_files, 1, None).unwrap();

        let db = Db::open_inner(&options_db_files, OpeningMode::Write).unwrap();
        for (col, key, _value) in payload.iter() {
            assert_eq!(db.get(*col, key).unwrap(), None);
        }

        let payload: Vec<(u8, _, _)> = (0u16..10)
            .map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
            .collect();

        db.commit(payload.clone()).unwrap();

        db_test_file.run_stages(&db);
        drop(db);

        let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
        let payload: Vec<(u8, _, _)> = (10u16..100)
            .map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
            .collect();

        db.commit(payload.clone()).unwrap();
        // Iteration is only supported on btree-indexed columns.
        assert!(db.iter(1).is_err());

        drop(db);

        // Reset the column again, this time re-creating it as a btree column.
        let mut col_option = options_std.columns[1].clone();
        col_option.btree_index = true;
        Db::reset_column(&mut options_std, 1, Some(col_option)).unwrap();

        let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
        let payload: Vec<(u8, _, _)> = (0u16..10)
            .map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
            .collect();

        db.commit(payload.clone()).unwrap();
        assert!(db.iter(1).is_ok());
    }
}