1use parking_lot::RwLock;
10use std::{
11 collections::HashMap,
12 str::FromStr,
13 sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
14 time::Instant,
15};
16
17#[derive(Default, Clone, Copy)]
18pub struct RawDbStats {
19 pub reads: u64,
20 pub writes: u64,
21 pub bytes_written: u64,
22 pub bytes_read: u64,
23 pub transactions: u64,
24 pub cache_hit_count: u64,
25}
26
27#[derive(Default, Debug, Clone, Copy)]
28pub struct RocksDbStatsTimeValue {
29 pub p50: f64,
31 pub p95: f64,
33 pub p99: f64,
35 pub p100: f64,
37 pub sum: u64,
38}
39
40#[derive(Default, Debug, Clone, Copy)]
41pub struct RocksDbStatsValue {
42 pub count: u64,
43 pub times: Option<RocksDbStatsTimeValue>,
44}
45
46pub fn parse_rocksdb_stats(stats: &str) -> HashMap<String, RocksDbStatsValue> {
47 stats.lines().map(|line| parse_rocksdb_stats_row(line.splitn(2, ' '))).collect()
48}
49
50fn parse_rocksdb_stats_row<'a>(mut iter: impl Iterator<Item = &'a str>) -> (String, RocksDbStatsValue) {
51 const PROOF: &str = "rocksdb statistics format is valid and hasn't changed";
52 const SEPARATOR: &str = " : ";
53 let key = iter.next().expect(PROOF).trim_start_matches("rocksdb.").to_owned();
54 let values = iter.next().expect(PROOF);
55 let value = if values.starts_with("COUNT") {
56 RocksDbStatsValue {
58 count: u64::from_str(values.rsplit(SEPARATOR).next().expect(PROOF)).expect(PROOF),
59 times: None,
60 }
61 } else {
62 let values: Vec<&str> = values.split_whitespace().filter(|s| *s != ":").collect();
64 let times = RocksDbStatsTimeValue {
65 p50: f64::from_str(values.get(1).expect(PROOF)).expect(PROOF),
66 p95: f64::from_str(values.get(3).expect(PROOF)).expect(PROOF),
67 p99: f64::from_str(values.get(5).expect(PROOF)).expect(PROOF),
68 p100: f64::from_str(values.get(7).expect(PROOF)).expect(PROOF),
69 sum: u64::from_str(values.get(11).expect(PROOF)).expect(PROOF),
70 };
71 RocksDbStatsValue { count: u64::from_str(values.get(9).expect(PROOF)).expect(PROOF), times: Some(times) }
72 };
73 (key, value)
74}
75
76impl RawDbStats {
77 fn combine(&self, other: &RawDbStats) -> Self {
78 RawDbStats {
79 reads: self.reads + other.reads,
80 writes: self.writes + other.writes,
81 bytes_written: self.bytes_written + other.bytes_written,
82 bytes_read: self.bytes_read + other.bytes_written,
83 transactions: self.transactions + other.transactions,
84 cache_hit_count: self.cache_hit_count + other.cache_hit_count,
85 }
86 }
87}
88
89struct OverallDbStats {
90 stats: RawDbStats,
91 last_taken: Instant,
92 started: Instant,
93}
94
95impl OverallDbStats {
96 fn new() -> Self {
97 OverallDbStats { stats: RawDbStats::default(), last_taken: Instant::now(), started: Instant::now() }
98 }
99}
100
101pub struct RunningDbStats {
102 reads: AtomicU64,
103 writes: AtomicU64,
104 bytes_written: AtomicU64,
105 bytes_read: AtomicU64,
106 transactions: AtomicU64,
107 cache_hit_count: AtomicU64,
108 overall: RwLock<OverallDbStats>,
109}
110
111pub struct TakenDbStats {
112 pub raw: RawDbStats,
113 pub started: Instant,
114}
115
116impl RunningDbStats {
117 pub fn new() -> Self {
118 Self {
119 reads: 0.into(),
120 bytes_read: 0.into(),
121 writes: 0.into(),
122 bytes_written: 0.into(),
123 transactions: 0.into(),
124 cache_hit_count: 0.into(),
125 overall: OverallDbStats::new().into(),
126 }
127 }
128
129 pub fn tally_reads(&self, val: u64) {
130 self.reads.fetch_add(val, AtomicOrdering::Relaxed);
131 }
132
133 pub fn tally_bytes_read(&self, val: u64) {
134 self.bytes_read.fetch_add(val, AtomicOrdering::Relaxed);
135 }
136
137 pub fn tally_writes(&self, val: u64) {
138 self.writes.fetch_add(val, AtomicOrdering::Relaxed);
139 }
140
141 pub fn tally_bytes_written(&self, val: u64) {
142 self.bytes_written.fetch_add(val, AtomicOrdering::Relaxed);
143 }
144
145 pub fn tally_transactions(&self, val: u64) {
146 self.transactions.fetch_add(val, AtomicOrdering::Relaxed);
147 }
148
149 pub fn tally_cache_hit_count(&self, val: u64) {
150 self.cache_hit_count.fetch_add(val, AtomicOrdering::Relaxed);
151 }
152
153 fn take_current(&self) -> RawDbStats {
154 RawDbStats {
155 reads: self.reads.swap(0, AtomicOrdering::Relaxed),
156 writes: self.writes.swap(0, AtomicOrdering::Relaxed),
157 bytes_written: self.bytes_written.swap(0, AtomicOrdering::Relaxed),
158 bytes_read: self.bytes_read.swap(0, AtomicOrdering::Relaxed),
159 transactions: self.transactions.swap(0, AtomicOrdering::Relaxed),
160 cache_hit_count: self.cache_hit_count.swap(0, AtomicOrdering::Relaxed),
161 }
162 }
163
164 fn peek_current(&self) -> RawDbStats {
165 RawDbStats {
166 reads: self.reads.load(AtomicOrdering::Relaxed),
167 writes: self.writes.load(AtomicOrdering::Relaxed),
168 bytes_written: self.bytes_written.load(AtomicOrdering::Relaxed),
169 bytes_read: self.bytes_read.load(AtomicOrdering::Relaxed),
170 transactions: self.transactions.load(AtomicOrdering::Relaxed),
171 cache_hit_count: self.cache_hit_count.load(AtomicOrdering::Relaxed),
172 }
173 }
174
175 pub fn since_previous(&self) -> TakenDbStats {
176 let mut overall_lock = self.overall.write();
177
178 let current = self.take_current();
179
180 overall_lock.stats = overall_lock.stats.combine(¤t);
181
182 let stats = TakenDbStats { raw: current, started: overall_lock.last_taken };
183
184 overall_lock.last_taken = Instant::now();
185
186 stats
187 }
188
189 pub fn overall(&self) -> TakenDbStats {
190 let overall_lock = self.overall.read();
191
192 let current = self.peek_current();
193
194 TakenDbStats { raw: overall_lock.stats.combine(¤t), started: overall_lock.started }
195 }
196}