parity_db/
stats.rs

1// Copyright 2021-2022 Parity Technologies (UK) Ltd.
2// This file is dual-licensed as Apache-2.0 or MIT.
3
4use crate::{column::ColId, table::SIZE_TIERS};
5/// Database statistics.
6use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};
7use std::{
8	io::{Cursor, Read, Write},
9	iter,
10};
11
12// store up to value of size HISTOGRAM_BUCKETS * 2 ^ HISTOGRAM_BUCKET_BITS,
13// that is 32ko
14const HISTOGRAM_BUCKETS: usize = 1024;
15const HISTOGRAM_BUCKET_BITS: u8 = 5;
16
17pub const TOTAL_SIZE: usize =
18	4 * HISTOGRAM_BUCKETS + 8 * HISTOGRAM_BUCKETS + 8 * SIZE_TIERS + 8 * 13;
19
/// Counters collected for a single column.
///
/// Every counter is an atomic, so the update methods only need `&self`. The whole set is
/// serialized by `to_slice` and restored by `from_slice`.
// TODO: get rid of the struct and use index meta directly.
#[derive(Debug)]
pub struct ColumnStats {
	/// Number of tracked values per size bucket; the bucket is chosen by
	/// `value_histogram_index` from the uncompressed size.
	value_histogram: Vec<AtomicU32>,
	/// Number of query hits per size tier.
	query_histogram: Vec<AtomicU64>,
	/// Number of tracked values whose size does not fit into `value_histogram`.
	oversized: AtomicU64,
	/// Sum of the compressed sizes of the values counted in `oversized`.
	oversized_bytes: AtomicU64,
	/// Number of values currently tracked (incremented on insert, decremented on remove).
	total_values: AtomicU64,
	/// Sum of the compressed sizes of the tracked values.
	total_bytes: AtomicU64,
	/// Number of calls to `commit`.
	commits: AtomicU64,
	/// Number of calls to `insert_val`.
	inserted_new: AtomicU64,
	/// Number of calls to `replace_val`.
	inserted_overwrite: AtomicU64,
	/// Number of calls to `reference_increase`.
	reference_increase_hit: AtomicU64,
	/// Number of calls to the `reference_increase_miss` method.
	reference_increase_miss: AtomicU64,
	/// Number of calls to `remove_val`.
	removed_hit: AtomicU64,
	/// Number of calls to `remove_miss`.
	removed_miss: AtomicU64,
	/// Number of calls to `query_miss`.
	queries_miss: AtomicU64,
	/// Sum of the uncompressed sizes of the tracked values.
	uncompressed_bytes: AtomicU64,
	/// Per size bucket: accumulated difference between uncompressed and compressed size.
	compression_delta: Vec<AtomicI64>,
}
40
/// Database statistics summary.
///
/// Holds one entry per column; see [`ColumnStatSummary`] for the per-column figures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StatSummary {
	/// Per column statistics.
	/// Statistics may be available only for some columns.
	pub columns: Vec<Option<ColumnStatSummary>>,
}

/// Column statistics summary.
///
/// A plain snapshot of the most important counters of one column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnStatSummary {
	/// Current number of values in the column.
	pub total_values: u64,
	/// Total size of (compressed) values in the column. This does not include key size and any
	/// other overhead.
	pub total_bytes: u64,
	/// Total size of values in the column before compression. This does not include key size and
	/// any other overhead.
	pub uncompressed_bytes: u64,
}
59
/// Reads the next four bytes from `cursor` as a little-endian `u32` counter.
///
/// Panics with "Incorrect stats buffer" if fewer than four bytes remain.
fn read_u32(cursor: &mut Cursor<&[u8]>) -> AtomicU32 {
	let mut le_bytes = [0u8; 4];
	cursor.read_exact(&mut le_bytes).expect("Incorrect stats buffer");
	AtomicU32::from(u32::from_le_bytes(le_bytes))
}
65
/// Reads the next eight bytes from `cursor` as a little-endian `u64` counter.
///
/// Panics with "Incorrect stats buffer" if fewer than eight bytes remain.
fn read_u64(cursor: &mut Cursor<&[u8]>) -> AtomicU64 {
	let mut le_bytes = [0u8; 8];
	cursor.read_exact(&mut le_bytes).expect("Incorrect stats buffer");
	AtomicU64::from(u64::from_le_bytes(le_bytes))
}
71
/// Reads the next eight bytes from `cursor` as a little-endian `i64` counter.
///
/// Panics with "Incorrect stats buffer" if fewer than eight bytes remain.
fn read_i64(cursor: &mut Cursor<&[u8]>) -> AtomicI64 {
	let mut le_bytes = [0u8; 8];
	cursor.read_exact(&mut le_bytes).expect("Incorrect stats buffer");
	AtomicI64::from(i64::from_le_bytes(le_bytes))
}
77
/// Appends the current value of `val` to `cursor` as four little-endian bytes.
///
/// Panics with "Incorrect stats buffer" if the remaining space is too small.
fn write_u32(cursor: &mut Cursor<&mut [u8]>, val: &AtomicU32) {
	let encoded = val.load(Ordering::Relaxed).to_le_bytes();
	cursor.write_all(&encoded).expect("Incorrect stats buffer");
}
83
/// Appends the current value of `val` to `cursor` as eight little-endian bytes.
///
/// Panics with "Incorrect stats buffer" if the remaining space is too small.
fn write_u64(cursor: &mut Cursor<&mut [u8]>, val: &AtomicU64) {
	let encoded = val.load(Ordering::Relaxed).to_le_bytes();
	cursor.write_all(&encoded).expect("Incorrect stats buffer");
}
89
/// Appends the current value of `val` to `cursor` as eight little-endian bytes.
///
/// Panics with "Incorrect stats buffer" if the remaining space is too small.
fn write_i64(cursor: &mut Cursor<&mut [u8]>, val: &AtomicI64) {
	let encoded = val.load(Ordering::Relaxed).to_le_bytes();
	cursor.write_all(&encoded).expect("Incorrect stats buffer");
}
95
96fn value_histogram_index(size: u32) -> Option<usize> {
97	let bucket = size as usize >> HISTOGRAM_BUCKET_BITS;
98	if bucket < HISTOGRAM_BUCKETS {
99		Some(bucket)
100	} else {
101		None
102	}
103}
104
105impl ColumnStats {
106	pub fn from_slice(data: &[u8]) -> ColumnStats {
107		let mut cursor = Cursor::new(data);
108		let cursor = &mut cursor;
109
110		let value_histogram =
111			iter::repeat_with(|| read_u32(cursor)).take(HISTOGRAM_BUCKETS).collect();
112		let query_histogram = iter::repeat_with(|| read_u64(cursor)).take(SIZE_TIERS).collect();
113		let oversized = read_u64(cursor);
114		let oversized_bytes = read_u64(cursor);
115		let total_values = read_u64(cursor);
116		let total_bytes = read_u64(cursor);
117		let commits = read_u64(cursor);
118		let inserted_new = read_u64(cursor);
119		let inserted_overwrite = read_u64(cursor);
120		let removed_hit = read_u64(cursor);
121		let removed_miss = read_u64(cursor);
122		let queries_miss = read_u64(cursor);
123		let uncompressed_bytes = read_u64(cursor);
124		let compression_delta =
125			iter::repeat_with(|| read_i64(cursor)).take(HISTOGRAM_BUCKETS).collect();
126		let reference_increase_hit = read_u64(cursor);
127		let reference_increase_miss = read_u64(cursor);
128
129		ColumnStats {
130			value_histogram,
131			query_histogram,
132			oversized,
133			oversized_bytes,
134			total_values,
135			total_bytes,
136			commits,
137			inserted_new,
138			inserted_overwrite,
139			reference_increase_hit,
140			reference_increase_miss,
141			removed_hit,
142			removed_miss,
143			queries_miss,
144			uncompressed_bytes,
145			compression_delta,
146		}
147	}
148
149	pub fn empty() -> ColumnStats {
150		ColumnStats {
151			value_histogram: iter::repeat_with(Default::default).take(HISTOGRAM_BUCKETS).collect(),
152			query_histogram: iter::repeat_with(Default::default).take(SIZE_TIERS).collect(),
153			oversized: Default::default(),
154			oversized_bytes: Default::default(),
155			total_values: Default::default(),
156			total_bytes: Default::default(),
157			commits: Default::default(),
158			inserted_new: Default::default(),
159			inserted_overwrite: Default::default(),
160			reference_increase_hit: Default::default(),
161			reference_increase_miss: Default::default(),
162			removed_hit: Default::default(),
163			removed_miss: Default::default(),
164			queries_miss: Default::default(),
165			uncompressed_bytes: Default::default(),
166			compression_delta: iter::repeat_with(Default::default)
167				.take(HISTOGRAM_BUCKETS)
168				.collect(),
169		}
170	}
171
172	pub fn clear(&self) {
173		// This may break stat consistency, but we don't care too much.
174		for v in &self.value_histogram {
175			v.store(0, Ordering::Relaxed)
176		}
177		for v in &self.query_histogram {
178			v.store(0, Ordering::Relaxed)
179		}
180		self.oversized.store(0, Ordering::Relaxed);
181		self.oversized_bytes.store(0, Ordering::Relaxed);
182		self.total_values.store(0, Ordering::Relaxed);
183		self.total_bytes.store(0, Ordering::Relaxed);
184		self.commits.store(0, Ordering::Relaxed);
185		self.inserted_new.store(0, Ordering::Relaxed);
186		self.inserted_overwrite.store(0, Ordering::Relaxed);
187		self.reference_increase_hit.store(0, Ordering::Relaxed);
188		self.reference_increase_miss.store(0, Ordering::Relaxed);
189		self.removed_hit.store(0, Ordering::Relaxed);
190		self.removed_miss.store(0, Ordering::Relaxed);
191		self.queries_miss.store(0, Ordering::Relaxed);
192		self.uncompressed_bytes.store(0, Ordering::Relaxed);
193		for v in &self.compression_delta {
194			v.store(0, Ordering::Relaxed)
195		}
196	}
197
198	pub fn summary(&self) -> ColumnStatSummary {
199		ColumnStatSummary {
200			total_values: self.total_values.load(Ordering::Relaxed),
201			total_bytes: self.total_bytes.load(Ordering::Relaxed),
202			uncompressed_bytes: self.uncompressed_bytes.load(Ordering::Relaxed),
203		}
204	}
205
206	pub fn to_slice(&self, data: &mut [u8]) {
207		let mut cursor = Cursor::new(data);
208		for item in &self.value_histogram {
209			write_u32(&mut cursor, item);
210		}
211		for item in &self.query_histogram {
212			write_u64(&mut cursor, item);
213		}
214		write_u64(&mut cursor, &self.oversized);
215		write_u64(&mut cursor, &self.oversized_bytes);
216		write_u64(&mut cursor, &self.total_values);
217		write_u64(&mut cursor, &self.total_bytes);
218		write_u64(&mut cursor, &self.commits);
219		write_u64(&mut cursor, &self.inserted_new);
220		write_u64(&mut cursor, &self.inserted_overwrite);
221		write_u64(&mut cursor, &self.removed_hit);
222		write_u64(&mut cursor, &self.removed_miss);
223		write_u64(&mut cursor, &self.queries_miss);
224		write_u64(&mut cursor, &self.uncompressed_bytes);
225		for item in &self.compression_delta {
226			write_i64(&mut cursor, item);
227		}
228		write_u64(&mut cursor, &self.reference_increase_hit);
229		write_u64(&mut cursor, &self.reference_increase_miss);
230	}
231
232	pub fn write_stats_text(&self, writer: &mut impl Write, col: ColId) -> std::io::Result<()> {
233		writeln!(writer, "Column {col}")?;
234		writeln!(writer, "Total values: {}", self.total_values.load(Ordering::Relaxed))?;
235		writeln!(writer, "Total bytes: {}", self.total_bytes.load(Ordering::Relaxed))?;
236		writeln!(writer, "Total oversized values: {}", self.oversized.load(Ordering::Relaxed))?;
237		writeln!(
238			writer,
239			"Total oversized bytes: {}",
240			self.oversized_bytes.load(Ordering::Relaxed)
241		)?;
242		writeln!(writer, "Total commits: {}", self.commits.load(Ordering::Relaxed))?;
243		writeln!(writer, "New value insertions: {}", self.inserted_new.load(Ordering::Relaxed))?;
244		writeln!(
245			writer,
246			"Existing value insertions: {}",
247			self.inserted_overwrite.load(Ordering::Relaxed)
248		)?;
249		writeln!(
250			writer,
251			"Reference increases: {}",
252			self.reference_increase_hit.load(Ordering::Relaxed)
253		)?;
254		writeln!(
255			writer,
256			"Missed reference increases: {}",
257			self.reference_increase_miss.load(Ordering::Relaxed)
258		)?;
259		writeln!(writer, "Removals: {}", self.removed_hit.load(Ordering::Relaxed))?;
260		writeln!(writer, "Missed removals: {}", self.removed_miss.load(Ordering::Relaxed))?;
261		writeln!(
262			writer,
263			"Uncompressed bytes: {}",
264			self.uncompressed_bytes.load(Ordering::Relaxed)
265		)?;
266		writeln!(writer, "Compression deltas:")?;
267		for i in 0..HISTOGRAM_BUCKETS {
268			let count = self.value_histogram[i].load(Ordering::Relaxed);
269			let delta = self.compression_delta[i].load(Ordering::Relaxed);
270			if count != 0 && delta != 0 {
271				writeln!(
272					writer,
273					"    {}-{}: {}",
274					i << HISTOGRAM_BUCKET_BITS,
275					(((i + 1) << HISTOGRAM_BUCKET_BITS) - 1),
276					delta
277				)?;
278			}
279		}
280		write!(writer, "Queries per size tier: [")?;
281		for i in 0..SIZE_TIERS {
282			if i == SIZE_TIERS - 1 {
283				writeln!(writer, "{}]", self.query_histogram[i].load(Ordering::Relaxed))?;
284			} else {
285				write!(writer, "{}, ", self.query_histogram[i].load(Ordering::Relaxed))?;
286			}
287		}
288		writeln!(writer, "Missed queries: {}", self.queries_miss.load(Ordering::Relaxed))?;
289		writeln!(writer, "Value histogram:")?;
290		for i in 0..HISTOGRAM_BUCKETS {
291			let count = self.value_histogram[i].load(Ordering::Relaxed);
292			if count != 0 {
293				writeln!(
294					writer,
295					"    {}-{}: {}",
296					i << HISTOGRAM_BUCKET_BITS,
297					(((i + 1) << HISTOGRAM_BUCKET_BITS) - 1),
298					count
299				)?;
300			}
301		}
302		writeln!(writer)?;
303		Ok(())
304	}
305
306	pub fn query_hit(&self, size_tier: u8) {
307		self.query_histogram[size_tier as usize].fetch_add(1, Ordering::Relaxed);
308	}
309
310	pub fn query_miss(&self) {
311		self.queries_miss.fetch_add(1, Ordering::Relaxed);
312	}
313
314	pub fn insert(&self, size: u32, compressed: u32) {
315		if let Some(index) = value_histogram_index(size) {
316			self.value_histogram[index].fetch_add(1, Ordering::Relaxed);
317			self.compression_delta[index]
318				.fetch_add(size as i64 - compressed as i64, Ordering::Relaxed);
319		} else {
320			self.oversized.fetch_add(1, Ordering::Relaxed);
321			self.oversized_bytes.fetch_add(compressed as u64, Ordering::Relaxed);
322		}
323		self.total_values.fetch_add(1, Ordering::Relaxed);
324		self.total_bytes.fetch_add(compressed as u64, Ordering::Relaxed);
325		self.uncompressed_bytes.fetch_add(size as u64, Ordering::Relaxed);
326	}
327
328	pub fn remove(&self, size: u32, compressed: u32) {
329		if let Some(index) = value_histogram_index(size) {
330			self.value_histogram[index].fetch_sub(1, Ordering::Relaxed);
331			self.compression_delta[index]
332				.fetch_sub(size as i64 - compressed as i64, Ordering::Relaxed);
333		} else {
334			self.oversized.fetch_sub(1, Ordering::Relaxed);
335			self.oversized_bytes.fetch_sub(compressed as u64, Ordering::Relaxed);
336		}
337		self.total_values.fetch_sub(1, Ordering::Relaxed);
338		self.total_bytes.fetch_sub(compressed as u64, Ordering::Relaxed);
339		self.uncompressed_bytes.fetch_sub(size as u64, Ordering::Relaxed);
340	}
341
342	pub fn insert_val(&self, size: u32, compressed: u32) {
343		self.inserted_new.fetch_add(1, Ordering::Relaxed);
344		self.insert(size, compressed);
345	}
346
347	pub fn remove_val(&self, size: u32, compressed: u32) {
348		self.removed_hit.fetch_add(1, Ordering::Relaxed);
349		self.remove(size, compressed);
350	}
351
352	pub fn reference_increase(&self) {
353		self.reference_increase_hit.fetch_add(1, Ordering::Relaxed);
354	}
355
356	pub fn reference_increase_miss(&self) {
357		self.reference_increase_miss.fetch_add(1, Ordering::Relaxed);
358	}
359
360	pub fn remove_miss(&self) {
361		self.removed_miss.fetch_add(1, Ordering::Relaxed);
362	}
363
364	pub fn replace_val(&self, old: u32, old_compressed: u32, new: u32, new_compressed: u32) {
365		self.inserted_overwrite.fetch_add(1, Ordering::Relaxed);
366		self.remove(old, old_compressed);
367		self.insert(new, new_compressed);
368	}
369
370	pub fn commit(&self) {
371		self.commits.fetch_add(1, Ordering::Relaxed);
372	}
373}