1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.

use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _};
use sp_blockchain::{HashAndNumber, HeaderMetadata, TreeRoute};
use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero};
use std::{
	collections::{HashMap, HashSet},
	sync::Arc,
};

const LOG_TARGET: &str = "level-monitor";

/// Value good enough to be used with parachains using the current backend implementation
/// that ships with Substrate. This value may change in the future.
pub const MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT: usize = 32;

// Counter threshold after which we are going to eventually cleanup our internal data.
const CLEANUP_THRESHOLD: u32 = 32;

/// Upper bound to the number of leaves allowed for each level of the blockchain.
///
/// If the limit is set and more leaves are detected on block import, then the older ones are
/// dropped to make space for the fresh blocks.
///
/// In environments where blocks confirmations from the relay chain may be "slow", then
/// setting an upper bound helps keeping the chain health by dropping old (presumably) stale
/// leaves and prevents discarding new blocks because we've reached the backend max value.
pub enum LevelLimit {
	/// Limit set to [`MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT`].
	Default,
	/// No explicit limit, however a limit may be implicitly imposed by the backend implementation.
	None,
	/// Custom value.
	Some(usize),
}

/// Support structure to constrain the number of leaves at each level.
pub struct LevelMonitor<Block: BlockT, BE> {
	/// Max number of leaves for each level.
	level_limit: usize,
	/// Monotonic counter used to keep track of block freshness.
	pub(crate) import_counter: NumberFor<Block>,
	/// Map between blocks hashes and freshness.
	pub(crate) freshness: HashMap<Block::Hash, NumberFor<Block>>,
	/// Blockchain levels cache.
	pub(crate) levels: HashMap<NumberFor<Block>, HashSet<Block::Hash>>,
	/// Lower level number stored by the levels map.
	lowest_level: NumberFor<Block>,
	/// Backend reference to remove blocks on level saturation.
	backend: Arc<BE>,
}

/// Contains information about the target scheduled for removal.
struct TargetInfo<Block: BlockT> {
	/// Index of freshest leaf in the leaves array.
	freshest_leaf_idx: usize,
	/// Route from target to its freshest leaf.
	freshest_route: TreeRoute<Block>,
}

impl<Block, BE> LevelMonitor<Block, BE>
where
	Block: BlockT,
	BE: Backend<Block>,
{
	/// Instance a new monitor structure.
	pub fn new(level_limit: usize, backend: Arc<BE>) -> Self {
		let mut monitor = LevelMonitor {
			level_limit,
			import_counter: Zero::zero(),
			freshness: HashMap::new(),
			levels: HashMap::new(),
			lowest_level: Zero::zero(),
			backend,
		};
		monitor.restore();
		monitor
	}

	/// Restore the structure using the backend.
	///
	/// Blocks freshness values are inferred from the height and not from the effective import
	/// moment. This is a not accurate but "good-enough" best effort solution.
	///
	/// Level limits are not enforced during this phase.
	fn restore(&mut self) {
		const ERR_MSG: &str = "route from finalized to leaf should be available; qed";
		let info = self.backend.blockchain().info();

		log::debug!(
			target: LOG_TARGET,
			"Restoring chain level monitor from last finalized block: {} {}",
			info.finalized_number,
			info.finalized_hash
		);

		self.lowest_level = info.finalized_number;
		self.import_counter = info.finalized_number;

		for leaf in self.backend.blockchain().leaves().unwrap_or_default() {
			let mut meta = self.backend.blockchain().header_metadata(leaf).expect(ERR_MSG);

			self.import_counter = self.import_counter.max(meta.number);

			// Populate the monitor until we don't hit an already imported branch
			while !self.freshness.contains_key(&meta.hash) {
				self.freshness.insert(meta.hash, meta.number);
				self.levels.entry(meta.number).or_default().insert(meta.hash);
				if meta.number <= self.lowest_level {
					break
				}
				meta = self.backend.blockchain().header_metadata(meta.parent).expect(ERR_MSG);
			}
		}

		log::debug!(
			target: LOG_TARGET,
			"Restored chain level monitor up to height {}",
			self.import_counter
		);
	}

	/// Check and enforce the limit bound at the given height.
	///
	/// In practice this will enforce the given height in having a number of blocks less than
	/// the limit passed to the constructor.
	///
	/// If the given level is found to have a number of blocks greater than or equal the limit
	/// then the limit is enforced by chosing one (or more) blocks to remove.
	///
	/// The removal strategy is driven by the block freshness.
	///
	/// A block freshness is determined by the most recent leaf freshness descending from the block
	/// itself. In other words its freshness is equal to its more "fresh" descendant.
	///
	/// The least "fresh" blocks are eventually removed.
	pub fn enforce_limit(&mut self, number: NumberFor<Block>) {
		let level_len = self.levels.get(&number).map(|l| l.len()).unwrap_or_default();
		if level_len < self.level_limit {
			return
		}

		// Sort leaves by freshness only once (less fresh first) and keep track of
		// leaves that were invalidated on removal.
		let mut leaves = self.backend.blockchain().leaves().unwrap_or_default();
		leaves.sort_unstable_by(|a, b| self.freshness.get(a).cmp(&self.freshness.get(b)));
		let mut invalidated_leaves = HashSet::new();

		// This may not be the most efficient way to remove **multiple** entries, but is the easy
		// one :-). Should be considered that in "normal" conditions the number of blocks to remove
		// is 0 or 1, it is not worth to complicate the code too much. One condition that may
		// trigger multiple removals (2+) is if we restart the node using an existing db and a
		// smaller limit wrt the one previously used.
		let remove_count = level_len - self.level_limit + 1;

		log::debug!(
			target: LOG_TARGET,
			"Detected leaves overflow at height {number}, removing {remove_count} obsolete blocks",
		);

		(0..remove_count).all(|_| {
			self.find_target(number, &leaves, &invalidated_leaves).map_or(false, |target| {
				self.remove_target(target, number, &leaves, &mut invalidated_leaves);
				true
			})
		});
	}

	// Helper function to find the best candidate to be removed.
	//
	// Given a set of blocks with height equal to `number` (potential candidates)
	// 1. For each candidate fetch all the leaves that are descending from it.
	// 2. Set the candidate freshness equal to the fresher of its descending leaves.
	// 3. The target is set as the candidate that is less fresh.
	//
	// Input `leaves` are assumed to be already ordered by "freshness" (less fresh first).
	//
	// Returns the index of the target fresher leaf within `leaves` and the route from target to
	// such leaf.
	fn find_target(
		&self,
		number: NumberFor<Block>,
		leaves: &[Block::Hash],
		invalidated_leaves: &HashSet<usize>,
	) -> Option<TargetInfo<Block>> {
		let mut target_info: Option<TargetInfo<Block>> = None;
		let blockchain = self.backend.blockchain();
		let best_hash = blockchain.info().best_hash;

		// Leaves that where already assigned to some node and thus can be skipped
		// during the search.
		let mut assigned_leaves = HashSet::new();

		let level = self.levels.get(&number)?;

		for blk_hash in level.iter().filter(|hash| **hash != best_hash) {
			// Search for the fresher leaf information for this block
			let candidate_info = leaves
				.iter()
				.enumerate()
				.filter(|(leaf_idx, _)| {
					!assigned_leaves.contains(leaf_idx) && !invalidated_leaves.contains(leaf_idx)
				})
				.rev()
				.find_map(|(leaf_idx, leaf_hash)| {
					if blk_hash == leaf_hash {
						let entry = HashAndNumber { number, hash: *blk_hash };
						TreeRoute::new(vec![entry], 0).ok().map(|freshest_route| TargetInfo {
							freshest_leaf_idx: leaf_idx,
							freshest_route,
						})
					} else {
						match sp_blockchain::tree_route(blockchain, *blk_hash, *leaf_hash) {
							Ok(route) if route.retracted().is_empty() => Some(TargetInfo {
								freshest_leaf_idx: leaf_idx,
								freshest_route: route,
							}),
							Err(err) => {
								log::warn!(
									target: LOG_TARGET,
									"(Lookup) Unable getting route from {:?} to {:?}: {}",
									blk_hash,
									leaf_hash,
									err,
								);
								None
							},
							_ => None,
						}
					}
				});

			let candidate_info = match candidate_info {
				Some(candidate_info) => {
					assigned_leaves.insert(candidate_info.freshest_leaf_idx);
					candidate_info
				},
				None => {
					// This should never happen
					log::error!(
						target: LOG_TARGET,
						"Unable getting route to any leaf from {:?} (this is a bug)",
						blk_hash,
					);
					continue
				},
			};

			// Found fresher leaf for this candidate.
			// This candidate is set as the new target if:
			// 1. its fresher leaf is less fresh than the previous target fresher leaf AND
			// 2. best block is not in its route

			let is_less_fresh = || {
				target_info
					.as_ref()
					.map(|ti| candidate_info.freshest_leaf_idx < ti.freshest_leaf_idx)
					.unwrap_or(true)
			};
			let not_contains_best = || {
				candidate_info
					.freshest_route
					.enacted()
					.iter()
					.all(|entry| entry.hash != best_hash)
			};

			if is_less_fresh() && not_contains_best() {
				let early_stop = candidate_info.freshest_leaf_idx == 0;
				target_info = Some(candidate_info);
				if early_stop {
					// We will never find a candidate with an worst freshest leaf than this.
					break
				}
			}
		}

		target_info
	}

	// Remove the target block and all its descendants.
	//
	// Leaves should have already been ordered by "freshness" (less fresh first).
	fn remove_target(
		&mut self,
		target: TargetInfo<Block>,
		number: NumberFor<Block>,
		leaves: &[Block::Hash],
		invalidated_leaves: &mut HashSet<usize>,
	) {
		let mut remove_leaf = |number, hash| {
			log::debug!(target: LOG_TARGET, "Removing block (@{}) {:?}", number, hash);
			if let Err(err) = self.backend.remove_leaf_block(hash) {
				log::debug!(target: LOG_TARGET, "Remove not possible for {}: {}", hash, err);
				return false
			}
			self.levels.get_mut(&number).map(|level| level.remove(&hash));
			self.freshness.remove(&hash);
			true
		};

		invalidated_leaves.insert(target.freshest_leaf_idx);

		// Takes care of route removal. Starts from the leaf and stops as soon as an error is
		// encountered. In this case an error is interpreted as the block being not a leaf
		// and it will be removed while removing another route from the same block but to a
		// different leaf.
		let mut remove_route = |route: TreeRoute<Block>| {
			route.enacted().iter().rev().all(|elem| remove_leaf(elem.number, elem.hash));
		};

		let target_hash = target.freshest_route.common_block().hash;
		debug_assert_eq!(
			target.freshest_route.common_block().number,
			number,
			"This is a bug in LevelMonitor::find_target() or the Backend is corrupted"
		);

		// Remove freshest (cached) route first.
		remove_route(target.freshest_route);

		// Don't bother trying with leaves we already found to not be our descendants.
		let to_skip = leaves.len() - target.freshest_leaf_idx;
		leaves.iter().enumerate().rev().skip(to_skip).for_each(|(leaf_idx, leaf_hash)| {
			if invalidated_leaves.contains(&leaf_idx) {
				return
			}
			match sp_blockchain::tree_route(self.backend.blockchain(), target_hash, *leaf_hash) {
				Ok(route) if route.retracted().is_empty() => {
					invalidated_leaves.insert(leaf_idx);
					remove_route(route);
				},
				Err(err) => {
					log::warn!(
						target: LOG_TARGET,
						"(Removal) unable getting route from {:?} to {:?}: {}",
						target_hash,
						leaf_hash,
						err,
					);
				},
				_ => (),
			};
		});

		remove_leaf(number, target_hash);
	}

	/// Add a new imported block information to the monitor.
	pub fn block_imported(&mut self, number: NumberFor<Block>, hash: Block::Hash) {
		let finalized_num = self.backend.blockchain().info().finalized_number;

		if number > finalized_num {
			// Only blocks above the last finalized block should be added to the monitor
			self.import_counter += One::one();
			self.freshness.insert(hash, self.import_counter);
			self.levels.entry(number).or_default().insert(hash);
		}

		let delta: u32 = finalized_num.saturating_sub(self.lowest_level).unique_saturated_into();
		if delta >= CLEANUP_THRESHOLD {
			// Do cleanup once in a while, we are allowed to have some obsolete information.
			for i in 0..delta {
				let number = self.lowest_level + i.unique_saturated_into();
				self.levels.remove(&number).map(|level| {
					level.iter().for_each(|hash| {
						self.freshness.remove(hash);
					})
				});
			}
			self.lowest_level = finalized_num;
		}
	}
}