1use crate::{
21 BlockInfoProvider,
22 client::{Client, ClientError, GapFillRequest, SubstrateBlockNumber},
23};
24use pallet_revive::evm::H256;
25use tokio::sync::mpsc;
26
/// Log target passed to every `log` macro invocation in the block-sync code below.
const LOG_TARGET: &str = "eth-rpc::block-sync";
28
/// Marker trait for values usable as a sync-state key.
///
/// It declares no methods of its own; the only requirement is [`std::fmt::Display`].
/// NOTE(review): the receipt provider's `get_sync_label` / `set_sync_label` are called with
/// both implementors below, so they are presumably generic over this trait — confirm there.
pub trait SyncStateKey: std::fmt::Display {}
31
/// Labels for the two boundaries of the block range that has already been synced.
///
/// Each variant renders (via `Display`) to the literal given in its `#[display]` attribute.
#[derive(Debug, Clone, Copy, derive_more::Display)]
pub enum SyncLabel {
    /// Lower boundary of the synced range; displayed as `sync-tail`.
    /// `Client::checkpoint_sync_label` updates it through `recede_sync_label`.
    #[display(fmt = "sync-tail")]
    Tail,
    /// Upper boundary of the synced range; displayed as `sync-head`.
    /// `Client::checkpoint_sync_label` updates it through `advance_sync_label`.
    #[display(fmt = "sync-head")]
    Head,
}
44
/// Keys for chain-level metadata stored alongside the sync labels.
///
/// Each variant renders (via `Display`) to the literal given in its `#[display]` attribute.
#[derive(Debug, Clone, Copy, derive_more::Display)]
pub enum ChainMetadata {
    /// Displayed as `chain-genesis`. The sync code stores a checkpoint for block 0 with the
    /// node's genesis hash under this key and compares it on later runs.
    #[display(fmt = "chain-genesis")]
    Genesis,
    /// Displayed as `chain-first-evm-block`.
    /// NOTE(review): not referenced in the code visible here; presumably the key behind
    /// `first_evm_block` / `set_first_evm_block` on the receipt provider — confirm.
    #[display(fmt = "chain-first-evm-block")]
    FirstEvmBlock,
}
55
// Both key enums opt into the `SyncStateKey` marker; their `Display` impls come from
// `derive_more::Display`.
impl SyncStateKey for SyncLabel {}
impl SyncStateKey for ChainMetadata {}
58
/// A recorded position in the chain: a block number plus, optionally, that block's hash.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SyncCheckpoint {
    /// Substrate block number of the checkpoint.
    pub block_number: SubstrateBlockNumber,
    /// Hash of the block at `block_number`, if one was recorded.
    /// `Client::verify_boundary` rejects checkpoints where this is `None`.
    pub block_hash: Option<H256>,
}
65
66impl SyncCheckpoint {
67 pub fn new(block_number: SubstrateBlockNumber, block_hash: H256) -> Self {
69 Self { block_number, block_hash: Some(block_hash) }
70 }
71
72 pub fn from_number(block_number: SubstrateBlockNumber) -> Self {
74 Self { block_number, block_hash: None }
75 }
76}
77
/// Number of synced blocks between progress checkpoints in `sync_backward_range`: progress is
/// logged (and the tail label written, when enabled) each time the synced count is a multiple
/// of this value.
const BLOCK_INTERVAL: u32 = 128;
80
/// Parameters for one backward walk performed by `Client::sync_backward_range`.
struct BackwardSyncRange {
    /// Highest block number of the range; the walk starts here.
    from: SubstrateBlockNumber,
    /// Lowest block number of the range; the walk stops after processing this block.
    to: SubstrateBlockNumber,
    /// When true, the `Head` sync label is checkpointed at the first block synced.
    set_head: bool,
    /// When true, the `Tail` sync label is checkpointed at the first block, every
    /// `BLOCK_INTERVAL` blocks, and once more at the end of a successful walk.
    checkpoint_tail: bool,
    /// When true, hitting a block without an EVM block hash stores the following block number
    /// as the first EVM block.
    persist_first_evm_block: bool,
}
92
93impl Client {
94 async fn validate_chain_identity(&self) -> Result<H256, ClientError> {
96 let genesis_hash: H256 = self.api().genesis_hash();
97
98 if let Some(checkpoint) =
99 self.receipt_provider().get_sync_label(ChainMetadata::Genesis).await?
100 {
101 if let Some(stored) = checkpoint.block_hash {
102 if stored != genesis_hash {
103 return Err(ClientError::ChainMismatch);
104 }
105 }
106 }
107
108 Ok(genesis_hash)
109 }
110
111 async fn verify_boundary(&self, checkpoint: &SyncCheckpoint) -> Result<(), ClientError> {
113 let num = checkpoint.block_number;
114 let hash = checkpoint.block_hash;
115 match (num, hash) {
116 (_, None) => {
117 log::error!(target: LOG_TARGET,
118 "Boundary #{num}: missing stored hash");
119 Err(ClientError::SyncBoundaryMismatch)
120 },
121 (_, Some(stored_hash)) => {
122 let block = self.block_provider().block_by_number(num).await?.ok_or_else(|| {
123 log::error!(target: LOG_TARGET,
124 "Boundary #{num}: block not found on chain \
125 (node may have pruned it โ use an archive node with --eth-pruning archive)");
126 ClientError::SyncBoundaryMismatch
127 })?;
128 if block.hash() != stored_hash {
129 log::error!(target: LOG_TARGET,
130 "Boundary #{num}: hash mismatch โ stored {stored_hash:?}, \
131 chain {:?}", block.hash());
132 return Err(ClientError::SyncBoundaryMismatch);
133 }
134 Ok(())
135 },
136 }
137 }
138
139 async fn checkpoint_sync_label(&self, label: SyncLabel, num: SubstrateBlockNumber, hash: H256) {
141 let cp = SyncCheckpoint::new(num, hash);
142 let result = match label {
143 SyncLabel::Head => self.receipt_provider().advance_sync_label(label, cp).await,
144 SyncLabel::Tail => self.receipt_provider().recede_sync_label(label, cp).await,
145 };
146 if let Err(err) = result {
147 log::warn!(target: LOG_TARGET, "Failed to update sync_label[{label}]: {err:?}");
148 }
149 }
150
151 pub async fn sync_backward(&self) -> Result<(), ClientError> {
156 log::info!(target: LOG_TARGET,
157 "๐ Historical block sync enabled. \
158 For a complete sync, the connected node should be an archive node.");
159 match self.sync_backward_inner().await {
160 Ok(()) => Ok(()),
161 Err(err) if err.is_chain_validation_error() => Err(err),
162 Err(err) => {
163 log::error!(target: LOG_TARGET, "๐๏ธ Sync stopped due to {err}.");
164 Ok(())
165 },
166 }
167 }
168
169 async fn sync_backward_inner(&self) -> Result<(), ClientError> {
170 let genesis_hash = self.validate_chain_identity().await?;
171 let latest_finalized_block = self.latest_finalized_block().await;
172 let latest_finalized =
173 SyncCheckpoint::new(latest_finalized_block.number(), latest_finalized_block.hash());
174
175 self.receipt_provider()
177 .set_sync_label(ChainMetadata::Genesis, SyncCheckpoint::new(0, genesis_hash))
178 .await?;
179
180 let (head, tail) = tokio::try_join!(
181 self.receipt_provider().get_sync_label(SyncLabel::Head),
182 self.receipt_provider().get_sync_label(SyncLabel::Tail),
183 )?;
184
185 match (tail, head) {
186 (Some(tail), Some(head)) => {
187 tokio::try_join!(self.verify_boundary(&tail), self.verify_boundary(&head),)?;
189 self.sync_backward_resume(tail, head, latest_finalized).await?;
190 },
191 (Some(_), None) => {
192 log::warn!(target: LOG_TARGET,
193 "๐๏ธ Tail exists without Head โ possible partial corruption, \
194 starting fresh sync from #{}", latest_finalized.block_number);
195 self.sync_backward_fresh(latest_finalized.block_number).await?;
196 },
197 _ => {
198 log::info!(target: LOG_TARGET,
199 "๐๏ธ Fresh sync: syncing backward from #{}", latest_finalized.block_number);
200 self.sync_backward_fresh(latest_finalized.block_number).await?;
201 },
202 }
203
204 self.mark_backfill_complete();
205
206 log::info!(target: LOG_TARGET, "๐๏ธ Historic sync complete");
207 Ok(())
208 }
209
210 async fn sync_backward_fresh(
212 &self,
213 latest_finalized: SubstrateBlockNumber,
214 ) -> Result<(), ClientError> {
215 let first_evm = self.receipt_provider().first_evm_block().unwrap_or(0);
216 self.sync_backward_range(BackwardSyncRange {
217 from: latest_finalized,
218 to: first_evm,
219 set_head: true,
220 checkpoint_tail: true,
221 persist_first_evm_block: true,
222 })
223 .await
224 }
225
226 async fn sync_backward_resume(
228 &self,
229 tail: SyncCheckpoint,
230 head: SyncCheckpoint,
231 latest_finalized: SyncCheckpoint,
232 ) -> Result<(), ClientError> {
233 log::info!(target: LOG_TARGET,
234 "๐๏ธ Resuming sync: DB has blocks #{}..#{}, chain head is #{}",
235 tail.block_number, head.block_number, latest_finalized.block_number);
236
237 let top_gap = async {
238 if head.block_number < latest_finalized.block_number {
240 self.sync_backward_range(BackwardSyncRange {
241 from: latest_finalized.block_number,
242 to: head.block_number.saturating_add(1),
243 set_head: false,
244 checkpoint_tail: false,
245 persist_first_evm_block: false,
246 })
247 .await?;
248
249 self.receipt_provider()
251 .advance_sync_label(SyncLabel::Head, latest_finalized)
252 .await?;
253 }
254 Ok::<_, ClientError>(())
255 };
256
257 let bottom_gap = async {
258 let first_evm = self.receipt_provider().first_evm_block().unwrap_or(0);
260 if tail.block_number > first_evm {
261 self.sync_backward_range(BackwardSyncRange {
262 from: tail.block_number.saturating_sub(1),
263 to: first_evm,
264 set_head: false,
265 checkpoint_tail: true,
266 persist_first_evm_block: true,
267 })
268 .await?;
269 } else {
270 log::debug!(target: LOG_TARGET, "๐๏ธ No backward gap to fill");
271 }
272 Ok::<_, ClientError>(())
273 };
274
275 tokio::try_join!(top_gap, bottom_gap)?;
276
277 Ok(())
278 }
279
    /// Walks the chain backward from block `from` down to block `to` (both processed),
    /// inserting receipts for every block that has an EVM block hash.
    ///
    /// The walk starts with a lookup by number and then follows parent hashes. It ends when:
    /// * block `to` has been processed (`Ok`);
    /// * a block without an EVM block hash is reached (`Ok`; optionally records the next block
    ///   number as the first EVM block);
    /// * the runtime call, the receipt insert, or the parent lookup fails (`Err`).
    ///
    /// Depending on the flags, the `Head` label is checkpointed at the first synced block and
    /// the `Tail` label at the first block, every `BLOCK_INTERVAL` blocks, and at the last
    /// synced block of a successful walk. Label writes are best effort (see
    /// `checkpoint_sync_label`).
    ///
    /// Returns `Ok(())` immediately when `from < to`, and `ClientError::BlockNotFound` when the
    /// starting block cannot be found.
    async fn sync_backward_range(
        &self,
        BackwardSyncRange {
            from,
            to,
            set_head,
            checkpoint_tail,
            persist_first_evm_block,
        }: BackwardSyncRange,
    ) -> Result<(), ClientError> {
        // Empty range: the upper bound is below the lower bound.
        if from < to {
            log::debug!(target: LOG_TARGET, "โฌ๏ธ Backward sync: nothing to sync (#{from}..#{to})");
            return Ok(());
        }

        log::info!(target: LOG_TARGET, "โฌ๏ธ Backward sync: #{from} down to #{to}");

        // Only the starting block is fetched by number; later blocks come from parent hashes.
        let mut block = self
            .block_provider()
            .block_by_number(from)
            .await?
            .ok_or(ClientError::BlockNotFound)?;

        let mut blocks_synced = 0u64;
        // Number and hash of the most recent block whose receipts were inserted.
        let mut last_synced: Option<(SubstrateBlockNumber, H256)> = None;
        // True for counts 0 and 1 and for every multiple of BLOCK_INTERVAL.
        let at_checkpoint =
            |synced: u64| synced <= 1 || synced.is_multiple_of(u64::from(BLOCK_INTERVAL));

        // The loop yields the walk's outcome instead of returning, so the final checkpoint and
        // summary log below run on every exit path.
        let loop_result: Result<(), ClientError> = loop {
            let block_number = block.number();
            let block_hash = block.hash();

            // Ask the runtime, at this block, for the EVM block hash of this block number.
            let ethereum_hash = match self
                .runtime_api(block_hash)
                .eth_block_hash(pallet_revive::evm::U256::from(block_number))
                .await
            {
                Ok(h) => h,
                Err(err) => {
                    log::error!(target: LOG_TARGET, "โ ๏ธ eth_block_hash failed for #{block_number}: {err:?}, stopping");
                    break Err(err.into());
                },
            };

            match ethereum_hash {
                Some(hash) => {
                    if let Err(err) =
                        self.receipt_provider().insert_block_receipts_past(&block, &hash).await
                    {
                        log::error!(target: LOG_TARGET,
                            "โ ๏ธ Insert failed for #{block_number}: {err:?}, stopping");
                        break Err(err);
                    }

                    last_synced = Some((block_number, block_hash));
                    blocks_synced += 1;

                    // The first synced block is the highest one of the walk, so it is the head.
                    if blocks_synced == 1 && set_head {
                        self.checkpoint_sync_label(SyncLabel::Head, block_number, block_hash).await;
                    }

                    if at_checkpoint(blocks_synced) {
                        log::debug!(target: LOG_TARGET,
                            "โฌ๏ธ Backward sync progress: #{block_number} ({blocks_synced} blocks synced)");
                        if checkpoint_tail {
                            self.checkpoint_sync_label(SyncLabel::Tail, block_number, block_hash)
                                .await;
                        }
                    }
                },
                None => {
                    // No EVM hash for this block: stop the walk here and treat it as success.
                    if persist_first_evm_block {
                        let first_evm_block = block_number.saturating_add(1);
                        log::debug!(target: LOG_TARGET,
                            "๐ No EVM hash at #{block_number}, setting first_evm_block to #{first_evm_block}");
                        // Best effort: a failed write is only logged.
                        if let Err(err) =
                            self.receipt_provider().set_first_evm_block(first_evm_block).await
                        {
                            log::warn!(target: LOG_TARGET, "Failed to persist first-evm-block: {err:?}");
                        }
                    } else {
                        log::debug!(target: LOG_TARGET,
                            "๐ No EVM hash at #{block_number}, skipping first EVM block update");
                    }

                    break Ok(());
                },
            }

            // Step to the parent while still above the lower bound; otherwise the walk is done.
            if block_number > to {
                let parent_hash = block.header().parent_hash;
                match self
                    .block_provider()
                    .block_by_hash(&parent_hash)
                    .await
                    .map_err(Into::into)
                    .and_then(|opt| opt.ok_or(ClientError::BlockNotFound))
                {
                    Ok(b) => block = b,
                    Err(err) => {
                        log::error!(target: LOG_TARGET,
                            "โ ๏ธ Could not fetch parent of #{block_number}: {err:?}, stopping");
                        break Err(err);
                    },
                }
            } else {
                break Ok(());
            }
        };

        // On success, record the final tail position unless the loop already checkpointed it
        // at this exact count. A failed walk does not get this final checkpoint.
        if loop_result.is_ok() && checkpoint_tail && !at_checkpoint(blocks_synced) {
            if let Some((num, hash)) = last_synced {
                self.checkpoint_sync_label(SyncLabel::Tail, num, hash).await;
            }
        }

        log::info!(target: LOG_TARGET,
            "โฌ๏ธ Backward sync: {blocks_synced} blocks synced \
            (requested #{from}..#{to})");

        loop_result
    }
405
406 pub(crate) async fn run_subscription_gap_filler(&self, mut rx: mpsc::Receiver<GapFillRequest>) {
408 log::info!(target: LOG_TARGET, "๐ Subscription gap filler started");
409
410 while let Some(GapFillRequest { from_inclusive, to_inclusive }) = rx.recv().await {
411 log::info!(target: LOG_TARGET, "๐ Subscription gap filler: processing #{from_inclusive} down to #{to_inclusive}");
412 if let Err(err) = self
413 .sync_backward_range(BackwardSyncRange {
414 from: from_inclusive,
415 to: to_inclusive,
416 set_head: false,
417 checkpoint_tail: false,
418 persist_first_evm_block: false,
419 })
420 .await
421 {
422 log::error!(target: LOG_TARGET, "๐ Subscription gap fill failed for #{from_inclusive}..#{to_inclusive}: {err:?}");
423 } else {
424 log::info!(target: LOG_TARGET, "๐ Subscription gap filler: done with #{from_inclusive}..#{to_inclusive}");
425 }
426 self.subscription_gap_queue().mark_done();
429 }
430
431 log::info!(target: LOG_TARGET, "๐ Subscription gap filler stopped");
432 }
433}