use crate::{
	column::ColId,
	error::{try_io, Error, Result},
	file::MappedBytesGuard,
	index::{Chunk as IndexChunk, TableId as IndexTableId, ENTRY_BYTES},
	options::Options,
	parking_lot::{RwLock, RwLockWriteGuard},
	table::TableId as ValueTableId,
};
use std::{
	cmp::min,
	collections::{HashMap, VecDeque},
	convert::TryInto,
	io::{ErrorKind, Read, Seek, Write},
	sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
};

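// On-disk record format: each record starts with BEGIN_RECORD followed by a
// little-endian u64 record id, then any number of INSERT_INDEX, INSERT_VALUE
// and DROP_TABLE entries, and ends with END_RECORD followed by a CRC32
// checksum of the record payload. See `LogChange::flush_to_file` and
// `LogReader::next`.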
const MAX_LOG_POOL_SIZE: usize = 16;
const BEGIN_RECORD: u8 = 1;
const INSERT_INDEX: u8 = 2;
const INSERT_VALUE: u8 = 3;
const END_RECORD: u8 = 4;
const DROP_TABLE: u8 = 5;

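// Overlay maps are shrunk in `end_read` once their capacity exceeds their
// length by these factors (value overlays only above a minimum capacity), so
// memory is reclaimed after bursts of writes.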
const INDEX_OVERLAY_RECLAIM_FACTOR: usize = 10;
const VALUE_OVERLAY_RECLAIM_FACTOR: usize = 10;
const VALUE_OVERLAY_MIN_RECLAIM_ITEMS: usize = 10240;

#[derive(Debug)]
pub struct InsertIndexAction {
	pub table: IndexTableId,
	pub index: u64,
}

#[derive(Debug)]
pub struct InsertValueAction {
	pub table: ValueTableId,
	pub index: u64,
}

#[derive(Debug)]
pub enum LogAction {
	BeginRecord,
	InsertIndex(InsertIndexAction),
	InsertValue(InsertValueAction),
	DropTable(IndexTableId),
	EndRecord,
}

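// Read access to index chunks and values that may still sit in the log
// overlays rather than in the index and value tables.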
pub trait LogQuery {
	type ValueRef<'a>: std::ops::Deref<Target = [u8]>
	where
		Self: 'a;

	fn with_index<R, F: FnOnce(&IndexChunk) -> R>(
		&self,
		table: IndexTableId,
		index: u64,
		f: F,
	) -> Option<R>;
	fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool;
	fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'a>>;
}

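// In-memory copies of data that has been written to log files but not yet
// enacted into the index and value tables.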
#[derive(Debug)]
pub struct LogOverlays {
	index: Vec<IndexLogOverlay>,
	value: Vec<ValueLogOverlay>,
	last_record_ids: Vec<u64>,
}

impl LogOverlays {
	pub fn last_record_id(&self, col: ColId) -> u64 {
		self.last_record_ids.get(col as usize).cloned().unwrap_or(u64::MAX)
	}

	pub fn with_columns(count: usize) -> Self {
		Self {
			index: (0..IndexTableId::max_log_indicies(count))
				.map(|_| IndexLogOverlay::default())
				.collect(),
			value: (0..ValueTableId::max_log_tables(count))
				.map(|_| ValueLogOverlay::default())
				.collect(),
			last_record_ids: (0..count).map(|_| 0).collect(),
		}
	}
}

impl LogQuery for RwLock<LogOverlays> {
	type ValueRef<'a> = MappedBytesGuard<'a>;

	fn with_index<R, F: FnOnce(&IndexChunk) -> R>(
		&self,
		table: IndexTableId,
		index: u64,
		f: F,
	) -> Option<R> {
		(&*self.read()).with_index(table, index, f)
	}

	fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool {
		(&*self.read()).value(table, index, dest)
	}

	#[cfg(not(feature = "loom"))]
	fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'a>> {
		let lock =
			parking_lot::RwLockReadGuard::try_map(self.read(), |o| o.value_ref(table, index));
		lock.ok()
	}

	#[cfg(feature = "loom")]
	fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'a>> {
		self.read().value_ref(table, index).map(|o| MappedBytesGuard::new(o.to_vec()))
	}
}

impl LogQuery for LogOverlays {
	type ValueRef<'a> = &'a [u8];

	fn with_index<R, F: FnOnce(&IndexChunk) -> R>(
		&self,
		table: IndexTableId,
		index: u64,
		f: F,
	) -> Option<R> {
		self.index
			.get(table.log_index())
			.and_then(|o| o.map.get(&index).map(|(_id, _mask, data)| f(data)))
	}

	fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool {
		if let Some(d) = self
			.value
			.get(table.log_index())
			.and_then(|o| o.map.get(&index).map(|(_id, data)| data))
		{
			let len = dest.len().min(d.len());
			dest[0..len].copy_from_slice(&d[0..len]);
			true
		} else {
			false
		}
	}

	fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'a>> {
		self.value
			.get(table.log_index())
			.and_then(|o| o.map.get(&index).map(|(_id, data)| data.as_slice()))
	}
}

#[derive(Debug, Default)]
pub struct Cleared {
	index: Vec<(IndexTableId, u64)>,
	values: Vec<(ValueTableId, u64)>,
}

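// Reader for a single log record. Tracks the number of bytes consumed so
// `reset` can rewind the file, and optionally verifies the record checksum.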
#[derive(Debug)]
pub struct LogReader<'a> {
	reading: RwLockWriteGuard<'a, Option<Reading>>,
	record_id: u64,
	read_bytes: u64,
	crc32: crc32fast::Hasher,
	validate: bool,
	cleared: Cleared,
}

impl<'a> LogReader<'a> {
	pub fn record_id(&self) -> u64 {
		self.record_id
	}

	fn new(reading: RwLockWriteGuard<'a, Option<Reading>>, validate: bool) -> LogReader<'a> {
		LogReader {
			cleared: Default::default(),
			reading,
			record_id: 0,
			read_bytes: 0,
			crc32: crc32fast::Hasher::new(),
			validate,
		}
	}

	pub fn reset(&mut self) -> Result<()> {
		self.cleared = Default::default();
		try_io!(self
			.reading
			.as_mut()
			.unwrap()
			.file
			.seek(std::io::SeekFrom::Current(-(self.read_bytes as i64))));
		self.read_bytes = 0;
		self.record_id = 0;
		self.crc32 = crc32fast::Hasher::new();
		Ok(())
	}

	pub fn next(&mut self) -> Result<LogAction> {
		let mut read_buf = |size, buf: &mut [u8; 8]| -> Result<()> {
			try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..size]));
			self.read_bytes += size as u64;
			if self.validate {
				self.crc32.update(&buf[0..size]);
			}
			Ok(())
		};

		let mut buf = [0u8; 8];
		read_buf(1, &mut buf)?;
		match buf[0] {
			BEGIN_RECORD => {
				read_buf(8, &mut buf)?;
				let record_id = u64::from_le_bytes(buf);
				self.record_id = record_id;
				Ok(LogAction::BeginRecord)
			},
			INSERT_INDEX => {
				read_buf(2, &mut buf)?;
				let table =
					IndexTableId::from_u16(u16::from_le_bytes(buf[0..2].try_into().unwrap()));
				read_buf(8, &mut buf)?;
				let index = u64::from_le_bytes(buf);
				self.cleared.index.push((table, index));
				Ok(LogAction::InsertIndex(InsertIndexAction { table, index }))
			},
			INSERT_VALUE => {
				read_buf(2, &mut buf)?;
				let table =
					ValueTableId::from_u16(u16::from_le_bytes(buf[0..2].try_into().unwrap()));
				read_buf(8, &mut buf)?;
				let index = u64::from_le_bytes(buf);
				self.cleared.values.push((table, index));
				Ok(LogAction::InsertValue(InsertValueAction { table, index }))
			},
			END_RECORD => {
				try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..4]));
				self.read_bytes += 4;
				if self.validate {
					let checksum = u32::from_le_bytes(buf[0..4].try_into().unwrap());
					let expected = std::mem::take(&mut self.crc32).finalize();
					log::trace!(target: "parity-db",
						"Read end of record, checksum={:#x}, expected={:#x}",
						checksum,
						expected,
					);
					if checksum != expected {
						return Err(Error::Corruption("Log record CRC-32 mismatch".into()))
					}
				} else {
					log::trace!(target: "parity-db", "Read end of record");
				}
				Ok(LogAction::EndRecord)
			},
			DROP_TABLE => {
				read_buf(2, &mut buf)?;
				let table =
					IndexTableId::from_u16(u16::from_le_bytes(buf[0..2].try_into().unwrap()));
				Ok(LogAction::DropTable(table))
			},
			_ => Err(Error::Corruption("Bad log entry type".into())),
		}
	}

	pub fn read(&mut self, buf: &mut [u8]) -> Result<()> {
		try_io!(self.reading.as_mut().unwrap().file.read_exact(buf));
		self.read_bytes += buf.len() as u64;
		if self.validate {
			self.crc32.update(buf);
		}
		Ok(())
	}

	pub fn drain(self) -> Cleared {
		self.cleared
	}

	pub fn read_bytes(&self) -> u64 {
		self.read_bytes
	}
}

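// Changes accumulated by a `LogWriter`, written out as a single log record by
// `flush_to_file`.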
#[derive(Debug)]
pub struct LogChange {
	local_index: HashMap<IndexTableId, IndexLogOverlay>,
	local_values: HashMap<ValueTableId, ValueLogOverlayLocal>,
	record_id: u64,
	dropped_tables: Vec<IndexTableId>,
}

impl LogChange {
	fn new(record_id: u64) -> LogChange {
		LogChange {
			local_index: Default::default(),
			local_values: Default::default(),
			dropped_tables: Default::default(),
			record_id,
		}
	}

	pub fn local_values_changes(&self, id: ValueTableId) -> Option<&ValueLogOverlayLocal> {
		self.local_values.get(&id)
	}

	fn flush_to_file(self, file: &mut std::io::BufWriter<std::fs::File>) -> Result<FlushedLog> {
		let mut crc32 = crc32fast::Hasher::new();
		let mut bytes: u64 = 0;

		let mut write = |buf: &[u8]| -> Result<()> {
			try_io!(file.write_all(buf));
			crc32.update(buf);
			bytes += buf.len() as u64;
			Ok(())
		};

		write(&BEGIN_RECORD.to_le_bytes())?;
		write(&self.record_id.to_le_bytes())?;

		for (id, overlay) in self.local_index.iter() {
			for (index, (_, modified_entries_mask, chunk)) in overlay.map.iter() {
				write(INSERT_INDEX.to_le_bytes().as_ref())?;
				write(&id.as_u16().to_le_bytes())?;
				write(&index.to_le_bytes())?;
				write(&modified_entries_mask.to_le_bytes())?;
				let mut mask = *modified_entries_mask;
				while mask != 0 {
					let i = mask.trailing_zeros();
					mask &= !(1 << i);
					write(&chunk.0[i as usize * ENTRY_BYTES..(i as usize + 1) * ENTRY_BYTES])?;
				}
			}
		}
		for (id, overlay) in self.local_values.iter() {
			for (index, (_, value)) in overlay.map.iter() {
				write(INSERT_VALUE.to_le_bytes().as_ref())?;
				write(&id.as_u16().to_le_bytes())?;
				write(&index.to_le_bytes())?;
				write(value)?;
			}
		}
		for id in self.dropped_tables.iter() {
			log::debug!(target: "parity-db", "Finalizing drop {}", id);
			write(DROP_TABLE.to_le_bytes().as_ref())?;
			write(&id.as_u16().to_le_bytes())?;
		}
		write(&END_RECORD.to_le_bytes())?;
		let checksum: u32 = crc32.finalize();
		try_io!(file.write_all(&checksum.to_le_bytes()));
		bytes += 4;
		try_io!(file.flush());
		Ok(FlushedLog { index: self.local_index, values: self.local_values, bytes })
	}
}

#[derive(Debug)]
struct FlushedLog {
	index: HashMap<IndexTableId, IndexLogOverlay>,
	values: HashMap<ValueTableId, ValueLogOverlayLocal>,
	bytes: u64,
}

#[derive(Debug)]
pub struct LogWriter<'a> {
	overlays: &'a RwLock<LogOverlays>,
	log: LogChange,
}

impl<'a> LogWriter<'a> {
	pub fn new(overlays: &'a RwLock<LogOverlays>, record_id: u64) -> LogWriter<'a> {
		LogWriter { overlays, log: LogChange::new(record_id) }
	}

	pub fn record_id(&self) -> u64 {
		self.log.record_id
	}

	pub fn insert_index(&mut self, table: IndexTableId, index: u64, sub: u8, data: IndexChunk) {
		match self.log.local_index.entry(table).or_default().map.entry(index) {
			std::collections::hash_map::Entry::Occupied(mut entry) => {
				*entry.get_mut() = (self.log.record_id, entry.get().1 | (1 << sub), data);
			},
			std::collections::hash_map::Entry::Vacant(entry) => {
				entry.insert((self.log.record_id, 1 << sub, data));
			},
		}
	}

	pub fn insert_value(&mut self, table: ValueTableId, index: u64, data: Vec<u8>) {
		self.log
			.local_values
			.entry(table)
			.or_default()
			.map
			.insert(index, (self.log.record_id, data));
	}

	pub fn drop_table(&mut self, id: IndexTableId) {
		self.log.dropped_tables.push(id);
	}

	pub fn drain(self) -> LogChange {
		self.log
	}
}

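// Value bytes borrowed either from the writer's local changes or from the
// shared overlays.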
pub enum LogWriterValueGuard<'a> {
	Local(&'a [u8]),
	Overlay(MappedBytesGuard<'a>),
}

impl std::ops::Deref for LogWriterValueGuard<'_> {
	type Target = [u8];
	fn deref(&self) -> &[u8] {
		match self {
			LogWriterValueGuard::Local(data) => data,
			LogWriterValueGuard::Overlay(data) => data.deref(),
		}
	}
}

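// Queries consult the writer's own uncommitted changes first and fall back to
// the shared overlays.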
impl<'q> LogQuery for LogWriter<'q> {
	type ValueRef<'a> = LogWriterValueGuard<'a> where Self: 'a;

	fn with_index<R, F: FnOnce(&IndexChunk) -> R>(
		&self,
		table: IndexTableId,
		index: u64,
		f: F,
	) -> Option<R> {
		match self
			.log
			.local_index
			.get(&table)
			.and_then(|o| o.map.get(&index).map(|(_id, _mask, data)| data))
		{
			Some(data) => Some(f(data)),
			None => self.overlays.with_index(table, index, f),
		}
	}

	fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool {
		if let Some(d) = self
			.log
			.local_values
			.get(&table)
			.and_then(|o| o.map.get(&index).map(|(_id, data)| data))
		{
			let len = dest.len().min(d.len());
			dest[0..len].copy_from_slice(&d[0..len]);
			true
		} else {
			self.overlays.value(table, index, dest)
		}
	}

	fn value_ref<'v>(&'v self, table: ValueTableId, index: u64) -> Option<Self::ValueRef<'v>> {
		self.log
			.local_values
			.get(&table)
			.and_then(|o| {
				o.map.get(&index).map(|(_id, data)| LogWriterValueGuard::Local(data.as_slice()))
			})
			.or_else(|| self.overlays.value_ref(table, index).map(LogWriterValueGuard::Overlay))
	}
}

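// Pass-through hasher for u64 keys that are already evenly distributed. Only
// `write_u64` is expected to be called; all other methods are unreachable.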
#[derive(Debug, Default, Clone)]
pub struct IdentityHash(u64);
pub type BuildIdHash = std::hash::BuildHasherDefault<IdentityHash>;

impl std::hash::Hasher for IdentityHash {
	fn write(&mut self, _: &[u8]) {
		unreachable!()
	}
	fn write_u8(&mut self, _: u8) {
		unreachable!()
	}
	fn write_u16(&mut self, _: u16) {
		unreachable!()
	}
	fn write_u32(&mut self, _: u32) {
		unreachable!()
	}
	fn write_u64(&mut self, n: u64) {
		self.0 = n
	}
	fn write_usize(&mut self, _: usize) {
		unreachable!()
	}
	fn write_i8(&mut self, _: i8) {
		unreachable!()
	}
	fn write_i16(&mut self, _: i16) {
		unreachable!()
	}
	fn write_i32(&mut self, _: i32) {
		unreachable!()
	}
	fn write_i64(&mut self, _: i64) {
		unreachable!()
	}
	fn write_isize(&mut self, _: isize) {
		unreachable!()
	}
	fn finish(&self) -> u64 {
		self.0
	}
}

#[derive(Debug, Default)]
pub struct IndexLogOverlay {
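	// chunk index -> (record id, modified entries mask, chunk data)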
	pub map: HashMap<u64, (u64, u64, IndexChunk)>,
}

#[derive(Debug, Default)]
pub struct ValueLogOverlay {
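	// value index -> (record id, value)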
	pub map: HashMap<u64, (u64, Vec<u8>)>,
}

#[derive(Debug, Default)]
pub struct ValueLogOverlayLocal {
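	// value index -> (record id, value), keyed with the pass-through hasher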
	pub map: HashMap<u64, (u64, Vec<u8>), BuildIdHash>,
}

#[derive(Debug)]
struct Appending {
	id: u32,
	file: std::io::BufWriter<std::fs::File>,
	size: u64,
}

#[derive(Debug)]
struct Reading {
	id: u32,
	file: std::io::BufReader<std::fs::File>,
}

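// The write-ahead log. Records are appended with `begin_record`/`end_record`,
// synced and queued for reading by `flush_one`, enacted via `read_next` and
// `end_read`, and the underlying files are then truncated and recycled by
// `clean_logs`.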
#[derive(Debug)]
pub struct Log {
	overlays: RwLock<LogOverlays>,
	appending: RwLock<Option<Appending>>,
	reading: RwLock<Option<Reading>>,
	read_queue: RwLock<VecDeque<(u32, std::fs::File)>>,
	next_record_id: AtomicU64,
	dirty: AtomicBool,
	log_pool: RwLock<VecDeque<(u32, std::fs::File)>>,
	cleanup_queue: RwLock<VecDeque<(u32, std::fs::File)>>,
	replay_queue: RwLock<VecDeque<(u32, u64, std::fs::File)>>,
	path: std::path::PathBuf,
	next_log_id: AtomicU32,
	sync: bool,
}

impl Log {
	pub fn open(options: &Options) -> Result<Log> {
		let path = options.path.clone();
		let mut logs = VecDeque::new();
		let mut max_log_id = 0;
		for entry in try_io!(std::fs::read_dir(&path)) {
			let entry = try_io!(entry);
			if let Some(name) = entry.file_name().as_os_str().to_str() {
				if try_io!(entry.metadata()).is_file() && name.starts_with("log") {
					if let Ok(nlog) = std::str::FromStr::from_str(&name[3..]) {
						let path = Self::log_path(&path, nlog);
						let (file, record_id) = Self::open_log_file(&path)?;
						if let Some(record_id) = record_id {
							log::debug!(target: "parity-db", "Opened log {}, record {}", nlog, record_id);
							logs.push_back((nlog, record_id, file));
							if nlog > max_log_id {
								max_log_id = nlog
							}
						} else {
							log::debug!(target: "parity-db", "Removing log {}", nlog);
							drop(file);
							try_io!(std::fs::remove_file(&path));
						}
					}
				}
			}
		}
		logs.make_contiguous().sort_by_key(|(_id, record_id, _)| *record_id);
		let next_log_id = if logs.is_empty() { 0 } else { max_log_id + 1 };

		Ok(Log {
			overlays: RwLock::new(LogOverlays::with_columns(options.columns.len())),
			appending: RwLock::new(None),
			reading: RwLock::new(None),
			read_queue: RwLock::default(),
			next_record_id: AtomicU64::new(1),
			next_log_id: AtomicU32::new(next_log_id),
			dirty: AtomicBool::new(true),
			sync: options.sync_wal,
			replay_queue: RwLock::new(logs),
			cleanup_queue: RwLock::default(),
			log_pool: RwLock::default(),
			path,
		})
	}

	fn log_path(root: &std::path::Path, id: u32) -> std::path::PathBuf {
		let mut path: std::path::PathBuf = root.into();
		path.push(format!("log{id}"));
		path
	}

	pub fn replay_record_id(&self) -> Option<u64> {
		self.replay_queue.read().front().map(|(_id, record_id, _)| *record_id)
	}

	pub fn open_log_file(path: &std::path::Path) -> Result<(std::fs::File, Option<u64>)> {
		let mut file = try_io!(std::fs::OpenOptions::new().read(true).write(true).open(path));
		if try_io!(file.metadata()).len() == 0 {
			return Ok((file, None))
		}
		match Self::read_first_record_id(&mut file) {
			Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
				log::error!(target: "parity-db", "Opened existing log {}. No first record id found", path.display());
				Ok((file, None))
			},
			Err(e) => Err(e),
			Ok(id) => {
				try_io!(file.rewind());
				log::debug!(target: "parity-db", "Opened existing log {}, first record_id = {}", path.display(), id);
				Ok((file, Some(id)))
			},
		}
	}

	fn read_first_record_id(file: &mut std::fs::File) -> Result<u64> {
		let mut buf = [0; 9];
		try_io!(file.read_exact(&mut buf));
		Ok(u64::from_le_bytes(buf[1..].try_into().unwrap()))
	}

	fn drop_log(&self, id: u32) -> Result<()> {
		log::debug!(target: "parity-db", "Drop log {}", id);
		let path = Self::log_path(&self.path, id);
		try_io!(std::fs::remove_file(path));
		Ok(())
	}

	pub fn clear_replay_logs(&self) {
		if let Some(reading) = self.reading.write().take() {
			self.cleanup_queue.write().push_back((reading.id, reading.file.into_inner()));
		}
		for (id, _, file) in self.replay_queue.write().drain(0..) {
			self.cleanup_queue.write().push_back((id, file));
		}
		let mut overlays = self.overlays.write();
		for o in overlays.index.iter_mut() {
			o.map.clear();
		}
		for o in overlays.value.iter_mut() {
			o.map.clear();
		}
		for r in overlays.last_record_ids.iter_mut() {
			*r = 0;
		}
		self.dirty.store(false, Ordering::Relaxed);
	}

	pub fn begin_record(&self) -> LogWriter<'_> {
		let id = self.next_record_id.fetch_add(1, Ordering::Relaxed);
		LogWriter::new(&self.overlays, id)
	}

	pub fn end_record(&self, log: LogChange) -> Result<u64> {
		assert_eq!(log.record_id + 1, self.next_record_id.load(Ordering::Relaxed));
		let record_id = log.record_id;
		let mut appending = self.appending.write();
		if appending.is_none() {
			let (id, file) = if let Some((id, file)) = self.log_pool.write().pop_front() {
				log::debug!(target: "parity-db", "Flush: Activated pool writer {}", id);
				(id, file)
			} else {
				let id = self.next_log_id.fetch_add(1, Ordering::SeqCst);
				let path = Self::log_path(&self.path, id);
				let file = try_io!(std::fs::OpenOptions::new()
					.create(true)
					.read(true)
					.write(true)
					.open(path));
				log::debug!(target: "parity-db", "Flush: Activated new writer {}", id);
				(id, file)
			};
			*appending = Some(Appending { size: 0, file: std::io::BufWriter::new(file), id });
		}
		let appending = appending.as_mut().unwrap();
		let FlushedLog { index, values, bytes } = log.flush_to_file(&mut appending.file)?;
		let mut overlays = self.overlays.write();
		let mut total_index = 0;
		for (id, overlay) in index.into_iter() {
			total_index += overlay.map.len();
			overlays.index[id.log_index()].map.extend(overlay.map.into_iter());
		}
		let mut total_value = 0;
		for (id, overlay) in values.into_iter() {
			total_value += overlay.map.len();
			overlays.last_record_ids[id.col() as usize] = record_id;
			overlays.value[id.log_index()].map.extend(overlay.map.into_iter());
		}

		log::debug!(
			target: "parity-db",
			"Finalizing log record {} ({} index, {} value)",
			record_id,
			total_index,
			total_value,
		);
		appending.size += bytes;
		self.dirty.store(true, Ordering::Relaxed);
		Ok(bytes)
	}

	pub fn end_read(&self, cleared: Cleared, record_id: u64) {
		if record_id >= self.next_record_id.load(Ordering::Relaxed) {
			self.next_record_id.store(record_id + 1, Ordering::Relaxed);
		}
		let mut overlays = self.overlays.write();
		for (table, index) in cleared.index.into_iter() {
			if let Some(ref mut overlay) = overlays.index.get_mut(table.log_index()) {
				if let std::collections::hash_map::Entry::Occupied(e) = overlay.map.entry(index) {
					if e.get().0 == record_id {
						e.remove_entry();
					}
				}
			}
		}
		for (table, index) in cleared.values.into_iter() {
			if let Some(ref mut overlay) = overlays.value.get_mut(table.log_index()) {
				if let std::collections::hash_map::Entry::Occupied(e) = overlay.map.entry(index) {
					if e.get().0 == record_id {
						e.remove_entry();
					}
				}
			}
		}
		for (i, o) in overlays.index.iter_mut().enumerate() {
			if o.map.capacity() > o.map.len() * INDEX_OVERLAY_RECLAIM_FACTOR {
				log::trace!(
					target: "parity-db",
					"Shrinking index overlay {}: {}/{}",
					IndexTableId::from_log_index(i),
					o.map.len(),
					o.map.capacity(),
				);
				o.map.shrink_to_fit();
			}
		}
		for (i, o) in overlays.value.iter_mut().enumerate() {
			if o.map.capacity() > VALUE_OVERLAY_MIN_RECLAIM_ITEMS &&
				o.map.capacity() > o.map.len() * VALUE_OVERLAY_RECLAIM_FACTOR
			{
				log::trace!(
					target: "parity-db",
					"Shrinking value overlay {}: {}/{}",
					ValueTableId::from_log_index(i),
					o.map.len(),
					o.map.capacity(),
				);
				o.map.shrink_to_fit();
			}
		}
	}

	pub fn flush_one(&self, min_size: u64) -> Result<bool> {
		let cur_size = self.appending.read().as_ref().map_or(0, |r| r.size);
		if cur_size > min_size {
			if let Some(to_flush) = self.appending.write().take() {
				let file = try_io!(to_flush.file.into_inner().map_err(|e| e.into_error()));
				if self.sync {
					log::debug!(target: "parity-db", "Flush: Flushing log to disk");
					try_io!(file.sync_data());
					log::debug!(target: "parity-db", "Flush: Flushing log completed");
				}
				self.read_queue.write().push_back((to_flush.id, file));
			}
			return Ok(true)
		}
		Ok(false)
	}

	pub fn replay_next(&self) -> Result<Option<u32>> {
		let mut reading = self.reading.write();
		if let Some(reading) = reading.take() {
			log::debug!(target: "parity-db", "Replay: Activated log cleanup {}", reading.id);
			let file = reading.file.into_inner();
			self.cleanup_queue.write().push_back((reading.id, file));
		}
		if let Some((id, _record_id, file)) = self.replay_queue.write().pop_front() {
			log::debug!(target: "parity-db", "Replay: Activated log reader {}", id);
			*reading = Some(Reading { id, file: std::io::BufReader::new(file) });
			Ok(Some(id))
		} else {
			Ok(None)
		}
	}

	pub fn clean_logs(&self, max_count: usize) -> Result<bool> {
		let mut cleaned: Vec<_> = {
			let mut queue = self.cleanup_queue.write();
			let count = min(max_count, queue.len());
			queue.drain(0..count).collect()
		};
		for (id, ref mut file) in cleaned.iter_mut() {
			log::debug!(target: "parity-db", "Cleaned: {}", id);
			try_io!(file.rewind());
			try_io!(file.set_len(0));
			file.sync_all().map_err(Error::Io)?;
		}
		let mut pool = self.log_pool.write();
		pool.extend(cleaned);
		pool.make_contiguous().sort_by_key(|(id, _)| *id);
		if pool.len() > MAX_LOG_POOL_SIZE {
			let removed = pool.drain(MAX_LOG_POOL_SIZE..);
			for (id, file) in removed {
				drop(file);
				self.drop_log(id)?;
			}
		}
		Ok(!self.cleanup_queue.read().is_empty())
	}

	pub fn num_dirty_logs(&self) -> usize {
		self.cleanup_queue.read().len()
	}

	pub fn read_next(&self, validate: bool) -> Result<Option<LogReader<'_>>> {
		let mut reading = self.reading.write();
		if reading.is_none() {
			if let Some((id, mut file)) = self.read_queue.write().pop_front() {
				try_io!(file.rewind());
				*reading = Some(Reading { id, file: std::io::BufReader::new(file) });
			} else {
				log::trace!(target: "parity-db", "No active reader");
				return Ok(None)
			}
		}
		let mut reader = LogReader::new(reading, validate);
		match reader.next() {
			Ok(LogAction::BeginRecord) => Ok(Some(reader)),
			Ok(_) => Err(Error::Corruption("Bad log record structure".into())),
			Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
				if let Some(reading) = reader.reading.take() {
					log::debug!(target: "parity-db", "Read: End of log {}", reading.id);
					let file = reading.file.into_inner();
					self.cleanup_queue.write().push_back((reading.id, file));
				}
				Ok(None)
			},
			Err(e) => Err(e),
		}
	}

	pub fn overlays(&self) -> &RwLock<LogOverlays> {
		&self.overlays
	}

	pub fn has_log_files_to_read(&self) -> bool {
		!self.read_queue.read().is_empty()
	}

	pub fn kill_logs(&self) -> Result<()> {
		let mut log_pool = self.log_pool.write();
		for (id, file) in log_pool.drain(..) {
			drop(file);
			self.drop_log(id)?;
		}
		if let Some(reading) = self.reading.write().take() {
			drop(reading.file);
			self.drop_log(reading.id)?;
		}
		Ok(())
	}
}
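
#[cfg(test)]
mod tests {
	// A minimal sanity-check sketch, not part of the original module: it only
	// exercises the `IdentityHash` defined above, confirming that a u64 key is
	// passed through unchanged.
	use super::*;
	use std::hash::Hasher;

	#[test]
	fn identity_hash_passes_u64_through() {
		let mut hasher = IdentityHash::default();
		hasher.write_u64(0xdead_beef);
		assert_eq!(hasher.finish(), 0xdead_beef);
	}
}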