referrerpolicy=no-referrer-when-downgrade

cumulus_client_consensus_common/
level_monitor.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _};
19use sp_blockchain::{HashAndNumber, HeaderMetadata, TreeRoute};
20use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero};
21use std::{
22	collections::{HashMap, HashSet},
23	sync::Arc,
24};
25
/// Log target under which every message of the level monitor is emitted.
const LOG_TARGET: &str = "level-monitor";

/// Default cap on the number of leaves kept at a single height.
///
/// Good enough for parachains running on the backend implementation that currently
/// ships with Substrate. The value may be revised in the future.
pub const MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT: usize = 32;

/// Number of levels the last finalized block has to be ahead of the lowest tracked
/// level before the obsolete entries of the internal maps are purged.
const CLEANUP_THRESHOLD: u32 = 32;
34
/// Upper bound on the number of leaves allowed at each level of the blockchain.
///
/// If a limit is set and more leaves are detected on block import, the older ones are
/// dropped to make room for the fresh blocks.
///
/// In environments where block confirmations from the relay chain may be "slow", setting
/// an upper bound helps keep the chain healthy: old (presumably stale) leaves are dropped,
/// which prevents new blocks from being discarded because the backend maximum was reached.
///
/// The type is a plain configuration value, so it is cheap to copy, can be compared and
/// can be printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LevelLimit {
	/// Limit set to [`MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT`].
	Default,
	/// No explicit limit; the backend implementation may still impose one implicitly.
	None,
	/// Custom value.
	Some(usize),
}
51
52/// Support structure to constrain the number of leaves at each level.
53pub struct LevelMonitor<Block: BlockT, BE> {
54	/// Max number of leaves for each level.
55	level_limit: usize,
56	/// Monotonic counter used to keep track of block freshness.
57	pub(crate) import_counter: NumberFor<Block>,
58	/// Map between blocks hashes and freshness.
59	pub(crate) freshness: HashMap<Block::Hash, NumberFor<Block>>,
60	/// Blockchain levels cache.
61	pub(crate) levels: HashMap<NumberFor<Block>, HashSet<Block::Hash>>,
62	/// Lower level number stored by the levels map.
63	lowest_level: NumberFor<Block>,
64	/// Backend reference to remove blocks on level saturation.
65	backend: Arc<BE>,
66}
67
68/// Contains information about the target scheduled for removal.
69struct TargetInfo<Block: BlockT> {
70	/// Index of freshest leaf in the leaves array.
71	freshest_leaf_idx: usize,
72	/// Route from target to its freshest leaf.
73	freshest_route: TreeRoute<Block>,
74}
75
76impl<Block, BE> LevelMonitor<Block, BE>
77where
78	Block: BlockT,
79	BE: Backend<Block>,
80{
81	/// Instance a new monitor structure.
82	pub fn new(level_limit: usize, backend: Arc<BE>) -> Self {
83		let mut monitor = LevelMonitor {
84			level_limit,
85			import_counter: Zero::zero(),
86			freshness: HashMap::new(),
87			levels: HashMap::new(),
88			lowest_level: Zero::zero(),
89			backend,
90		};
91		monitor.restore();
92		monitor
93	}
94
95	/// Restore the structure using the backend.
96	///
97	/// Blocks freshness values are inferred from the height and not from the effective import
98	/// moment. This is a not accurate but "good-enough" best effort solution.
99	///
100	/// Level limits are not enforced during this phase.
101	fn restore(&mut self) {
102		let info = self.backend.blockchain().info();
103
104		log::debug!(
105			target: LOG_TARGET,
106			"Restoring chain level monitor from last finalized block: {} {}",
107			info.finalized_number,
108			info.finalized_hash
109		);
110
111		self.lowest_level = info.finalized_number;
112		self.import_counter = info.finalized_number;
113
114		for leaf in self.backend.blockchain().leaves().unwrap_or_default() {
115			let Ok(mut meta) = self.backend.blockchain().header_metadata(leaf) else {
116				log::debug!(
117					target: LOG_TARGET,
118					"Could not fetch header metadata for leaf: {leaf:?}",
119				);
120
121				continue
122			};
123
124			self.import_counter = self.import_counter.max(meta.number);
125
126			// Populate the monitor until we don't hit an already imported branch
127			while !self.freshness.contains_key(&meta.hash) {
128				self.freshness.insert(meta.hash, meta.number);
129				self.levels.entry(meta.number).or_default().insert(meta.hash);
130				if meta.number <= self.lowest_level {
131					break
132				}
133
134				meta = match self.backend.blockchain().header_metadata(meta.parent) {
135					Ok(m) => m,
136					Err(_) => {
137						// This can happen after we have warp synced a node.
138						log::debug!(
139							target: LOG_TARGET,
140							"Could not fetch header metadata for parent: {:?}",
141							meta.parent,
142						);
143						break
144					},
145				}
146			}
147		}
148
149		log::debug!(
150			target: LOG_TARGET,
151			"Restored chain level monitor up to height {}",
152			self.import_counter
153		);
154	}
155
156	/// Check and enforce the limit bound at the given height.
157	///
158	/// In practice this will enforce the given height in having a number of blocks less than
159	/// the limit passed to the constructor.
160	///
161	/// If the given level is found to have a number of blocks greater than or equal the limit
162	/// then the limit is enforced by choosing one (or more) blocks to remove.
163	///
164	/// The removal strategy is driven by the block freshness.
165	///
166	/// A block freshness is determined by the most recent leaf freshness descending from the block
167	/// itself. In other words its freshness is equal to its more "fresh" descendant.
168	///
169	/// The least "fresh" blocks are eventually removed.
170	pub fn enforce_limit(&mut self, number: NumberFor<Block>) {
171		let level_len = self.levels.get(&number).map(|l| l.len()).unwrap_or_default();
172		if level_len < self.level_limit {
173			return
174		}
175
176		// Sort leaves by freshness only once (less fresh first) and keep track of
177		// leaves that were invalidated on removal.
178		let mut leaves = self.backend.blockchain().leaves().unwrap_or_default();
179		leaves.sort_unstable_by(|a, b| self.freshness.get(a).cmp(&self.freshness.get(b)));
180		let mut invalidated_leaves = HashSet::new();
181
182		// This may not be the most efficient way to remove **multiple** entries, but is the easy
183		// one :-). Should be considered that in "normal" conditions the number of blocks to remove
184		// is 0 or 1, it is not worth to complicate the code too much. One condition that may
185		// trigger multiple removals (2+) is if we restart the node using an existing db and a
186		// smaller limit wrt the one previously used.
187		let remove_count = level_len - self.level_limit + 1;
188
189		log::debug!(
190			target: LOG_TARGET,
191			"Detected leaves overflow at height {number}, removing {remove_count} obsolete blocks",
192		);
193
194		(0..remove_count).all(|_| {
195			self.find_target(number, &leaves, &invalidated_leaves).map_or(false, |target| {
196				self.remove_target(target, number, &leaves, &mut invalidated_leaves);
197				true
198			})
199		});
200	}
201
202	// Helper function to find the best candidate to be removed.
203	//
204	// Given a set of blocks with height equal to `number` (potential candidates)
205	// 1. For each candidate fetch all the leaves that are descending from it.
206	// 2. Set the candidate freshness equal to the fresher of its descending leaves.
207	// 3. The target is set as the candidate that is less fresh.
208	//
209	// Input `leaves` are assumed to be already ordered by "freshness" (less fresh first).
210	//
211	// Returns the index of the target fresher leaf within `leaves` and the route from target to
212	// such leaf.
213	fn find_target(
214		&self,
215		number: NumberFor<Block>,
216		leaves: &[Block::Hash],
217		invalidated_leaves: &HashSet<usize>,
218	) -> Option<TargetInfo<Block>> {
219		let mut target_info: Option<TargetInfo<Block>> = None;
220		let blockchain = self.backend.blockchain();
221		let best_hash = blockchain.info().best_hash;
222
223		// Leaves that where already assigned to some node and thus can be skipped
224		// during the search.
225		let mut assigned_leaves = HashSet::new();
226
227		let level = self.levels.get(&number)?;
228
229		for blk_hash in level.iter().filter(|hash| **hash != best_hash) {
230			// Search for the fresher leaf information for this block
231			let candidate_info = leaves
232				.iter()
233				.enumerate()
234				.filter(|(leaf_idx, _)| {
235					!assigned_leaves.contains(leaf_idx) && !invalidated_leaves.contains(leaf_idx)
236				})
237				.rev()
238				.find_map(|(leaf_idx, leaf_hash)| {
239					if blk_hash == leaf_hash {
240						let entry = HashAndNumber { number, hash: *blk_hash };
241						TreeRoute::new(vec![entry], 0).ok().map(|freshest_route| TargetInfo {
242							freshest_leaf_idx: leaf_idx,
243							freshest_route,
244						})
245					} else {
246						match sp_blockchain::tree_route(blockchain, *blk_hash, *leaf_hash) {
247							Ok(route) if route.retracted().is_empty() => Some(TargetInfo {
248								freshest_leaf_idx: leaf_idx,
249								freshest_route: route,
250							}),
251							Err(err) => {
252								log::warn!(
253									target: LOG_TARGET,
254									"(Lookup) Unable getting route from {:?} to {:?}: {}",
255									blk_hash,
256									leaf_hash,
257									err,
258								);
259								None
260							},
261							_ => None,
262						}
263					}
264				});
265
266			let candidate_info = match candidate_info {
267				Some(candidate_info) => {
268					assigned_leaves.insert(candidate_info.freshest_leaf_idx);
269					candidate_info
270				},
271				None => {
272					// This should never happen
273					log::error!(
274						target: LOG_TARGET,
275						"Unable getting route to any leaf from {:?} (this is a bug)",
276						blk_hash,
277					);
278					continue
279				},
280			};
281
282			// Found fresher leaf for this candidate.
283			// This candidate is set as the new target if:
284			// 1. its fresher leaf is less fresh than the previous target fresher leaf AND
285			// 2. best block is not in its route
286
287			let is_less_fresh = || {
288				target_info
289					.as_ref()
290					.map(|ti| candidate_info.freshest_leaf_idx < ti.freshest_leaf_idx)
291					.unwrap_or(true)
292			};
293			let not_contains_best = || {
294				candidate_info
295					.freshest_route
296					.enacted()
297					.iter()
298					.all(|entry| entry.hash != best_hash)
299			};
300
301			if is_less_fresh() && not_contains_best() {
302				let early_stop = candidate_info.freshest_leaf_idx == 0;
303				target_info = Some(candidate_info);
304				if early_stop {
305					// We will never find a candidate with an worst freshest leaf than this.
306					break
307				}
308			}
309		}
310
311		target_info
312	}
313
314	// Remove the target block and all its descendants.
315	//
316	// Leaves should have already been ordered by "freshness" (less fresh first).
317	fn remove_target(
318		&mut self,
319		target: TargetInfo<Block>,
320		number: NumberFor<Block>,
321		leaves: &[Block::Hash],
322		invalidated_leaves: &mut HashSet<usize>,
323	) {
324		let mut remove_leaf = |number, hash| {
325			log::debug!(target: LOG_TARGET, "Removing block (@{}) {:?}", number, hash);
326			if let Err(err) = self.backend.remove_leaf_block(hash) {
327				log::debug!(target: LOG_TARGET, "Remove not possible for {}: {}", hash, err);
328				return false
329			}
330			self.levels.get_mut(&number).map(|level| level.remove(&hash));
331			self.freshness.remove(&hash);
332			true
333		};
334
335		invalidated_leaves.insert(target.freshest_leaf_idx);
336
337		// Takes care of route removal. Starts from the leaf and stops as soon as an error is
338		// encountered. In this case an error is interpreted as the block being not a leaf
339		// and it will be removed while removing another route from the same block but to a
340		// different leaf.
341		let mut remove_route = |route: TreeRoute<Block>| {
342			route.enacted().iter().rev().all(|elem| remove_leaf(elem.number, elem.hash));
343		};
344
345		let target_hash = target.freshest_route.common_block().hash;
346		debug_assert_eq!(
347			target.freshest_route.common_block().number,
348			number,
349			"This is a bug in LevelMonitor::find_target() or the Backend is corrupted"
350		);
351
352		// Remove freshest (cached) route first.
353		remove_route(target.freshest_route);
354
355		// Don't bother trying with leaves we already found to not be our descendants.
356		let to_skip = leaves.len() - target.freshest_leaf_idx;
357		leaves.iter().enumerate().rev().skip(to_skip).for_each(|(leaf_idx, leaf_hash)| {
358			if invalidated_leaves.contains(&leaf_idx) {
359				return
360			}
361			match sp_blockchain::tree_route(self.backend.blockchain(), target_hash, *leaf_hash) {
362				Ok(route) if route.retracted().is_empty() => {
363					invalidated_leaves.insert(leaf_idx);
364					remove_route(route);
365				},
366				Err(err) => {
367					log::warn!(
368						target: LOG_TARGET,
369						"(Removal) unable getting route from {:?} to {:?}: {}",
370						target_hash,
371						leaf_hash,
372						err,
373					);
374				},
375				_ => (),
376			};
377		});
378
379		remove_leaf(number, target_hash);
380	}
381
382	/// Add a new imported block information to the monitor.
383	pub fn block_imported(&mut self, number: NumberFor<Block>, hash: Block::Hash) {
384		let finalized_num = self.backend.blockchain().info().finalized_number;
385
386		if number > finalized_num {
387			// Only blocks above the last finalized block should be added to the monitor
388			self.import_counter += One::one();
389			self.freshness.insert(hash, self.import_counter);
390			self.levels.entry(number).or_default().insert(hash);
391		}
392
393		let delta: u32 = finalized_num.saturating_sub(self.lowest_level).unique_saturated_into();
394		if delta >= CLEANUP_THRESHOLD {
395			// Do cleanup once in a while, we are allowed to have some obsolete information.
396			for i in 0..delta {
397				let number = self.lowest_level + i.unique_saturated_into();
398				self.levels.remove(&number).map(|level| {
399					level.iter().for_each(|hash| {
400						self.freshness.remove(hash);
401					})
402				});
403			}
404			self.lowest_level = finalized_num;
405		}
406	}
407}