#![warn(missing_docs)]
use crate::{aux_schema, MmrClient, LOG_TARGET};
use log::{debug, error, info, warn};
use sc_client_api::{Backend, FinalityNotification};
use sc_offchain::OffchainDb;
use sp_blockchain::{CachedHeaderMetadata, ForkBackend};
use sp_consensus_beefy::MmrRootHash;
use sp_core::offchain::{DbExternalities, StorageKind};
use sp_mmr_primitives::{utils, utils::NodesUtils, MmrApi, NodeIndex};
use sp_runtime::{
traits::{Block, Header, NumberFor, One},
Saturating,
};
use std::{collections::VecDeque, default::Default, sync::Arc};
pub struct OffchainMmr<B: Block, BE: Backend<B>, C> {
backend: Arc<BE>,
client: Arc<C>,
offchain_db: OffchainDb<BE::OffchainStorage>,
indexing_prefix: Vec<u8>,
first_mmr_block: NumberFor<B>,
best_canonicalized: NumberFor<B>,
}
impl<B, BE, C> OffchainMmr<B, BE, C>
where
BE: Backend<B>,
B: Block,
C: MmrClient<B, BE>,
C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
{
pub fn new(
backend: Arc<BE>,
client: Arc<C>,
offchain_db: OffchainDb<BE::OffchainStorage>,
indexing_prefix: Vec<u8>,
first_mmr_block: NumberFor<B>,
) -> Option<Self> {
let mut best_canonicalized = first_mmr_block.saturating_sub(One::one());
best_canonicalized = aux_schema::load_or_init_state::<B, BE>(&*backend, best_canonicalized)
.map_err(|e| error!(target: LOG_TARGET, "Error loading state from aux db: {:?}", e))
.ok()?;
Some(Self {
backend,
client,
offchain_db,
indexing_prefix,
first_mmr_block,
best_canonicalized,
})
}
fn node_temp_offchain_key(&self, pos: NodeIndex, parent_hash: B::Hash) -> Vec<u8> {
NodesUtils::node_temp_offchain_key::<B::Header>(&self.indexing_prefix, pos, parent_hash)
}
fn node_canon_offchain_key(&self, pos: NodeIndex) -> Vec<u8> {
NodesUtils::node_canon_offchain_key(&self.indexing_prefix, pos)
}
fn write_gadget_state_or_log(&self) {
if let Err(e) =
aux_schema::write_gadget_state::<B, BE>(&*self.backend, &self.best_canonicalized)
{
debug!(target: LOG_TARGET, "error saving state: {:?}", e);
}
}
fn header_metadata_or_log(
&self,
hash: B::Hash,
action: &str,
) -> Option<CachedHeaderMetadata<B>> {
match self.client.header_metadata(hash) {
Ok(header) => Some(header),
_ => {
debug!(
target: LOG_TARGET,
"Block {} not found. Couldn't {} associated branch.", hash, action
);
None
},
}
}
fn right_branch_ending_in_block_or_log(
&self,
block_num: NumberFor<B>,
action: &str,
) -> Option<Vec<NodeIndex>> {
match utils::block_num_to_leaf_index::<B::Header>(block_num, self.first_mmr_block) {
Ok(leaf_idx) => {
let branch = NodesUtils::right_branch_ending_in_leaf(leaf_idx);
debug!(
target: LOG_TARGET,
"Nodes to {} for block {}: {:?}", action, block_num, branch
);
Some(branch)
},
Err(e) => {
error!(
target: LOG_TARGET,
"Error converting block number {} to leaf index: {:?}. \
Couldn't {} associated branch.",
block_num,
e,
action
);
None
},
}
}
fn prune_branch(&mut self, block_hash: &B::Hash) {
let action = "prune";
let header = match self.header_metadata_or_log(*block_hash, action) {
Some(header) => header,
_ => return,
};
let stale_nodes = match self.right_branch_ending_in_block_or_log(header.number, action) {
Some(nodes) => nodes,
None => {
return
},
};
for pos in stale_nodes {
let temp_key = self.node_temp_offchain_key(pos, header.parent);
self.offchain_db.local_storage_clear(StorageKind::PERSISTENT, &temp_key);
debug!(target: LOG_TARGET, "Pruned elem at pos {} with temp key {:?}", pos, temp_key);
}
}
fn canonicalize_branch(&mut self, block_hash: B::Hash) {
let action = "canonicalize";
let header = match self.header_metadata_or_log(block_hash, action) {
Some(header) => header,
_ => return,
};
if header.number < self.first_mmr_block {
return
}
let to_canon_nodes = match self.right_branch_ending_in_block_or_log(header.number, action) {
Some(nodes) => nodes,
None => {
self.best_canonicalized = header.number;
return
},
};
for pos in to_canon_nodes {
let temp_key = self.node_temp_offchain_key(pos, header.parent);
if let Some(elem) =
self.offchain_db.local_storage_get(StorageKind::PERSISTENT, &temp_key)
{
let canon_key = self.node_canon_offchain_key(pos);
self.offchain_db.local_storage_set(StorageKind::PERSISTENT, &canon_key, &elem);
self.offchain_db.local_storage_clear(StorageKind::PERSISTENT, &temp_key);
debug!(
target: LOG_TARGET,
"Moved elem at pos {} from temp key {:?} to canon key {:?}",
pos,
temp_key,
canon_key
);
} else {
debug!(
target: LOG_TARGET,
"Couldn't canonicalize elem at pos {} using temp key {:?}", pos, temp_key
);
}
}
if self.best_canonicalized != header.number.saturating_sub(One::one()) {
warn!(
target: LOG_TARGET,
"Detected canonicalization skip: best {:?} current {:?}.",
self.best_canonicalized,
header.number,
);
}
self.best_canonicalized = header.number;
}
pub fn canonicalize_catch_up(&mut self, notification: &FinalityNotification<B>) {
let first = notification.tree_route.first().unwrap_or(¬ification.hash);
if let Some(mut header) = self.header_metadata_or_log(*first, "canonicalize") {
let mut to_canon = VecDeque::<<B as Block>::Hash>::new();
loop {
header = match self.header_metadata_or_log(header.parent, "canonicalize") {
Some(header) => header,
_ => break,
};
if header.number <= self.best_canonicalized {
break
}
to_canon.push_front(header.hash);
}
for hash in to_canon.drain(..) {
self.canonicalize_branch(hash);
}
self.write_gadget_state_or_log();
}
}
fn handle_potential_pallet_reset(&mut self, notification: &FinalityNotification<B>) {
if let Some(first_mmr_block_num) = self.client.first_mmr_block_num(¬ification) {
if first_mmr_block_num != self.first_mmr_block {
info!(
target: LOG_TARGET,
"pallet-mmr reset detected at block {:?} with new genesis at block {:?}",
notification.header.number(),
first_mmr_block_num
);
self.first_mmr_block = first_mmr_block_num;
self.best_canonicalized = first_mmr_block_num.saturating_sub(One::one());
self.write_gadget_state_or_log();
}
}
}
pub fn canonicalize_and_prune(&mut self, notification: FinalityNotification<B>) {
self.handle_potential_pallet_reset(¬ification);
for hash in notification.tree_route.iter().chain(std::iter::once(¬ification.hash)) {
self.canonicalize_branch(*hash);
}
self.write_gadget_state_or_log();
let stale_forks = self.client.expand_forks(¬ification.stale_heads).unwrap_or_else(|e| {
warn!(target: LOG_TARGET, "{:?}", e);
Default::default()
});
for hash in stale_forks.iter() {
self.prune_branch(hash);
}
}
}
#[cfg(test)]
mod tests {
use crate::test_utils::{run_test_with_mmr_gadget, run_test_with_mmr_gadget_pre_post};
use parking_lot::Mutex;
use sp_runtime::generic::BlockId;
use std::{sync::Arc, time::Duration};
#[test]
fn canonicalize_and_prune_works_correctly() {
run_test_with_mmr_gadget(|client| async move {
let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await;
let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(2)).await;
let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(3)).await;
let b1 = client.import_block(&BlockId::Number(0), b"b1", Some(0)).await;
let b2 = client.import_block(&BlockId::Hash(b1.hash()), b"b2", Some(1)).await;
let b3 = client.import_block(&BlockId::Hash(b2.hash()), b"b3", Some(2)).await;
let c1 = client.import_block(&BlockId::Number(0), b"c1", Some(0)).await;
let d4 = client.import_block(&BlockId::Hash(a3.hash()), b"d4", Some(3)).await;
let d5 = client.import_block(&BlockId::Hash(d4.hash()), b"d5", Some(4)).await;
client.finalize_block(a3.hash(), Some(3));
tokio::time::sleep(Duration::from_millis(200)).await;
client.assert_canonicalized(&[&a1, &a2, &a3]);
client.assert_pruned(&[&c1, &b1]);
client.finalize_block(d5.hash(), Some(5));
tokio::time::sleep(Duration::from_millis(200)).await;
client.assert_canonicalized(&[&d4, &d5]);
client.assert_pruned(&[&b1, &b2, &b3, &a4]);
})
}
#[test]
fn canonicalize_and_prune_handles_pallet_reset() {
run_test_with_mmr_gadget(|client| async move {
let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await;
let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(0)).await;
let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(1)).await;
let a5 = client.import_block(&BlockId::Hash(a4.hash()), b"a5", Some(2)).await;
client.finalize_block(a1.hash(), Some(1));
tokio::time::sleep(Duration::from_millis(200)).await;
client.assert_canonicalized(&[&a1]);
client.assert_not_canonicalized(&[&a2]);
client.finalize_block(a5.hash(), Some(3));
tokio::time::sleep(Duration::from_millis(200)).await;
client.assert_canonicalized(&[&a3, &a4, &a5]);
})
}
#[test]
fn canonicalize_catchup_works_correctly() {
let mmr_blocks = Arc::new(Mutex::new(vec![]));
let mmr_blocks_ref = mmr_blocks.clone();
run_test_with_mmr_gadget_pre_post(
|client| async move {
let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await;
client.finalize_block(a2.hash(), Some(2));
let mut mmr_blocks = mmr_blocks_ref.lock();
mmr_blocks.push(a1);
mmr_blocks.push(a2);
},
|client| async move {
let blocks = mmr_blocks.lock();
let a1 = blocks[0].clone();
let a2 = blocks[1].clone();
let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(2)).await;
let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(3)).await;
client.finalize_block(a4.hash(), Some(4));
tokio::time::sleep(Duration::from_millis(200)).await;
client.assert_canonicalized(&[&a1, &a2, &a3, &a4]);
},
)
}
#[test]
fn canonicalize_catchup_works_correctly_with_pallet_reset() {
let mmr_blocks = Arc::new(Mutex::new(vec![]));
let mmr_blocks_ref = mmr_blocks.clone();
run_test_with_mmr_gadget_pre_post(
|client| async move {
let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(0)).await;
client.finalize_block(a2.hash(), Some(1));
let mut mmr_blocks = mmr_blocks_ref.lock();
mmr_blocks.push(a1);
mmr_blocks.push(a2);
},
|client| async move {
let blocks = mmr_blocks.lock();
let a1 = blocks[0].clone();
let a2 = blocks[1].clone();
let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(1)).await;
let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(2)).await;
client.finalize_block(a4.hash(), Some(3));
tokio::time::sleep(Duration::from_millis(200)).await;
client.assert_not_pruned(&[&a1]);
client.assert_canonicalized(&[&a2, &a3, &a4]);
},
)
}
}