1#![warn(missing_docs)]
39
40mod aux_schema;
41mod offchain_mmr;
42#[cfg(test)]
43pub mod test_utils;
44
45use crate::offchain_mmr::OffchainMmr;
46use futures::StreamExt;
47use log::{debug, error, trace, warn};
48use sc_client_api::{Backend, BlockchainEvents, FinalityNotification, FinalityNotifications};
49use sc_offchain::OffchainDb;
50use sp_api::ProvideRuntimeApi;
51use sp_blockchain::{HeaderBackend, HeaderMetadata};
52use sp_consensus_beefy::MmrRootHash;
53use sp_mmr_primitives::{utils, LeafIndex, MmrApi};
54use sp_runtime::traits::{Block, Header, NumberFor};
55use std::{marker::PhantomData, sync::Arc};
56
/// Logging target used by every log statement emitted from this crate.
pub const LOG_TARGET: &str = "mmr";
59
60pub trait MmrClient<B, BE>:
63 BlockchainEvents<B> + HeaderBackend<B> + HeaderMetadata<B> + ProvideRuntimeApi<B>
64where
65 B: Block,
66 BE: Backend<B>,
67 Self::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
68{
69 fn first_mmr_block_num(&self, notification: &FinalityNotification<B>) -> Option<NumberFor<B>> {
71 let best_block_hash = notification.header.hash();
72 let best_block_number = *notification.header.number();
73 match self.runtime_api().mmr_leaf_count(best_block_hash) {
74 Ok(Ok(mmr_leaf_count)) => {
75 match utils::first_mmr_block_num::<B::Header>(best_block_number, mmr_leaf_count) {
76 Ok(first_mmr_block) => {
77 debug!(
78 target: LOG_TARGET,
79 "pallet-mmr detected at block {:?} with genesis at block {:?}",
80 best_block_number,
81 first_mmr_block
82 );
83 Some(first_mmr_block)
84 },
85 Err(e) => {
86 error!(
87 target: LOG_TARGET,
88 "Error calculating the first mmr block: {:?}", e
89 );
90 None
91 },
92 }
93 },
94 _ => {
95 trace!(
96 target: LOG_TARGET,
97 "pallet-mmr not detected at block {:?} ... (best finalized {:?})",
98 best_block_number,
99 notification.header.number()
100 );
101 None
102 },
103 }
104 }
105}
106
107impl<B, BE, T> MmrClient<B, BE> for T
108where
109 B: Block,
110 BE: Backend<B>,
111 T: BlockchainEvents<B> + HeaderBackend<B> + HeaderMetadata<B> + ProvideRuntimeApi<B>,
112 T::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
113{
114 }
116
117struct OffchainMmrBuilder<B: Block, BE: Backend<B>, C> {
118 backend: Arc<BE>,
119 client: Arc<C>,
120 offchain_db: OffchainDb<BE::OffchainStorage>,
121 indexing_prefix: Vec<u8>,
122
123 _phantom: PhantomData<B>,
124}
125
126impl<B, BE, C> OffchainMmrBuilder<B, BE, C>
127where
128 B: Block,
129 BE: Backend<B>,
130 C: MmrClient<B, BE>,
131 C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
132{
133 async fn try_build(
134 self,
135 finality_notifications: &mut FinalityNotifications<B>,
136 ) -> Option<OffchainMmr<B, BE, C>> {
137 while let Some(notification) = finality_notifications.next().await {
138 if let Some(first_mmr_block_num) = self.client.first_mmr_block_num(¬ification) {
139 let mut offchain_mmr = OffchainMmr::new(
140 self.backend,
141 self.client,
142 self.offchain_db,
143 self.indexing_prefix,
144 first_mmr_block_num,
145 )?;
146 offchain_mmr.canonicalize_catch_up(¬ification);
149 offchain_mmr.canonicalize_and_prune(notification);
152 return Some(offchain_mmr)
153 }
154 }
155
156 error!(
157 target: LOG_TARGET,
158 "Finality notifications stream closed unexpectedly. \
159 Couldn't build the canonicalization engine",
160 );
161 None
162 }
163}
164
165pub struct MmrGadget<B: Block, BE: Backend<B>, C> {
167 finality_notifications: FinalityNotifications<B>,
168
169 _phantom: PhantomData<(B, BE, C)>,
170}
171
172impl<B, BE, C> MmrGadget<B, BE, C>
173where
174 B: Block,
175 <B::Header as Header>::Number: Into<LeafIndex>,
176 BE: Backend<B>,
177 C: MmrClient<B, BE>,
178 C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
179{
180 async fn run(mut self, builder: OffchainMmrBuilder<B, BE, C>) {
181 let mut offchain_mmr = match builder.try_build(&mut self.finality_notifications).await {
182 Some(offchain_mmr) => offchain_mmr,
183 None => return,
184 };
185
186 while let Some(notification) = self.finality_notifications.next().await {
187 offchain_mmr.canonicalize_and_prune(notification);
188 }
189 }
190
191 pub async fn start(client: Arc<C>, backend: Arc<BE>, indexing_prefix: Vec<u8>) {
193 let offchain_db = match backend.offchain_storage() {
194 Some(offchain_storage) => OffchainDb::new(offchain_storage),
195 None => {
196 warn!(
197 target: LOG_TARGET,
198 "Can't spawn a MmrGadget for a node without offchain storage."
199 );
200 return
201 },
202 };
203
204 let mmr_gadget = MmrGadget::<B, BE, C> {
205 finality_notifications: client.finality_notification_stream(),
206
207 _phantom: Default::default(),
208 };
209 mmr_gadget
210 .run(OffchainMmrBuilder {
211 backend,
212 client,
213 offchain_db,
214 indexing_prefix,
215 _phantom: Default::default(),
216 })
217 .await
218 }
219}
220
#[cfg(test)]
mod tests {
    use crate::test_utils::run_test_with_mmr_gadget;
    use sp_runtime::generic::BlockId;
    use std::time::Duration;

    /// Pause between finalizing a block and asserting on the offchain state,
    /// giving the gadget time to react to the notification.
    const GADGET_SETTLE_TIME: Duration = Duration::from_millis(200);

    #[test]
    fn mmr_first_block_is_computed_correctly() {
        // Scenario 1: chain G -> a1 -> a2, where every imported block gets a
        // `Some(_)` third argument (presumably MMR data; confirm in `test_utils`).
        run_test_with_mmr_gadget(|client| async move {
            let genesis = BlockId::Number(0);
            let a1 = client.import_block(&genesis, b"a1", Some(0)).await;
            let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await;

            client.finalize_block(a1.hash(), Some(1));
            tokio::time::sleep(GADGET_SETTLE_TIME).await;

            // Only the finalized block is expected to be canonicalized.
            client.assert_canonicalized(&[&a1]);
            client.assert_not_pruned(&[&a2]);
        });

        // Scenario 2: chain G -> a1 -> ... -> a6, where the first three blocks
        // are imported with `None` and `Some(_)` values only start at a4.
        run_test_with_mmr_gadget(|client| async move {
            let genesis = BlockId::Number(0);
            let a1 = client.import_block(&genesis, b"a1", None).await;
            let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", None).await;
            let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", None).await;
            let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(0)).await;
            let a5 = client.import_block(&BlockId::Hash(a4.hash()), b"a5", Some(1)).await;
            let a6 = client.import_block(&BlockId::Hash(a5.hash()), b"a6", Some(2)).await;

            client.finalize_block(a5.hash(), Some(2));
            tokio::time::sleep(GADGET_SETTLE_TIME).await;

            // Canonicalization is expected for a4 and a5 only.
            client.assert_canonicalized(&[&a4, &a5]);
            client.assert_not_pruned(&[&a6]);
        });
    }

    #[test]
    fn does_not_panic_on_invalid_num_mmr_blocks() {
        run_test_with_mmr_gadget(|client| async move {
            let genesis = BlockId::Number(0);
            let a1 = client.import_block(&genesis, b"a1", Some(0)).await;

            // Finalize with `Some(2)` although only one block was imported;
            // going by the test name this value is invalid for this chain.
            client.finalize_block(a1.hash(), Some(2));
            tokio::time::sleep(GADGET_SETTLE_TIME).await;

            // The gadget must survive and leave the block untouched.
            client.assert_not_canonicalized(&[&a1]);
        });
    }
}
283}