cumulus_client_consensus_common/
level_monitor.rs1use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _};
19use sp_blockchain::{HashAndNumber, HeaderMetadata, TreeRoute};
20use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero};
21use std::{
22 collections::{HashMap, HashSet},
23 sync::Arc,
24};
25
/// Target attached to every log record emitted by the level monitor.
const LOG_TARGET: &str = "level-monitor";

/// Suggested maximum number of leaves per level.
///
/// NOTE(review): presumably the value `LevelLimit::Default` resolves to; the conversion is not
/// part of this section of the file, confirm against the caller.
pub const MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT: usize = 32;

/// Minimum distance between the last finalized block number and the lowest tracked level that
/// makes `LevelMonitor::block_imported` drop the bookkeeping of the levels in between.
const CLEANUP_THRESHOLD: u32 = 32;
34
/// Selects the per-level limit configuration.
///
/// NOTE(review): the translation of a variant into the `usize` limit consumed by
/// `LevelMonitor::new` happens outside this section of the file; confirm the exact mapping there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LevelLimit {
    /// Use the default limit (presumably `MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT`; confirm).
    Default,
    /// Do not request an explicit limit.
    None,
    /// Use the given custom limit.
    Some(usize),
}
51
52pub struct LevelMonitor<Block: BlockT, BE> {
54 level_limit: usize,
56 pub(crate) import_counter: NumberFor<Block>,
58 pub(crate) freshness: HashMap<Block::Hash, NumberFor<Block>>,
60 pub(crate) levels: HashMap<NumberFor<Block>, HashSet<Block::Hash>>,
62 lowest_level: NumberFor<Block>,
64 backend: Arc<BE>,
66}
67
68struct TargetInfo<Block: BlockT> {
70 freshest_leaf_idx: usize,
72 freshest_route: TreeRoute<Block>,
74}
75
76impl<Block, BE> LevelMonitor<Block, BE>
77where
78 Block: BlockT,
79 BE: Backend<Block>,
80{
81 pub fn new(level_limit: usize, backend: Arc<BE>) -> Self {
83 let mut monitor = LevelMonitor {
84 level_limit,
85 import_counter: Zero::zero(),
86 freshness: HashMap::new(),
87 levels: HashMap::new(),
88 lowest_level: Zero::zero(),
89 backend,
90 };
91 monitor.restore();
92 monitor
93 }
94
95 fn restore(&mut self) {
102 let info = self.backend.blockchain().info();
103
104 log::debug!(
105 target: LOG_TARGET,
106 "Restoring chain level monitor from last finalized block: {} {}",
107 info.finalized_number,
108 info.finalized_hash
109 );
110
111 self.lowest_level = info.finalized_number;
112 self.import_counter = info.finalized_number;
113
114 for leaf in self.backend.blockchain().leaves().unwrap_or_default() {
115 let Ok(mut meta) = self.backend.blockchain().header_metadata(leaf) else {
116 log::debug!(
117 target: LOG_TARGET,
118 "Could not fetch header metadata for leaf: {leaf:?}",
119 );
120
121 continue
122 };
123
124 self.import_counter = self.import_counter.max(meta.number);
125
126 while !self.freshness.contains_key(&meta.hash) {
128 self.freshness.insert(meta.hash, meta.number);
129 self.levels.entry(meta.number).or_default().insert(meta.hash);
130 if meta.number <= self.lowest_level {
131 break
132 }
133
134 meta = match self.backend.blockchain().header_metadata(meta.parent) {
135 Ok(m) => m,
136 Err(_) => {
137 log::debug!(
139 target: LOG_TARGET,
140 "Could not fetch header metadata for parent: {:?}",
141 meta.parent,
142 );
143 break
144 },
145 }
146 }
147 }
148
149 log::debug!(
150 target: LOG_TARGET,
151 "Restored chain level monitor up to height {}",
152 self.import_counter
153 );
154 }
155
156 pub fn enforce_limit(&mut self, number: NumberFor<Block>) {
171 let level_len = self.levels.get(&number).map(|l| l.len()).unwrap_or_default();
172 if level_len < self.level_limit {
173 return
174 }
175
176 let mut leaves = self.backend.blockchain().leaves().unwrap_or_default();
179 leaves.sort_unstable_by(|a, b| self.freshness.get(a).cmp(&self.freshness.get(b)));
180 let mut invalidated_leaves = HashSet::new();
181
182 let remove_count = level_len - self.level_limit + 1;
188
189 log::debug!(
190 target: LOG_TARGET,
191 "Detected leaves overflow at height {number}, removing {remove_count} obsolete blocks",
192 );
193
194 (0..remove_count).all(|_| {
195 self.find_target(number, &leaves, &invalidated_leaves).map_or(false, |target| {
196 self.remove_target(target, number, &leaves, &mut invalidated_leaves);
197 true
198 })
199 });
200 }
201
202 fn find_target(
214 &self,
215 number: NumberFor<Block>,
216 leaves: &[Block::Hash],
217 invalidated_leaves: &HashSet<usize>,
218 ) -> Option<TargetInfo<Block>> {
219 let mut target_info: Option<TargetInfo<Block>> = None;
220 let blockchain = self.backend.blockchain();
221 let best_hash = blockchain.info().best_hash;
222
223 let mut assigned_leaves = HashSet::new();
226
227 let level = self.levels.get(&number)?;
228
229 for blk_hash in level.iter().filter(|hash| **hash != best_hash) {
230 let candidate_info = leaves
232 .iter()
233 .enumerate()
234 .filter(|(leaf_idx, _)| {
235 !assigned_leaves.contains(leaf_idx) && !invalidated_leaves.contains(leaf_idx)
236 })
237 .rev()
238 .find_map(|(leaf_idx, leaf_hash)| {
239 if blk_hash == leaf_hash {
240 let entry = HashAndNumber { number, hash: *blk_hash };
241 TreeRoute::new(vec![entry], 0).ok().map(|freshest_route| TargetInfo {
242 freshest_leaf_idx: leaf_idx,
243 freshest_route,
244 })
245 } else {
246 match sp_blockchain::tree_route(blockchain, *blk_hash, *leaf_hash) {
247 Ok(route) if route.retracted().is_empty() => Some(TargetInfo {
248 freshest_leaf_idx: leaf_idx,
249 freshest_route: route,
250 }),
251 Err(err) => {
252 log::warn!(
253 target: LOG_TARGET,
254 "(Lookup) Unable getting route from {:?} to {:?}: {}",
255 blk_hash,
256 leaf_hash,
257 err,
258 );
259 None
260 },
261 _ => None,
262 }
263 }
264 });
265
266 let candidate_info = match candidate_info {
267 Some(candidate_info) => {
268 assigned_leaves.insert(candidate_info.freshest_leaf_idx);
269 candidate_info
270 },
271 None => {
272 log::error!(
274 target: LOG_TARGET,
275 "Unable getting route to any leaf from {:?} (this is a bug)",
276 blk_hash,
277 );
278 continue
279 },
280 };
281
282 let is_less_fresh = || {
288 target_info
289 .as_ref()
290 .map(|ti| candidate_info.freshest_leaf_idx < ti.freshest_leaf_idx)
291 .unwrap_or(true)
292 };
293 let not_contains_best = || {
294 candidate_info
295 .freshest_route
296 .enacted()
297 .iter()
298 .all(|entry| entry.hash != best_hash)
299 };
300
301 if is_less_fresh() && not_contains_best() {
302 let early_stop = candidate_info.freshest_leaf_idx == 0;
303 target_info = Some(candidate_info);
304 if early_stop {
305 break
307 }
308 }
309 }
310
311 target_info
312 }
313
314 fn remove_target(
318 &mut self,
319 target: TargetInfo<Block>,
320 number: NumberFor<Block>,
321 leaves: &[Block::Hash],
322 invalidated_leaves: &mut HashSet<usize>,
323 ) {
324 let mut remove_leaf = |number, hash| {
325 log::debug!(target: LOG_TARGET, "Removing block (@{}) {:?}", number, hash);
326 if let Err(err) = self.backend.remove_leaf_block(hash) {
327 log::debug!(target: LOG_TARGET, "Remove not possible for {}: {}", hash, err);
328 return false
329 }
330 self.levels.get_mut(&number).map(|level| level.remove(&hash));
331 self.freshness.remove(&hash);
332 true
333 };
334
335 invalidated_leaves.insert(target.freshest_leaf_idx);
336
337 let mut remove_route = |route: TreeRoute<Block>| {
342 route.enacted().iter().rev().all(|elem| remove_leaf(elem.number, elem.hash));
343 };
344
345 let target_hash = target.freshest_route.common_block().hash;
346 debug_assert_eq!(
347 target.freshest_route.common_block().number,
348 number,
349 "This is a bug in LevelMonitor::find_target() or the Backend is corrupted"
350 );
351
352 remove_route(target.freshest_route);
354
355 let to_skip = leaves.len() - target.freshest_leaf_idx;
357 leaves.iter().enumerate().rev().skip(to_skip).for_each(|(leaf_idx, leaf_hash)| {
358 if invalidated_leaves.contains(&leaf_idx) {
359 return
360 }
361 match sp_blockchain::tree_route(self.backend.blockchain(), target_hash, *leaf_hash) {
362 Ok(route) if route.retracted().is_empty() => {
363 invalidated_leaves.insert(leaf_idx);
364 remove_route(route);
365 },
366 Err(err) => {
367 log::warn!(
368 target: LOG_TARGET,
369 "(Removal) unable getting route from {:?} to {:?}: {}",
370 target_hash,
371 leaf_hash,
372 err,
373 );
374 },
375 _ => (),
376 };
377 });
378
379 remove_leaf(number, target_hash);
380 }
381
382 pub fn block_imported(&mut self, number: NumberFor<Block>, hash: Block::Hash) {
384 let finalized_num = self.backend.blockchain().info().finalized_number;
385
386 if number > finalized_num {
387 self.import_counter += One::one();
389 self.freshness.insert(hash, self.import_counter);
390 self.levels.entry(number).or_default().insert(hash);
391 }
392
393 let delta: u32 = finalized_num.saturating_sub(self.lowest_level).unique_saturated_into();
394 if delta >= CLEANUP_THRESHOLD {
395 for i in 0..delta {
397 let number = self.lowest_level + i.unique_saturated_into();
398 self.levels.remove(&number).map(|level| {
399 level.iter().for_each(|hash| {
400 self.freshness.remove(hash);
401 })
402 });
403 }
404 self.lowest_level = finalized_num;
405 }
406 }
407}