1use crate::{column::ColId, table::SIZE_TIERS};
5use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};
7use std::{
8 io::{Cursor, Read, Write},
9 iter,
10};
11
12const HISTOGRAM_BUCKETS: usize = 1024;
15const HISTOGRAM_BUCKET_BITS: u8 = 5;
16
17pub const TOTAL_SIZE: usize =
18 4 * HISTOGRAM_BUCKETS + 8 * HISTOGRAM_BUCKETS + 8 * SIZE_TIERS + 8 * 13;
19
/// Statistics counters for a single column.
///
/// Every field is an atomic (or a vector of atomics), so the counters are
/// updated through a shared reference.
#[derive(Debug)]
pub struct ColumnStats {
    /// Number of tracked values per size bucket.
    value_histogram: Vec<AtomicU32>,
    /// Number of successful queries per size tier.
    query_histogram: Vec<AtomicU64>,
    /// Number of tracked values too large for any `value_histogram` bucket.
    oversized: AtomicU64,
    /// Sum of the compressed sizes of the oversized values.
    oversized_bytes: AtomicU64,
    /// Number of values currently tracked.
    total_values: AtomicU64,
    /// Sum of the compressed sizes of all tracked values.
    total_bytes: AtomicU64,
    /// Number of recorded commits.
    commits: AtomicU64,
    /// Number of insertions recorded as new values.
    inserted_new: AtomicU64,
    /// Number of insertions recorded as replacing an existing value.
    inserted_overwrite: AtomicU64,
    /// Number of recorded reference increases.
    reference_increase_hit: AtomicU64,
    /// Number of recorded missed reference increases.
    reference_increase_miss: AtomicU64,
    /// Number of recorded removals.
    removed_hit: AtomicU64,
    /// Number of recorded missed removals.
    removed_miss: AtomicU64,
    /// Number of recorded missed queries.
    queries_miss: AtomicU64,
    /// Sum of the uncompressed sizes of all tracked values.
    uncompressed_bytes: AtomicU64,
    /// Per size bucket, the accumulated `size - compressed` difference.
    compression_delta: Vec<AtomicI64>,
}
40
41pub struct StatSummary {
43 pub columns: Vec<Option<ColumnStatSummary>>,
46}
47
/// Summary of the size-related statistics of a single column.
pub struct ColumnStatSummary {
    /// Number of values tracked for the column.
    pub total_values: u64,
    /// Sum of the compressed sizes of the tracked values.
    pub total_bytes: u64,
    /// Sum of the uncompressed sizes of the tracked values.
    pub uncompressed_bytes: u64,
}
59
/// Reads the next four bytes of `cursor` as a little-endian `u32` and wraps
/// the value in an `AtomicU32`.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" when fewer than four bytes remain.
fn read_u32(cursor: &mut Cursor<&[u8]>) -> AtomicU32 {
    let mut le_bytes = [0u8; std::mem::size_of::<u32>()];
    cursor.read_exact(&mut le_bytes).expect("Incorrect stats buffer");
    AtomicU32::from(u32::from_le_bytes(le_bytes))
}
65
/// Reads the next eight bytes of `cursor` as a little-endian `u64` and wraps
/// the value in an `AtomicU64`.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" when fewer than eight bytes remain.
fn read_u64(cursor: &mut Cursor<&[u8]>) -> AtomicU64 {
    let mut le_bytes = [0u8; std::mem::size_of::<u64>()];
    cursor.read_exact(&mut le_bytes).expect("Incorrect stats buffer");
    AtomicU64::from(u64::from_le_bytes(le_bytes))
}
71
/// Reads the next eight bytes of `cursor` as a little-endian `i64` and wraps
/// the value in an `AtomicI64`.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" when fewer than eight bytes remain.
fn read_i64(cursor: &mut Cursor<&[u8]>) -> AtomicI64 {
    let mut le_bytes = [0u8; std::mem::size_of::<i64>()];
    cursor.read_exact(&mut le_bytes).expect("Incorrect stats buffer");
    AtomicI64::from(i64::from_le_bytes(le_bytes))
}
77
/// Appends the current value of `val` to `cursor` as four little-endian bytes.
///
/// The value is loaded with `Ordering::Relaxed`.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" when the bytes do not fit in the
/// remaining space of the underlying buffer.
fn write_u32(cursor: &mut Cursor<&mut [u8]>, val: &AtomicU32) {
    let le_bytes = val.load(Ordering::Relaxed).to_le_bytes();
    cursor.write_all(&le_bytes).expect("Incorrect stats buffer");
}
83
/// Appends the current value of `val` to `cursor` as eight little-endian bytes.
///
/// The value is loaded with `Ordering::Relaxed`.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" when the bytes do not fit in the
/// remaining space of the underlying buffer.
fn write_u64(cursor: &mut Cursor<&mut [u8]>, val: &AtomicU64) {
    let le_bytes = val.load(Ordering::Relaxed).to_le_bytes();
    cursor.write_all(&le_bytes).expect("Incorrect stats buffer");
}
89
/// Appends the current value of `val` to `cursor` as eight little-endian bytes.
///
/// The value is loaded with `Ordering::Relaxed`.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" when the bytes do not fit in the
/// remaining space of the underlying buffer.
fn write_i64(cursor: &mut Cursor<&mut [u8]>, val: &AtomicI64) {
    let le_bytes = val.load(Ordering::Relaxed).to_le_bytes();
    cursor.write_all(&le_bytes).expect("Incorrect stats buffer");
}
95
96fn value_histogram_index(size: u32) -> Option<usize> {
97 let bucket = size as usize >> HISTOGRAM_BUCKET_BITS;
98 if bucket < HISTOGRAM_BUCKETS {
99 Some(bucket)
100 } else {
101 None
102 }
103}
104
105impl ColumnStats {
106 pub fn from_slice(data: &[u8]) -> ColumnStats {
107 let mut cursor = Cursor::new(data);
108 let cursor = &mut cursor;
109
110 let value_histogram =
111 iter::repeat_with(|| read_u32(cursor)).take(HISTOGRAM_BUCKETS).collect();
112 let query_histogram = iter::repeat_with(|| read_u64(cursor)).take(SIZE_TIERS).collect();
113 let oversized = read_u64(cursor);
114 let oversized_bytes = read_u64(cursor);
115 let total_values = read_u64(cursor);
116 let total_bytes = read_u64(cursor);
117 let commits = read_u64(cursor);
118 let inserted_new = read_u64(cursor);
119 let inserted_overwrite = read_u64(cursor);
120 let removed_hit = read_u64(cursor);
121 let removed_miss = read_u64(cursor);
122 let queries_miss = read_u64(cursor);
123 let uncompressed_bytes = read_u64(cursor);
124 let compression_delta =
125 iter::repeat_with(|| read_i64(cursor)).take(HISTOGRAM_BUCKETS).collect();
126 let reference_increase_hit = read_u64(cursor);
127 let reference_increase_miss = read_u64(cursor);
128
129 ColumnStats {
130 value_histogram,
131 query_histogram,
132 oversized,
133 oversized_bytes,
134 total_values,
135 total_bytes,
136 commits,
137 inserted_new,
138 inserted_overwrite,
139 reference_increase_hit,
140 reference_increase_miss,
141 removed_hit,
142 removed_miss,
143 queries_miss,
144 uncompressed_bytes,
145 compression_delta,
146 }
147 }
148
149 pub fn empty() -> ColumnStats {
150 ColumnStats {
151 value_histogram: iter::repeat_with(Default::default).take(HISTOGRAM_BUCKETS).collect(),
152 query_histogram: iter::repeat_with(Default::default).take(SIZE_TIERS).collect(),
153 oversized: Default::default(),
154 oversized_bytes: Default::default(),
155 total_values: Default::default(),
156 total_bytes: Default::default(),
157 commits: Default::default(),
158 inserted_new: Default::default(),
159 inserted_overwrite: Default::default(),
160 reference_increase_hit: Default::default(),
161 reference_increase_miss: Default::default(),
162 removed_hit: Default::default(),
163 removed_miss: Default::default(),
164 queries_miss: Default::default(),
165 uncompressed_bytes: Default::default(),
166 compression_delta: iter::repeat_with(Default::default)
167 .take(HISTOGRAM_BUCKETS)
168 .collect(),
169 }
170 }
171
172 pub fn clear(&self) {
173 for v in &self.value_histogram {
175 v.store(0, Ordering::Relaxed)
176 }
177 for v in &self.query_histogram {
178 v.store(0, Ordering::Relaxed)
179 }
180 self.oversized.store(0, Ordering::Relaxed);
181 self.oversized_bytes.store(0, Ordering::Relaxed);
182 self.total_values.store(0, Ordering::Relaxed);
183 self.total_bytes.store(0, Ordering::Relaxed);
184 self.commits.store(0, Ordering::Relaxed);
185 self.inserted_new.store(0, Ordering::Relaxed);
186 self.inserted_overwrite.store(0, Ordering::Relaxed);
187 self.reference_increase_hit.store(0, Ordering::Relaxed);
188 self.reference_increase_miss.store(0, Ordering::Relaxed);
189 self.removed_hit.store(0, Ordering::Relaxed);
190 self.removed_miss.store(0, Ordering::Relaxed);
191 self.queries_miss.store(0, Ordering::Relaxed);
192 self.uncompressed_bytes.store(0, Ordering::Relaxed);
193 for v in &self.compression_delta {
194 v.store(0, Ordering::Relaxed)
195 }
196 }
197
198 pub fn summary(&self) -> ColumnStatSummary {
199 ColumnStatSummary {
200 total_values: self.total_values.load(Ordering::Relaxed),
201 total_bytes: self.total_bytes.load(Ordering::Relaxed),
202 uncompressed_bytes: self.uncompressed_bytes.load(Ordering::Relaxed),
203 }
204 }
205
206 pub fn to_slice(&self, data: &mut [u8]) {
207 let mut cursor = Cursor::new(data);
208 for item in &self.value_histogram {
209 write_u32(&mut cursor, item);
210 }
211 for item in &self.query_histogram {
212 write_u64(&mut cursor, item);
213 }
214 write_u64(&mut cursor, &self.oversized);
215 write_u64(&mut cursor, &self.oversized_bytes);
216 write_u64(&mut cursor, &self.total_values);
217 write_u64(&mut cursor, &self.total_bytes);
218 write_u64(&mut cursor, &self.commits);
219 write_u64(&mut cursor, &self.inserted_new);
220 write_u64(&mut cursor, &self.inserted_overwrite);
221 write_u64(&mut cursor, &self.removed_hit);
222 write_u64(&mut cursor, &self.removed_miss);
223 write_u64(&mut cursor, &self.queries_miss);
224 write_u64(&mut cursor, &self.uncompressed_bytes);
225 for item in &self.compression_delta {
226 write_i64(&mut cursor, item);
227 }
228 write_u64(&mut cursor, &self.reference_increase_hit);
229 write_u64(&mut cursor, &self.reference_increase_miss);
230 }
231
232 pub fn write_stats_text(&self, writer: &mut impl Write, col: ColId) -> std::io::Result<()> {
233 writeln!(writer, "Column {col}")?;
234 writeln!(writer, "Total values: {}", self.total_values.load(Ordering::Relaxed))?;
235 writeln!(writer, "Total bytes: {}", self.total_bytes.load(Ordering::Relaxed))?;
236 writeln!(writer, "Total oversized values: {}", self.oversized.load(Ordering::Relaxed))?;
237 writeln!(
238 writer,
239 "Total oversized bytes: {}",
240 self.oversized_bytes.load(Ordering::Relaxed)
241 )?;
242 writeln!(writer, "Total commits: {}", self.commits.load(Ordering::Relaxed))?;
243 writeln!(writer, "New value insertions: {}", self.inserted_new.load(Ordering::Relaxed))?;
244 writeln!(
245 writer,
246 "Existing value insertions: {}",
247 self.inserted_overwrite.load(Ordering::Relaxed)
248 )?;
249 writeln!(
250 writer,
251 "Reference increases: {}",
252 self.reference_increase_hit.load(Ordering::Relaxed)
253 )?;
254 writeln!(
255 writer,
256 "Missed reference increases: {}",
257 self.reference_increase_miss.load(Ordering::Relaxed)
258 )?;
259 writeln!(writer, "Removals: {}", self.removed_hit.load(Ordering::Relaxed))?;
260 writeln!(writer, "Missed removals: {}", self.removed_miss.load(Ordering::Relaxed))?;
261 writeln!(
262 writer,
263 "Uncompressed bytes: {}",
264 self.uncompressed_bytes.load(Ordering::Relaxed)
265 )?;
266 writeln!(writer, "Compression deltas:")?;
267 for i in 0..HISTOGRAM_BUCKETS {
268 let count = self.value_histogram[i].load(Ordering::Relaxed);
269 let delta = self.compression_delta[i].load(Ordering::Relaxed);
270 if count != 0 && delta != 0 {
271 writeln!(
272 writer,
273 " {}-{}: {}",
274 i << HISTOGRAM_BUCKET_BITS,
275 (((i + 1) << HISTOGRAM_BUCKET_BITS) - 1),
276 delta
277 )?;
278 }
279 }
280 write!(writer, "Queries per size tier: [")?;
281 for i in 0..SIZE_TIERS {
282 if i == SIZE_TIERS - 1 {
283 writeln!(writer, "{}]", self.query_histogram[i].load(Ordering::Relaxed))?;
284 } else {
285 write!(writer, "{}, ", self.query_histogram[i].load(Ordering::Relaxed))?;
286 }
287 }
288 writeln!(writer, "Missed queries: {}", self.queries_miss.load(Ordering::Relaxed))?;
289 writeln!(writer, "Value histogram:")?;
290 for i in 0..HISTOGRAM_BUCKETS {
291 let count = self.value_histogram[i].load(Ordering::Relaxed);
292 if count != 0 {
293 writeln!(
294 writer,
295 " {}-{}: {}",
296 i << HISTOGRAM_BUCKET_BITS,
297 (((i + 1) << HISTOGRAM_BUCKET_BITS) - 1),
298 count
299 )?;
300 }
301 }
302 writeln!(writer)?;
303 Ok(())
304 }
305
306 pub fn query_hit(&self, size_tier: u8) {
307 self.query_histogram[size_tier as usize].fetch_add(1, Ordering::Relaxed);
308 }
309
310 pub fn query_miss(&self) {
311 self.queries_miss.fetch_add(1, Ordering::Relaxed);
312 }
313
314 pub fn insert(&self, size: u32, compressed: u32) {
315 if let Some(index) = value_histogram_index(size) {
316 self.value_histogram[index].fetch_add(1, Ordering::Relaxed);
317 self.compression_delta[index]
318 .fetch_add(size as i64 - compressed as i64, Ordering::Relaxed);
319 } else {
320 self.oversized.fetch_add(1, Ordering::Relaxed);
321 self.oversized_bytes.fetch_add(compressed as u64, Ordering::Relaxed);
322 }
323 self.total_values.fetch_add(1, Ordering::Relaxed);
324 self.total_bytes.fetch_add(compressed as u64, Ordering::Relaxed);
325 self.uncompressed_bytes.fetch_add(size as u64, Ordering::Relaxed);
326 }
327
328 pub fn remove(&self, size: u32, compressed: u32) {
329 if let Some(index) = value_histogram_index(size) {
330 self.value_histogram[index].fetch_sub(1, Ordering::Relaxed);
331 self.compression_delta[index]
332 .fetch_sub(size as i64 - compressed as i64, Ordering::Relaxed);
333 } else {
334 self.oversized.fetch_sub(1, Ordering::Relaxed);
335 self.oversized_bytes.fetch_sub(compressed as u64, Ordering::Relaxed);
336 }
337 self.total_values.fetch_sub(1, Ordering::Relaxed);
338 self.total_bytes.fetch_sub(compressed as u64, Ordering::Relaxed);
339 self.uncompressed_bytes.fetch_sub(size as u64, Ordering::Relaxed);
340 }
341
342 pub fn insert_val(&self, size: u32, compressed: u32) {
343 self.inserted_new.fetch_add(1, Ordering::Relaxed);
344 self.insert(size, compressed);
345 }
346
347 pub fn remove_val(&self, size: u32, compressed: u32) {
348 self.removed_hit.fetch_add(1, Ordering::Relaxed);
349 self.remove(size, compressed);
350 }
351
352 pub fn reference_increase(&self) {
353 self.reference_increase_hit.fetch_add(1, Ordering::Relaxed);
354 }
355
356 pub fn reference_increase_miss(&self) {
357 self.reference_increase_miss.fetch_add(1, Ordering::Relaxed);
358 }
359
360 pub fn remove_miss(&self) {
361 self.removed_miss.fetch_add(1, Ordering::Relaxed);
362 }
363
364 pub fn replace_val(&self, old: u32, old_compressed: u32, new: u32, new_compressed: u32) {
365 self.inserted_overwrite.fetch_add(1, Ordering::Relaxed);
366 self.remove(old, old_compressed);
367 self.insert(new, new_compressed);
368 }
369
370 pub fn commit(&self) {
371 self.commits.fetch_add(1, Ordering::Relaxed);
372 }
373}