1#![warn(missing_docs)]
23
24use crate::{aux_schema, MmrClient, LOG_TARGET};
25use log::{debug, error, info, warn};
26use sc_client_api::{Backend, FinalityNotification};
27use sc_offchain::OffchainDb;
28use sp_blockchain::{CachedHeaderMetadata, ForkBackend};
29use sp_consensus_beefy::MmrRootHash;
30use sp_core::offchain::{DbExternalities, StorageKind};
31use sp_mmr_primitives::{utils, utils::NodesUtils, MmrApi, NodeIndex};
32use sp_runtime::{
33 traits::{Block, Header, NumberFor, One},
34 Saturating,
35};
36use std::{collections::VecDeque, default::Default, sync::Arc};
37
38pub struct OffchainMmr<B: Block, BE: Backend<B>, C> {
40 backend: Arc<BE>,
41 client: Arc<C>,
42 offchain_db: OffchainDb<BE::OffchainStorage>,
43 indexing_prefix: Vec<u8>,
44 first_mmr_block: NumberFor<B>,
45 best_canonicalized: NumberFor<B>,
46}
47
48impl<B, BE, C> OffchainMmr<B, BE, C>
49where
50 BE: Backend<B>,
51 B: Block,
52 C: MmrClient<B, BE>,
53 C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
54{
55 pub fn new(
56 backend: Arc<BE>,
57 client: Arc<C>,
58 offchain_db: OffchainDb<BE::OffchainStorage>,
59 indexing_prefix: Vec<u8>,
60 first_mmr_block: NumberFor<B>,
61 ) -> Option<Self> {
62 let mut best_canonicalized = first_mmr_block.saturating_sub(One::one());
63 best_canonicalized = aux_schema::load_or_init_state::<B, BE>(&*backend, best_canonicalized)
64 .map_err(|e| error!(target: LOG_TARGET, "Error loading state from aux db: {:?}", e))
65 .ok()?;
66
67 Some(Self {
68 backend,
69 client,
70 offchain_db,
71 indexing_prefix,
72 first_mmr_block,
73 best_canonicalized,
74 })
75 }
76
77 fn node_temp_offchain_key(&self, pos: NodeIndex, parent_hash: B::Hash) -> Vec<u8> {
78 NodesUtils::node_temp_offchain_key::<B::Header>(&self.indexing_prefix, pos, parent_hash)
79 }
80
81 fn node_canon_offchain_key(&self, pos: NodeIndex) -> Vec<u8> {
82 NodesUtils::node_canon_offchain_key(&self.indexing_prefix, pos)
83 }
84
85 fn write_gadget_state_or_log(&self) {
86 if let Err(e) =
87 aux_schema::write_gadget_state::<B, BE>(&*self.backend, &self.best_canonicalized)
88 {
89 debug!(target: LOG_TARGET, "error saving state: {:?}", e);
90 }
91 }
92
93 fn header_metadata_or_log(
94 &self,
95 hash: B::Hash,
96 action: &str,
97 ) -> Option<CachedHeaderMetadata<B>> {
98 match self.client.header_metadata(hash) {
99 Ok(header) => Some(header),
100 _ => {
101 debug!(
102 target: LOG_TARGET,
103 "Block {} not found. Couldn't {} associated branch.", hash, action
104 );
105 None
106 },
107 }
108 }
109
110 fn right_branch_ending_in_block_or_log(
111 &self,
112 block_num: NumberFor<B>,
113 action: &str,
114 ) -> Option<Vec<NodeIndex>> {
115 match utils::block_num_to_leaf_index::<B::Header>(block_num, self.first_mmr_block) {
116 Ok(leaf_idx) => {
117 let branch = NodesUtils::right_branch_ending_in_leaf(leaf_idx);
118 debug!(
119 target: LOG_TARGET,
120 "Nodes to {} for block {}: {:?}", action, block_num, branch
121 );
122 Some(branch)
123 },
124 Err(e) => {
125 error!(
126 target: LOG_TARGET,
127 "Error converting block number {} to leaf index: {:?}. \
128 Couldn't {} associated branch.",
129 block_num,
130 e,
131 action
132 );
133 None
134 },
135 }
136 }
137
138 fn prune_branch(&mut self, block_hash: &B::Hash) {
139 let action = "prune";
140 let header = match self.header_metadata_or_log(*block_hash, action) {
141 Some(header) => header,
142 _ => return,
143 };
144
145 let stale_nodes = match self.right_branch_ending_in_block_or_log(header.number, action) {
148 Some(nodes) => nodes,
149 None => {
150 return
153 },
154 };
155
156 for pos in stale_nodes {
157 let temp_key = self.node_temp_offchain_key(pos, header.parent);
158 self.offchain_db.local_storage_clear(StorageKind::PERSISTENT, &temp_key);
159 debug!(target: LOG_TARGET, "Pruned elem at pos {} with temp key {:?}", pos, temp_key);
160 }
161 }
162
163 fn canonicalize_branch(&mut self, block_hash: B::Hash) {
164 let action = "canonicalize";
165 let header = match self.header_metadata_or_log(block_hash, action) {
166 Some(header) => header,
167 _ => return,
168 };
169
170 if header.number < self.first_mmr_block {
173 return
174 }
175
176 let to_canon_nodes = match self.right_branch_ending_in_block_or_log(header.number, action) {
179 Some(nodes) => nodes,
180 None => {
181 self.best_canonicalized = header.number;
184 return
185 },
186 };
187
188 for pos in to_canon_nodes {
189 let temp_key = self.node_temp_offchain_key(pos, header.parent);
190 if let Some(elem) =
191 self.offchain_db.local_storage_get(StorageKind::PERSISTENT, &temp_key)
192 {
193 let canon_key = self.node_canon_offchain_key(pos);
194 self.offchain_db.local_storage_set(StorageKind::PERSISTENT, &canon_key, &elem);
195 self.offchain_db.local_storage_clear(StorageKind::PERSISTENT, &temp_key);
196 debug!(
197 target: LOG_TARGET,
198 "Moved elem at pos {} from temp key {:?} to canon key {:?}",
199 pos,
200 temp_key,
201 canon_key
202 );
203 } else {
204 debug!(
205 target: LOG_TARGET,
206 "Couldn't canonicalize elem at pos {} using temp key {:?}", pos, temp_key
207 );
208 }
209 }
210 if self.best_canonicalized != header.number.saturating_sub(One::one()) {
211 warn!(
212 target: LOG_TARGET,
213 "Detected canonicalization skip: best {:?} current {:?}.",
214 self.best_canonicalized,
215 header.number,
216 );
217 }
218 self.best_canonicalized = header.number;
219 }
220
221 pub fn canonicalize_catch_up(&mut self, notification: &FinalityNotification<B>) {
224 let first = notification.tree_route.first().unwrap_or(¬ification.hash);
225 if let Some(mut header) = self.header_metadata_or_log(*first, "canonicalize") {
226 let mut to_canon = VecDeque::<<B as Block>::Hash>::new();
227 loop {
229 header = match self.header_metadata_or_log(header.parent, "canonicalize") {
230 Some(header) => header,
231 _ => break,
232 };
233 if header.number <= self.best_canonicalized {
234 break
235 }
236 to_canon.push_front(header.hash);
237 }
238 for hash in to_canon.drain(..) {
240 self.canonicalize_branch(hash);
241 }
242 self.write_gadget_state_or_log();
243 }
244 }
245
246 fn handle_potential_pallet_reset(&mut self, notification: &FinalityNotification<B>) {
247 if let Some(first_mmr_block_num) = self.client.first_mmr_block_num(¬ification) {
248 if first_mmr_block_num != self.first_mmr_block {
249 info!(
250 target: LOG_TARGET,
251 "pallet-mmr reset detected at block {:?} with new genesis at block {:?}",
252 notification.header.number(),
253 first_mmr_block_num
254 );
255 self.first_mmr_block = first_mmr_block_num;
256 self.best_canonicalized = first_mmr_block_num.saturating_sub(One::one());
257 self.write_gadget_state_or_log();
258 }
259 }
260 }
261
262 pub fn canonicalize_and_prune(&mut self, notification: FinalityNotification<B>) {
266 self.handle_potential_pallet_reset(¬ification);
268
269 for hash in notification.tree_route.iter().chain(std::iter::once(¬ification.hash)) {
271 self.canonicalize_branch(*hash);
272 }
273 self.write_gadget_state_or_log();
274
275 let stale_forks = self.client.expand_forks(¬ification.stale_heads).unwrap_or_else(|e| {
277 warn!(target: LOG_TARGET, "{:?}", e);
278
279 Default::default()
280 });
281 for hash in stale_forks.iter() {
282 self.prune_branch(hash);
283 }
284 }
285}
286
#[cfg(test)]
mod tests {
    use crate::test_utils::{run_test_with_mmr_gadget, run_test_with_mmr_gadget_pre_post};
    use parking_lot::Mutex;
    use sp_runtime::generic::BlockId;
    use std::{sync::Arc, time::Duration};

    /// Finalizing along one fork canonicalizes its blocks and prunes blocks on the
    /// competing forks.
    #[test]
    fn canonicalize_and_prune_works_correctly() {
        run_test_with_mmr_gadget(|client| async move {
            // Fork `a`: a1 -> a2 -> a3 -> a4, built on block 0.
            let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
            let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await;
            let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(2)).await;
            let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(3)).await;

            // Fork `b`: b1 -> b2 -> b3, built on block 0.
            let b1 = client.import_block(&BlockId::Number(0), b"b1", Some(0)).await;
            let b2 = client.import_block(&BlockId::Hash(b1.hash()), b"b2", Some(1)).await;
            let b3 = client.import_block(&BlockId::Hash(b2.hash()), b"b3", Some(2)).await;

            // Fork `c`: a single block built on block 0.
            let c1 = client.import_block(&BlockId::Number(0), b"c1", Some(0)).await;

            // Fork `d`: d4 -> d5, branching off a3.
            let d4 = client.import_block(&BlockId::Hash(a3.hash()), b"d4", Some(3)).await;
            let d5 = client.import_block(&BlockId::Hash(d4.hash()), b"d5", Some(4)).await;

            client.finalize_block(a3.hash(), Some(3));
            // Give the gadget time to process the finality notification.
            tokio::time::sleep(Duration::from_millis(200)).await;
            client.assert_canonicalized(&[&a1, &a2, &a3]);
            client.assert_pruned(&[&c1, &b1]);

            client.finalize_block(d5.hash(), Some(5));
            tokio::time::sleep(Duration::from_millis(200)).await;
            client.assert_canonicalized(&[&d4, &d5]);
            client.assert_pruned(&[&b1, &b2, &b3, &a4]);
        })
    }

    /// Blocks imported after the third argument of `import_block` restarts from
    /// `Some(0)` (at a3) are still canonicalized once finalized.
    #[test]
    fn canonicalize_and_prune_handles_pallet_reset() {
        run_test_with_mmr_gadget(|client| async move {
            let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
            let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await;
            let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(0)).await;
            let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(1)).await;
            let a5 = client.import_block(&BlockId::Hash(a4.hash()), b"a5", Some(2)).await;

            client.finalize_block(a1.hash(), Some(1));
            tokio::time::sleep(Duration::from_millis(200)).await;
            client.assert_canonicalized(&[&a1]);
            client.assert_not_canonicalized(&[&a2]);

            client.finalize_block(a5.hash(), Some(3));
            tokio::time::sleep(Duration::from_millis(200)).await;
            client.assert_canonicalized(&[&a3, &a4, &a5]);
        })
    }

    /// Blocks imported and finalized in the "pre" phase end up canonicalized after a
    /// later finalization in the "post" phase.
    #[test]
    fn canonicalize_catchup_works_correctly() {
        let mmr_blocks = Arc::new(Mutex::new(vec![]));
        let mmr_blocks_ref = mmr_blocks.clone();
        run_test_with_mmr_gadget_pre_post(
            |client| async move {
                let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
                let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await;

                client.finalize_block(a2.hash(), Some(2));

                // Hand the blocks over to the "post" closure.
                let mut mmr_blocks = mmr_blocks_ref.lock();
                mmr_blocks.push(a1);
                mmr_blocks.push(a2);
            },
            |client| async move {
                // Copy the blocks out in a short scope so the mutex guard is released
                // before the first `.await` below.
                let (a1, a2) = {
                    let blocks = mmr_blocks.lock();
                    (blocks[0].clone(), blocks[1].clone())
                };
                let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(2)).await;
                let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(3)).await;

                client.finalize_block(a4.hash(), Some(4));
                tokio::time::sleep(Duration::from_millis(200)).await;
                client.assert_canonicalized(&[&a1, &a2, &a3, &a4]);
            },
        )
    }

    /// Same as the catch-up test above, but a2 is imported with `Some(0)` again:
    /// a1 must not be pruned, while a2..a4 must be canonicalized.
    #[test]
    fn canonicalize_catchup_works_correctly_with_pallet_reset() {
        let mmr_blocks = Arc::new(Mutex::new(vec![]));
        let mmr_blocks_ref = mmr_blocks.clone();
        run_test_with_mmr_gadget_pre_post(
            |client| async move {
                let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
                let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(0)).await;

                client.finalize_block(a2.hash(), Some(1));

                // Hand the blocks over to the "post" closure.
                let mut mmr_blocks = mmr_blocks_ref.lock();
                mmr_blocks.push(a1);
                mmr_blocks.push(a2);
            },
            |client| async move {
                // Copy the blocks out in a short scope so the mutex guard is released
                // before the first `.await` below.
                let (a1, a2) = {
                    let blocks = mmr_blocks.lock();
                    (blocks[0].clone(), blocks[1].clone())
                };
                let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(1)).await;
                let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(2)).await;

                client.finalize_block(a4.hash(), Some(3));
                tokio::time::sleep(Duration::from_millis(200)).await;
                client.assert_not_pruned(&[&a1]);
                client.assert_canonicalized(&[&a2, &a3, &a4]);
            },
        )
    }
}