#![warn(missing_docs)]
mod aux_schema;
mod offchain_mmr;
#[cfg(test)]
pub mod test_utils;
use crate::offchain_mmr::OffchainMmr;
use futures::StreamExt;
use log::{debug, error, trace, warn};
use sc_client_api::{Backend, BlockchainEvents, FinalityNotification, FinalityNotifications};
use sc_offchain::OffchainDb;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus_beefy::MmrRootHash;
use sp_mmr_primitives::{utils, LeafIndex, MmrApi};
use sp_runtime::traits::{Block, Header, NumberFor};
use std::{marker::PhantomData, sync::Arc};
pub const LOG_TARGET: &str = "mmr";
pub trait MmrClient<B, BE>:
BlockchainEvents<B> + HeaderBackend<B> + HeaderMetadata<B> + ProvideRuntimeApi<B>
where
B: Block,
BE: Backend<B>,
Self::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
{
fn first_mmr_block_num(&self, notification: &FinalityNotification<B>) -> Option<NumberFor<B>> {
let best_block_hash = notification.header.hash();
let best_block_number = *notification.header.number();
match self.runtime_api().mmr_leaf_count(best_block_hash) {
Ok(Ok(mmr_leaf_count)) => {
match utils::first_mmr_block_num::<B::Header>(best_block_number, mmr_leaf_count) {
Ok(first_mmr_block) => {
debug!(
target: LOG_TARGET,
"pallet-mmr detected at block {:?} with genesis at block {:?}",
best_block_number,
first_mmr_block
);
Some(first_mmr_block)
},
Err(e) => {
error!(
target: LOG_TARGET,
"Error calculating the first mmr block: {:?}", e
);
None
},
}
},
_ => {
trace!(
target: LOG_TARGET,
"pallet-mmr not detected at block {:?} ... (best finalized {:?})",
best_block_number,
notification.header.number()
);
None
},
}
}
}
impl<B, BE, T> MmrClient<B, BE> for T
where
B: Block,
BE: Backend<B>,
T: BlockchainEvents<B> + HeaderBackend<B> + HeaderMetadata<B> + ProvideRuntimeApi<B>,
T::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
{
}
struct OffchainMmrBuilder<B: Block, BE: Backend<B>, C> {
backend: Arc<BE>,
client: Arc<C>,
offchain_db: OffchainDb<BE::OffchainStorage>,
indexing_prefix: Vec<u8>,
_phantom: PhantomData<B>,
}
impl<B, BE, C> OffchainMmrBuilder<B, BE, C>
where
B: Block,
BE: Backend<B>,
C: MmrClient<B, BE>,
C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
{
async fn try_build(
self,
finality_notifications: &mut FinalityNotifications<B>,
) -> Option<OffchainMmr<B, BE, C>> {
while let Some(notification) = finality_notifications.next().await {
if let Some(first_mmr_block_num) = self.client.first_mmr_block_num(¬ification) {
let mut offchain_mmr = OffchainMmr::new(
self.backend,
self.client,
self.offchain_db,
self.indexing_prefix,
first_mmr_block_num,
)?;
offchain_mmr.canonicalize_catch_up(¬ification);
offchain_mmr.canonicalize_and_prune(notification);
return Some(offchain_mmr)
}
}
error!(
target: LOG_TARGET,
"Finality notifications stream closed unexpectedly. \
Couldn't build the canonicalization engine",
);
None
}
}
pub struct MmrGadget<B: Block, BE: Backend<B>, C> {
finality_notifications: FinalityNotifications<B>,
_phantom: PhantomData<(B, BE, C)>,
}
impl<B, BE, C> MmrGadget<B, BE, C>
where
B: Block,
<B::Header as Header>::Number: Into<LeafIndex>,
BE: Backend<B>,
C: MmrClient<B, BE>,
C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
{
async fn run(mut self, builder: OffchainMmrBuilder<B, BE, C>) {
let mut offchain_mmr = match builder.try_build(&mut self.finality_notifications).await {
Some(offchain_mmr) => offchain_mmr,
None => return,
};
while let Some(notification) = self.finality_notifications.next().await {
offchain_mmr.canonicalize_and_prune(notification);
}
}
pub async fn start(client: Arc<C>, backend: Arc<BE>, indexing_prefix: Vec<u8>) {
let offchain_db = match backend.offchain_storage() {
Some(offchain_storage) => OffchainDb::new(offchain_storage),
None => {
warn!(
target: LOG_TARGET,
"Can't spawn a MmrGadget for a node without offchain storage."
);
return
},
};
let mmr_gadget = MmrGadget::<B, BE, C> {
finality_notifications: client.finality_notification_stream(),
_phantom: Default::default(),
};
mmr_gadget
.run(OffchainMmrBuilder {
backend,
client,
offchain_db,
indexing_prefix,
_phantom: Default::default(),
})
.await
}
}
#[cfg(test)]
mod tests {
use crate::test_utils::run_test_with_mmr_gadget;
use sp_runtime::generic::BlockId;
use std::time::Duration;
#[test]
fn mmr_first_block_is_computed_correctly() {
run_test_with_mmr_gadget(|client| async move {
let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await;
client.finalize_block(a1.hash(), Some(1));
tokio::time::sleep(Duration::from_millis(200)).await;
client.assert_canonicalized(&[&a1]);
client.assert_not_pruned(&[&a2]);
});
run_test_with_mmr_gadget(|client| async move {
let a1 = client.import_block(&BlockId::Number(0), b"a1", None).await;
let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", None).await;
let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", None).await;
let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(0)).await;
let a5 = client.import_block(&BlockId::Hash(a4.hash()), b"a5", Some(1)).await;
let a6 = client.import_block(&BlockId::Hash(a5.hash()), b"a6", Some(2)).await;
client.finalize_block(a5.hash(), Some(2));
tokio::time::sleep(Duration::from_millis(200)).await;
client.assert_canonicalized(&[&a4, &a5]);
client.assert_not_pruned(&[&a6]);
});
}
#[test]
fn does_not_panic_on_invalid_num_mmr_blocks() {
run_test_with_mmr_gadget(|client| async move {
let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
client.finalize_block(a1.hash(), Some(2));
tokio::time::sleep(Duration::from_millis(200)).await;
client.assert_not_canonicalized(&[&a1]);
});
}
}