parity_db/
log.rs

// Copyright 2021-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or MIT.

use crate::{
	column::ColId,
	error::{try_io, Error, Result},
	file::MappedBytesGuard,
	index::{Chunk as IndexChunk, TableId as IndexTableId, ENTRY_BYTES},
	options::Options,
	parking_lot::{RwLock, RwLockWriteGuard},
	table::TableId as ValueTableId,
};
use std::{
	cmp::min,
	collections::{HashMap, VecDeque},
	convert::TryInto,
	io::{ErrorKind, Read, Seek, Write},
	sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
};

const MAX_LOG_POOL_SIZE: usize = 16;
const BEGIN_RECORD: u8 = 1;
const INSERT_INDEX: u8 = 2;
const INSERT_VALUE: u8 = 3;
const END_RECORD: u8 = 4;
const DROP_TABLE: u8 = 5;
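
// A sketch of the on-disk record layout, as written by `LogChange::flush_to_file`
// and parsed back by `LogReader::next` (all integers are little-endian):
//
//   BEGIN_RECORD (1 byte) | record_id (8 bytes)
//   any number of:
//     INSERT_INDEX (1) | index table id (2) | chunk index (8) | modified mask (8)
//                      | one ENTRY_BYTES-sized entry per set mask bit
//     INSERT_VALUE (1) | value table id (2) | entry index (8) | value payload
//                      (the payload length is not stored; the reader derives it
//                      from the target table)
//     DROP_TABLE   (1) | index table id (2)
//   END_RECORD (1 byte) | CRC-32 of all preceding record bytes (4)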

// Once the index overlay uses less than 1/10 of its capacity, it will be reclaimed.
const INDEX_OVERLAY_RECLAIM_FACTOR: usize = 10;
// Once the value overlay uses less than 1/10 of its capacity, it will be reclaimed.
const VALUE_OVERLAY_RECLAIM_FACTOR: usize = 10;
// Min number of value items to initiate reclaim. Each item is around 40 bytes.
const VALUE_OVERLAY_MIN_RECLAIM_ITEMS: usize = 10240;

#[derive(Debug)]
pub struct InsertIndexAction {
	pub table: IndexTableId,
	pub index: u64,
}

#[derive(Debug)]
pub struct InsertValueAction {
	pub table: ValueTableId,
	pub index: u64,
}

#[derive(Debug)]
pub enum LogAction {
	BeginRecord,
	InsertIndex(InsertIndexAction),
	InsertValue(InsertValueAction),
	DropTable(IndexTableId),
	EndRecord,
}

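// `LogQuery` abstracts reads over in-memory log data that has not yet been
// enacted on the index and value tables. The implementations below form a
// lookup chain: a `LogWriter` consults its record-local changes first and then
// falls back to the shared `LogOverlays`.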
pub trait LogQuery {
	type ValueRef<'a>: std::ops::Deref<Target = [u8]>
	where
		Self: 'a;

	fn with_index<R, F: FnOnce(&IndexChunk) -> R>(
		&self,
		table: IndexTableId,
		index: u64,
		f: F,
	) -> Option<R>;
	fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool;
	fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'a>>;
}

#[derive(Debug)]
pub struct LogOverlays {
	index: Vec<IndexLogOverlay>,
	value: Vec<ValueLogOverlay>,
	last_record_ids: Vec<u64>,
}

impl LogOverlays {
	pub fn last_record_id(&self, col: ColId) -> u64 {
		self.last_record_ids.get(col as usize).cloned().unwrap_or(u64::MAX)
	}

	pub fn with_columns(count: usize) -> Self {
		Self {
			index: (0..IndexTableId::max_log_indicies(count))
				.map(|_| IndexLogOverlay::default())
				.collect(),
			value: (0..ValueTableId::max_log_tables(count))
				.map(|_| ValueLogOverlay::default())
				.collect(),
			last_record_ids: (0..count).map(|_| 0).collect(),
		}
	}
}

impl LogQuery for RwLock<LogOverlays> {
	type ValueRef<'a> = MappedBytesGuard<'a>;

	fn with_index<R, F: FnOnce(&IndexChunk) -> R>(
		&self,
		table: IndexTableId,
		index: u64,
		f: F,
	) -> Option<R> {
		(&*self.read()).with_index(table, index, f)
	}

	fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool {
		(&*self.read()).value(table, index, dest)
	}

	#[cfg(not(feature = "loom"))]
	fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'a>> {
		let lock =
			parking_lot::RwLockReadGuard::try_map(self.read(), |o| o.value_ref(table, index));
		lock.ok()
	}

	#[cfg(feature = "loom")]
	fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'a>> {
		self.read().value_ref(table, index).map(|o| MappedBytesGuard::new(o.to_vec()))
	}
}

impl LogQuery for LogOverlays {
	type ValueRef<'a> = &'a [u8];
	fn with_index<R, F: FnOnce(&IndexChunk) -> R>(
		&self,
		table: IndexTableId,
		index: u64,
		f: F,
	) -> Option<R> {
		self.index
			.get(table.log_index())
			.and_then(|o| o.map.get(&index).map(|(_id, _mask, data)| f(data)))
	}

	fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool {
		let s = self;
		if let Some(d) = s
			.value
			.get(table.log_index())
			.and_then(|o| o.map.get(&index).map(|(_id, data)| data))
		{
			let len = dest.len().min(d.len());
			dest[0..len].copy_from_slice(&d[0..len]);
			true
		} else {
			false
		}
	}
	fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'a>> {
		self.value
			.get(table.log_index())
			.and_then(|o| o.map.get(&index).map(|(_id, data)| data.as_slice()))
	}
}

#[derive(Debug, Default)]
pub struct Cleared {
	index: Vec<(IndexTableId, u64)>,
	values: Vec<(ValueTableId, u64)>,
}

#[derive(Debug)]
pub struct LogReader<'a> {
	reading: RwLockWriteGuard<'a, Option<Reading>>,
	record_id: u64,
	read_bytes: u64,
	crc32: crc32fast::Hasher,
	validate: bool,
	cleared: Cleared,
}

impl<'a> LogReader<'a> {
	pub fn record_id(&self) -> u64 {
		self.record_id
	}

	fn new(reading: RwLockWriteGuard<'a, Option<Reading>>, validate: bool) -> LogReader<'a> {
		LogReader {
			cleared: Default::default(),
			reading,
			record_id: 0,
			read_bytes: 0,
			crc32: crc32fast::Hasher::new(),
			validate,
		}
	}

	pub fn reset(&mut self) -> Result<()> {
		self.cleared = Default::default();
		try_io!(self
			.reading
			.as_mut()
			.unwrap()
			.file
			.seek(std::io::SeekFrom::Current(-(self.read_bytes as i64))));
		self.read_bytes = 0;
		self.record_id = 0;
		self.crc32 = crc32fast::Hasher::new();
		Ok(())
	}

	pub fn next(&mut self) -> Result<LogAction> {
		let mut read_buf = |size, buf: &mut [u8; 8]| -> Result<()> {
			try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..size]));
			self.read_bytes += size as u64;
			if self.validate {
				self.crc32.update(&buf[0..size]);
			}
			Ok(())
		};

		let mut buf = [0u8; 8];
		read_buf(1, &mut buf)?;
		match buf[0] {
			BEGIN_RECORD => {
				read_buf(8, &mut buf)?;
				let record_id = u64::from_le_bytes(buf);
				self.record_id = record_id;
				Ok(LogAction::BeginRecord)
			},
			INSERT_INDEX => {
				read_buf(2, &mut buf)?;
				let table =
					IndexTableId::from_u16(u16::from_le_bytes(buf[0..2].try_into().unwrap()));
				read_buf(8, &mut buf)?;
				let index = u64::from_le_bytes(buf);
				self.cleared.index.push((table, index));
				Ok(LogAction::InsertIndex(InsertIndexAction { table, index }))
			},
			INSERT_VALUE => {
				read_buf(2, &mut buf)?;
				let table =
					ValueTableId::from_u16(u16::from_le_bytes(buf[0..2].try_into().unwrap()));
				read_buf(8, &mut buf)?;
				let index = u64::from_le_bytes(buf);
				self.cleared.values.push((table, index));
				Ok(LogAction::InsertValue(InsertValueAction { table, index }))
			},
			END_RECORD => {
				try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..4]));
				self.read_bytes += 4;
				if self.validate {
					let checksum = u32::from_le_bytes(buf[0..4].try_into().unwrap());
					let expected = std::mem::take(&mut self.crc32).finalize();
					log::trace!(target: "parity-db",
						"Read end of record, checksum={:#x}, expected={:#x}",
						checksum,
						expected,
					);
					if checksum != expected {
						return Err(Error::Corruption("Log record CRC-32 mismatch".into()))
					}
				} else {
					log::trace!(target: "parity-db", "Read end of record");
				}
				Ok(LogAction::EndRecord)
			},
			DROP_TABLE => {
				read_buf(2, &mut buf)?;
				let table =
					IndexTableId::from_u16(u16::from_le_bytes(buf[0..2].try_into().unwrap()));
				Ok(LogAction::DropTable(table))
			},
			_ => Err(Error::Corruption("Bad log entry type".into())),
		}
	}

	pub fn read(&mut self, buf: &mut [u8]) -> Result<()> {
		try_io!(self.reading.as_mut().unwrap().file.read_exact(buf));
		self.read_bytes += buf.len() as u64;
		if self.validate {
			self.crc32.update(buf);
		}
		Ok(())
	}

	pub fn drain(self) -> Cleared {
		self.cleared
	}

	pub fn read_bytes(&self) -> u64 {
		self.read_bytes
	}
}

#[derive(Debug)]
pub struct LogChange {
	local_index: HashMap<IndexTableId, IndexLogOverlay>,
	local_values: HashMap<ValueTableId, ValueLogOverlayLocal>,
	record_id: u64,
	dropped_tables: Vec<IndexTableId>,
}

impl LogChange {
	fn new(record_id: u64) -> LogChange {
		LogChange {
			local_index: Default::default(),
			local_values: Default::default(),
			dropped_tables: Default::default(),
			record_id,
		}
	}

	pub fn local_values_changes(&self, id: ValueTableId) -> Option<&ValueLogOverlayLocal> {
		self.local_values.get(&id)
	}

	fn flush_to_file(self, file: &mut std::io::BufWriter<std::fs::File>) -> Result<FlushedLog> {
		let mut crc32 = crc32fast::Hasher::new();
		let mut bytes: u64 = 0;

		let mut write = |buf: &[u8]| -> Result<()> {
			try_io!(file.write_all(buf));
			crc32.update(buf);
			bytes += buf.len() as u64;
			Ok(())
		};

		write(&BEGIN_RECORD.to_le_bytes())?;
		write(&self.record_id.to_le_bytes())?;

		for (id, overlay) in self.local_index.iter() {
			for (index, (_, modified_entries_mask, chunk)) in overlay.map.iter() {
				write(INSERT_INDEX.to_le_bytes().as_ref())?;
				write(&id.as_u16().to_le_bytes())?;
				write(&index.to_le_bytes())?;
				write(&modified_entries_mask.to_le_bytes())?;
				let mut mask = *modified_entries_mask;
				while mask != 0 {
					let i = mask.trailing_zeros();
					mask &= !(1 << i);
					write(&chunk.0[i as usize * ENTRY_BYTES..(i as usize + 1) * ENTRY_BYTES])?;
				}
			}
		}
		for (id, overlay) in self.local_values.iter() {
			for (index, (_, value)) in overlay.map.iter() {
				write(INSERT_VALUE.to_le_bytes().as_ref())?;
				write(&id.as_u16().to_le_bytes())?;
				write(&index.to_le_bytes())?;
				write(value)?;
			}
		}
		for id in self.dropped_tables.iter() {
			log::debug!(target: "parity-db", "Finalizing drop {}", id);
			write(DROP_TABLE.to_le_bytes().as_ref())?;
			write(&id.as_u16().to_le_bytes())?;
		}
		write(&END_RECORD.to_le_bytes())?;
		let checksum: u32 = crc32.finalize();
		try_io!(file.write_all(&checksum.to_le_bytes()));
		bytes += 4;
		try_io!(file.flush());
		Ok(FlushedLog { index: self.local_index, values: self.local_values, bytes })
	}
}

#[derive(Debug)]
struct FlushedLog {
	index: HashMap<IndexTableId, IndexLogOverlay>,
	values: HashMap<ValueTableId, ValueLogOverlayLocal>,
	bytes: u64,
}

#[derive(Debug)]
pub struct LogWriter<'a> {
	overlays: &'a RwLock<LogOverlays>,
	log: LogChange,
}

impl<'a> LogWriter<'a> {
	pub fn new(overlays: &'a RwLock<LogOverlays>, record_id: u64) -> LogWriter<'a> {
		LogWriter { overlays, log: LogChange::new(record_id) }
	}

	pub fn record_id(&self) -> u64 {
		self.log.record_id
	}

	pub fn insert_index(&mut self, table: IndexTableId, index: u64, sub: u8, data: IndexChunk) {
		match self.log.local_index.entry(table).or_default().map.entry(index) {
			std::collections::hash_map::Entry::Occupied(mut entry) => {
				*entry.get_mut() = (self.log.record_id, entry.get().1 | (1 << sub), data);
			},
			std::collections::hash_map::Entry::Vacant(entry) => {
				entry.insert((self.log.record_id, 1 << sub, data));
			},
		}
	}

	pub fn insert_value(&mut self, table: ValueTableId, index: u64, data: Vec<u8>) {
		self.log
			.local_values
			.entry(table)
			.or_default()
			.map
			.insert(index, (self.log.record_id, data));
	}

	pub fn drop_table(&mut self, id: IndexTableId) {
		self.log.dropped_tables.push(id);
	}

	pub fn drain(self) -> LogChange {
		self.log
	}
}

pub enum LogWriterValueGuard<'a> {
	Local(&'a [u8]),
	Overlay(MappedBytesGuard<'a>),
}

impl std::ops::Deref for LogWriterValueGuard<'_> {
	type Target = [u8];
	fn deref(&self) -> &[u8] {
		match self {
			LogWriterValueGuard::Local(data) => data,
			LogWriterValueGuard::Overlay(data) => data.deref(),
		}
	}
}

impl<'q> LogQuery for LogWriter<'q> {
	type ValueRef<'a> = LogWriterValueGuard<'a> where Self: 'a;
	fn with_index<R, F: FnOnce(&IndexChunk) -> R>(
		&self,
		table: IndexTableId,
		index: u64,
		f: F,
	) -> Option<R> {
		match self
			.log
			.local_index
			.get(&table)
			.and_then(|o| o.map.get(&index).map(|(_id, _mask, data)| data))
		{
			Some(data) => Some(f(data)),
			None => self.overlays.with_index(table, index, f),
		}
	}

	fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool {
		if let Some(d) = self
			.log
			.local_values
			.get(&table)
			.and_then(|o| o.map.get(&index).map(|(_id, data)| data))
		{
			let len = dest.len().min(d.len());
			dest[0..len].copy_from_slice(&d[0..len]);
			true
		} else {
			self.overlays.value(table, index, dest)
		}
	}
	fn value_ref<'v>(&'v self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'v>> {
		self.log
			.local_values
			.get(&table)
			.and_then(|o| {
				o.map.get(&index).map(|(_id, data)| LogWriterValueGuard::Local(data.as_slice()))
			})
			.or_else(|| {
				self.overlays
					.value_ref(table, index)
					.map(|data| LogWriterValueGuard::Overlay(data))
			})
	}
}

// Identity hash.
#[derive(Debug, Default, Clone)]
pub struct IdentityHash(u64);
pub type BuildIdHash = std::hash::BuildHasherDefault<IdentityHash>;

impl std::hash::Hasher for IdentityHash {
	fn write(&mut self, _: &[u8]) {
		unreachable!()
	}
	fn write_u8(&mut self, _: u8) {
		unreachable!()
	}
	fn write_u16(&mut self, _: u16) {
		unreachable!()
	}
	fn write_u32(&mut self, _: u32) {
		unreachable!()
	}
	fn write_u64(&mut self, n: u64) {
		self.0 = n
	}
	fn write_usize(&mut self, _: usize) {
		unreachable!()
	}
	fn write_i8(&mut self, _: i8) {
		unreachable!()
	}
	fn write_i16(&mut self, _: i16) {
		unreachable!()
	}
	fn write_i32(&mut self, _: i32) {
		unreachable!()
	}
	fn write_i64(&mut self, _: i64) {
		unreachable!()
	}
	fn write_isize(&mut self, _: isize) {
		unreachable!()
	}
	fn finish(&self) -> u64 {
		self.0
	}
}
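
// A minimal illustrative sketch (not part of the original file): with the
// identity hasher, hashing a u64 key yields the key itself, so only
// `write_u64` is supported.
//
//     use std::hash::{BuildHasher, Hasher};
//     let mut h = BuildIdHash::default().build_hasher();
//     h.write_u64(42);
//     assert_eq!(h.finish(), 42);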

#[derive(Debug, Default)]
pub struct IndexLogOverlay {
	pub map: HashMap<u64, (u64, u64, IndexChunk)>, // index -> (record_id, modified_mask, entry)
}

// We use identity hash for value overlay/log records so that writes to value tables are in order.
#[derive(Debug, Default)]
pub struct ValueLogOverlay {
	pub map: HashMap<u64, (u64, Vec<u8>)>, // index -> (record_id, entry)
}
#[derive(Debug, Default)]
pub struct ValueLogOverlayLocal {
	pub map: HashMap<u64, (u64, Vec<u8>), BuildIdHash>, // index -> (record_id, entry)
}

#[derive(Debug)]
struct Appending {
	id: u32,
	file: std::io::BufWriter<std::fs::File>,
	size: u64,
}

#[derive(Debug)]
struct Reading {
	id: u32,
	file: std::io::BufReader<std::fs::File>,
}

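// Log file lifecycle, as implemented below: records are appended to `appending`;
// `flush_one` moves the finished file to `read_queue`; `read_next` consumes it
// via `reading` and pushes it to `cleanup_queue` once fully read; `clean_logs`
// truncates cleaned files and recycles them into `log_pool` (files beyond
// MAX_LOG_POOL_SIZE are deleted). `replay_queue` holds pre-existing log files
// discovered by `open` for startup replay.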
#[derive(Debug)]
pub struct Log {
	overlays: RwLock<LogOverlays>,
	appending: RwLock<Option<Appending>>,
	reading: RwLock<Option<Reading>>,
	read_queue: RwLock<VecDeque<(u32, std::fs::File)>>,
	next_record_id: AtomicU64,
	dirty: AtomicBool,
	log_pool: RwLock<VecDeque<(u32, std::fs::File)>>,
	cleanup_queue: RwLock<VecDeque<(u32, std::fs::File)>>,
	replay_queue: RwLock<VecDeque<(u32, u64, std::fs::File)>>,
	path: std::path::PathBuf,
	next_log_id: AtomicU32,
	sync: bool,
}

impl Log {
	pub fn open(options: &Options) -> Result<Log> {
		let path = options.path.clone();
		let mut logs = VecDeque::new();
		let mut max_log_id = 0;
		for entry in try_io!(std::fs::read_dir(&path)) {
			let entry = try_io!(entry);
			if let Some(name) = entry.file_name().as_os_str().to_str() {
				if try_io!(entry.metadata()).is_file() && name.starts_with("log") {
					if let Ok(nlog) = std::str::FromStr::from_str(&name[3..]) {
						let path = Self::log_path(&path, nlog);
						let (file, record_id) = Self::open_log_file(&path)?;
						if let Some(record_id) = record_id {
							log::debug!(target: "parity-db", "Opened log {}, record {}", nlog, record_id);
							logs.push_back((nlog, record_id, file));
							if nlog > max_log_id {
								max_log_id = nlog
							}
						} else {
							log::debug!(target: "parity-db", "Removing log {}", nlog);
							drop(file);
							try_io!(std::fs::remove_file(&path));
						}
					}
				}
			}
		}
		logs.make_contiguous().sort_by_key(|(_id, record_id, _)| *record_id);
		let next_log_id = if logs.is_empty() { 0 } else { max_log_id + 1 };

		Ok(Log {
			overlays: RwLock::new(LogOverlays::with_columns(options.columns.len())),
			appending: RwLock::new(None),
			reading: RwLock::new(None),
			read_queue: RwLock::default(),
			next_record_id: AtomicU64::new(1),
			next_log_id: AtomicU32::new(next_log_id),
			dirty: AtomicBool::new(true),
			sync: options.sync_wal,
			replay_queue: RwLock::new(logs),
			cleanup_queue: RwLock::default(),
			log_pool: RwLock::default(),
			path,
		})
	}

	fn log_path(root: &std::path::Path, id: u32) -> std::path::PathBuf {
		let mut path: std::path::PathBuf = root.into();
		path.push(format!("log{id}"));
		path
	}

	pub fn replay_record_id(&self) -> Option<u64> {
		self.replay_queue.read().front().map(|(_id, record_id, _)| *record_id)
	}

	pub fn open_log_file(path: &std::path::Path) -> Result<(std::fs::File, Option<u64>)> {
		let mut file = try_io!(std::fs::OpenOptions::new().read(true).write(true).open(path));
		if try_io!(file.metadata()).len() == 0 {
			return Ok((file, None))
		}
		match Self::read_first_record_id(&mut file) {
			Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
				log::error!(target: "parity-db", "Opened existing log {}. No first record id found", path.display());
				Ok((file, None))
			},
			Err(e) => Err(e),
			Ok(id) => {
				try_io!(file.rewind());
				log::debug!(target: "parity-db", "Opened existing log {}, first record_id = {}", path.display(), id);
				Ok((file, Some(id)))
			},
		}
	}

	fn read_first_record_id(file: &mut std::fs::File) -> Result<u64> {
		let mut buf = [0; 9];
		try_io!(file.read_exact(&mut buf));
		Ok(u64::from_le_bytes(buf[1..].try_into().unwrap()))
	}

	fn drop_log(&self, id: u32) -> Result<()> {
		log::debug!(target: "parity-db", "Drop log {}", id);
		let path = Self::log_path(&self.path, id);
		try_io!(std::fs::remove_file(path));
		Ok(())
	}

	pub fn clear_replay_logs(&self) {
		if let Some(reading) = self.reading.write().take() {
			self.cleanup_queue.write().push_back((reading.id, reading.file.into_inner()));
		}
		for (id, _, file) in self.replay_queue.write().drain(0..) {
			self.cleanup_queue.write().push_back((id, file));
		}
		let mut overlays = self.overlays.write();
		for o in overlays.index.iter_mut() {
			o.map.clear();
		}
		for o in overlays.value.iter_mut() {
			o.map.clear();
		}
		for r in overlays.last_record_ids.iter_mut() {
			*r = 0;
		}
		self.dirty.store(false, Ordering::Relaxed);
	}

	pub fn begin_record(&self) -> LogWriter<'_> {
		let id = self.next_record_id.fetch_add(1, Ordering::Relaxed);
		LogWriter::new(&self.overlays, id)
	}

	pub fn end_record(&self, log: LogChange) -> Result<u64> {
		assert_eq!(log.record_id + 1, self.next_record_id.load(Ordering::Relaxed));
		let record_id = log.record_id;
		let mut appending = self.appending.write();
		if appending.is_none() {
			// Find a log file in the pool or create a new one
			let (id, file) = if let Some((id, file)) = self.log_pool.write().pop_front() {
				log::debug!(target: "parity-db", "Flush: Activated pool writer {}", id);
				(id, file)
			} else {
				// Find a free id
				let id = self.next_log_id.fetch_add(1, Ordering::SeqCst);
				let path = Self::log_path(&self.path, id);
				let file = try_io!(std::fs::OpenOptions::new()
					.create(true)
					.read(true)
					.write(true)
					.open(path));
				log::debug!(target: "parity-db", "Flush: Activated new writer {}", id);
				(id, file)
			};
			*appending = Some(Appending { size: 0, file: std::io::BufWriter::new(file), id });
		}
		let appending = appending.as_mut().unwrap();
		let FlushedLog { index, values, bytes } = log.flush_to_file(&mut appending.file)?;
		let mut overlays = self.overlays.write();
		let mut total_index = 0;
		for (id, overlay) in index.into_iter() {
			total_index += overlay.map.len();
			overlays.index[id.log_index()].map.extend(overlay.map.into_iter());
		}
		let mut total_value = 0;
		for (id, overlay) in values.into_iter() {
			total_value += overlay.map.len();
			overlays.last_record_ids[id.col() as usize] = record_id;
			overlays.value[id.log_index()].map.extend(overlay.map.into_iter());
		}

		log::debug!(
			target: "parity-db",
			"Finalizing log record {} ({} index, {} value)",
			record_id,
			total_index,
			total_value,
		);
		appending.size += bytes;
		self.dirty.store(true, Ordering::Relaxed);
		Ok(bytes)
	}
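
	// A minimal usage sketch (illustrative, not taken from this crate's callers):
	// begin a record, buffer changes in the `LogWriter`, then commit them to the
	// shared overlays and the active log file.
	//
	//     let mut writer = log.begin_record();
	//     writer.insert_value(table_id, index, data);
	//     let _bytes = log.end_record(writer.drain())?;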

	pub fn end_read(&self, cleared: Cleared, record_id: u64) {
		if record_id >= self.next_record_id.load(Ordering::Relaxed) {
			self.next_record_id.store(record_id + 1, Ordering::Relaxed);
		}
		let mut overlays = self.overlays.write();
		for (table, index) in cleared.index.into_iter() {
			if let Some(ref mut overlay) = overlays.index.get_mut(table.log_index()) {
				if let std::collections::hash_map::Entry::Occupied(e) = overlay.map.entry(index) {
					if e.get().0 == record_id {
						e.remove_entry();
					}
				}
			}
		}
		for (table, index) in cleared.values.into_iter() {
			if let Some(ref mut overlay) = overlays.value.get_mut(table.log_index()) {
				if let std::collections::hash_map::Entry::Occupied(e) = overlay.map.entry(index) {
					if e.get().0 == record_id {
						e.remove_entry();
					}
				}
			}
		}
		// Reclaim overlay memory
		for (i, o) in overlays.index.iter_mut().enumerate() {
			if o.map.capacity() > o.map.len() * INDEX_OVERLAY_RECLAIM_FACTOR {
				log::trace!(
					"Shrinking index overlay {}: {}/{}",
					IndexTableId::from_log_index(i),
					o.map.len(),
					o.map.capacity(),
				);
				o.map.shrink_to_fit();
			}
		}
		for (i, o) in overlays.value.iter_mut().enumerate() {
			if o.map.capacity() > VALUE_OVERLAY_MIN_RECLAIM_ITEMS &&
				o.map.capacity() > o.map.len() * VALUE_OVERLAY_RECLAIM_FACTOR
			{
				log::trace!(
					"Shrinking value overlay {}: {}/{}",
					ValueTableId::from_log_index(i),
					o.map.len(),
					o.map.capacity(),
				);
				o.map.shrink_to_fit();
			}
		}
	}

	pub fn flush_one(&self, min_size: u64) -> Result<bool> {
		// If it exists, take the writer and flush it
		let cur_size = self.appending.read().as_ref().map_or(0, |r| r.size);
		if cur_size > min_size {
			if let Some(to_flush) = self.appending.write().take() {
				let file = try_io!(to_flush.file.into_inner().map_err(|e| e.into_error()));
				if self.sync {
					log::debug!(target: "parity-db", "Flush: Flushing log to disk");
					try_io!(file.sync_data());
					log::debug!(target: "parity-db", "Flush: Flushing log completed");
				}
				self.read_queue.write().push_back((to_flush.id, file));
			}
			return Ok(true)
		}
		Ok(false)
	}

	pub fn replay_next(&self) -> Result<Option<u32>> {
		let mut reading = self.reading.write();
		{
			if let Some(reading) = reading.take() {
				log::debug!(target: "parity-db", "Replay: Activated log cleanup {}", reading.id);
				let file = reading.file.into_inner();
				self.cleanup_queue.write().push_back((reading.id, file));
			}
		}
		if let Some((id, _record_id, file)) = self.replay_queue.write().pop_front() {
			log::debug!(target: "parity-db", "Replay: Activated log reader {}", id);
			*reading = Some(Reading { id, file: std::io::BufReader::new(file) });
			Ok(Some(id))
		} else {
			Ok(None)
		}
	}

	pub fn clean_logs(&self, max_count: usize) -> Result<bool> {
		let mut cleaned: Vec<_> = {
			let mut queue = self.cleanup_queue.write();
			let count = min(max_count, queue.len());
			queue.drain(0..count).collect()
		};
		for (id, ref mut file) in cleaned.iter_mut() {
			log::debug!(target: "parity-db", "Cleaned: {}", id);
			try_io!(file.rewind());
			try_io!(file.set_len(0));
			file.sync_all().map_err(Error::Io)?;
		}
		// Move cleaned logs back to the pool
		let mut pool = self.log_pool.write();
		pool.extend(cleaned);
		// Sort to reuse lower IDs and prevent IDs from growing.
		pool.make_contiguous().sort_by_key(|(id, _)| *id);
		if pool.len() > MAX_LOG_POOL_SIZE {
			let removed = pool.drain(MAX_LOG_POOL_SIZE..);
			for (id, file) in removed {
				drop(file);
				self.drop_log(id)?;
			}
		}
		Ok(!self.cleanup_queue.read().is_empty())
	}

	pub fn num_dirty_logs(&self) -> usize {
		self.cleanup_queue.read().len()
	}

	pub fn read_next(&self, validate: bool) -> Result<Option<LogReader<'_>>> {
		let mut reading = self.reading.write();
		if reading.is_none() {
			if let Some((id, mut file)) = self.read_queue.write().pop_front() {
				try_io!(file.rewind());
				*reading = Some(Reading { id, file: std::io::BufReader::new(file) });
			} else {
				log::trace!(target: "parity-db", "No active reader");
				return Ok(None)
			}
		}
		let mut reader = LogReader::new(reading, validate);
		match reader.next() {
			Ok(LogAction::BeginRecord) => Ok(Some(reader)),
			Ok(_) => Err(Error::Corruption("Bad log record structure".into())),
			Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
				if let Some(reading) = reader.reading.take() {
					log::debug!(target: "parity-db", "Read: End of log {}", reading.id);
					let file = reading.file.into_inner();
					self.cleanup_queue.write().push_back((reading.id, file));
				}
				Ok(None)
			},
			Err(e) => Err(e),
		}
	}

	pub fn overlays(&self) -> &RwLock<LogOverlays> {
		&self.overlays
	}

	pub fn has_log_files_to_read(&self) -> bool {
		self.read_queue.read().len() > 0
	}

	pub fn kill_logs(&self) -> Result<()> {
		let mut log_pool = self.log_pool.write();
		for (id, file) in log_pool.drain(..) {
			drop(file);
			self.drop_log(id)?;
		}
		if let Some(reading) = self.reading.write().take() {
			drop(reading.file);
			self.drop_log(reading.id)?;
		}
		Ok(())
	}
}