referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
block_sync.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Historic block syncing logic for the Ethereum JSON-RPC server.
19
20use crate::{
21	BlockInfoProvider,
22	client::{Client, ClientError, GapFillRequest, SubstrateBlockNumber},
23};
24use pallet_revive::evm::H256;
25use tokio::sync::mpsc;
26
27const LOG_TARGET: &str = "eth-rpc::block-sync";
28
/// Marker trait for types that can be used as keys in the `sync_state` table.
///
/// The only requirement is `Display`. Presumably the rendered string is what ends up as the
/// stored key — confirm against the receipt provider implementation.
pub trait SyncStateKey: std::fmt::Display {}
31
/// Labels used to track sync progress in the `sync_state` table.
///
/// The `Display` output of each variant is fixed by its `#[display(...)]` attribute.
#[derive(Debug, Clone, Copy, derive_more::Display)]
pub enum SyncLabel {
	/// Lowest synced block. Only decreases.
	///
	/// Displayed as `sync-tail`.
	#[display(fmt = "sync-tail")]
	Tail,
	/// Highest synced block. Absent means no sync has started.
	/// During backfill: upper boundary being filled.
	/// After backfill: advanced by the finalized-block subscription.
	///
	/// Displayed as `sync-head`.
	#[display(fmt = "sync-head")]
	Head,
}
44
/// Chain metadata stored in the `sync_state` table.
///
/// The `Display` output of each variant is fixed by its `#[display(...)]` attribute.
#[derive(Debug, Clone, Copy, derive_more::Display)]
pub enum ChainMetadata {
	/// Genesis block hash — used for chain identity verification.
	///
	/// Displayed as `chain-genesis`.
	#[display(fmt = "chain-genesis")]
	Genesis,
	/// Auto-discovered first EVM block on the chain.
	///
	/// Displayed as `chain-first-evm-block`.
	#[display(fmt = "chain-first-evm-block")]
	FirstEvmBlock,
}
55
// Opt both label families into `SyncStateKey`; the trait has no methods, so the impls are empty.
impl SyncStateKey for SyncLabel {}
impl SyncStateKey for ChainMetadata {}
58
/// Sync checkpoint persisted in the `sync_state` table to allow resuming after a restart.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SyncCheckpoint {
	/// Number of the block this checkpoint refers to.
	pub block_number: SubstrateBlockNumber,
	/// Hash of that block, or `None` when only the number was recorded.
	pub block_hash: Option<H256>,
}
65
66impl SyncCheckpoint {
67	/// Create a checkpoint with a known block hash.
68	pub fn new(block_number: SubstrateBlockNumber, block_hash: H256) -> Self {
69		Self { block_number, block_hash: Some(block_hash) }
70	}
71
72	/// Create a checkpoint with only a block number (no hash).
73	pub fn from_number(block_number: SubstrateBlockNumber) -> Self {
74		Self { block_number, block_hash: None }
75	}
76}
77
/// How often (in blocks) the backward sync checkpoints are persisted to the database.
///
/// The interval is measured against the running count of blocks synced in one backward pass,
/// not against absolute block numbers.
const BLOCK_INTERVAL: u32 = 128;
80
/// Options for [`Client::sync_backward_range`].
struct BackwardSyncRange {
	/// Block number the backward walk starts at (upper end of the range).
	from: SubstrateBlockNumber,
	/// Block number the backward walk stops at, inclusive (lower end of the range).
	to: SubstrateBlockNumber,
	/// Set `Head` label after syncing the first block.
	set_head: bool,
	/// Checkpoint `Tail` label periodically and at end.
	checkpoint_tail: bool,
	/// When true, persist the first EVM block boundary if a non-EVM block is encountered.
	persist_first_evm_block: bool,
}
92
93impl Client {
94	/// Verify that the stored genesis hash matches the connected chain.
95	async fn validate_chain_identity(&self) -> Result<H256, ClientError> {
96		let genesis_hash: H256 = self.api().genesis_hash();
97
98		if let Some(checkpoint) =
99			self.receipt_provider().get_sync_label(ChainMetadata::Genesis).await?
100		{
101			if let Some(stored) = checkpoint.block_hash {
102				if stored != genesis_hash {
103					return Err(ClientError::ChainMismatch);
104				}
105			}
106		}
107
108		Ok(genesis_hash)
109	}
110
111	/// Verify that a stored boundary block still exists on the finalized chain.
112	async fn verify_boundary(&self, checkpoint: &SyncCheckpoint) -> Result<(), ClientError> {
113		let num = checkpoint.block_number;
114		let hash = checkpoint.block_hash;
115		match (num, hash) {
116			(_, None) => {
117				log::error!(target: LOG_TARGET,
118					"Boundary #{num}: missing stored hash");
119				Err(ClientError::SyncBoundaryMismatch)
120			},
121			(_, Some(stored_hash)) => {
122				let block = self.block_provider().block_by_number(num).await?.ok_or_else(|| {
123					log::error!(target: LOG_TARGET,
124						"Boundary #{num}: block not found on chain \
125						 (node may have pruned it โ€” use an archive node with --eth-pruning archive)");
126					ClientError::SyncBoundaryMismatch
127				})?;
128				if block.hash() != stored_hash {
129					log::error!(target: LOG_TARGET,
130						"Boundary #{num}: hash mismatch โ€” stored {stored_hash:?}, \
131						 chain {:?}", block.hash());
132					return Err(ClientError::SyncBoundaryMismatch);
133				}
134				Ok(())
135			},
136		}
137	}
138
139	/// Checkpoint the given sync label to the DB.
140	async fn checkpoint_sync_label(&self, label: SyncLabel, num: SubstrateBlockNumber, hash: H256) {
141		let cp = SyncCheckpoint::new(num, hash);
142		let result = match label {
143			SyncLabel::Head => self.receipt_provider().advance_sync_label(label, cp).await,
144			SyncLabel::Tail => self.receipt_provider().recede_sync_label(label, cp).await,
145		};
146		if let Err(err) = result {
147			log::warn!(target: LOG_TARGET, "Failed to update sync_label[{label}]: {err:?}");
148		}
149	}
150
151	/// Backward sync historical blocks from the latest finalized block to the first EVM block.
152	/// Resumes from the last checkpoint if a previous sync was interrupted.
153	/// Fatal errors (chain/DB mismatch) are propagated; transient errors are swallowed
154	/// to avoid taking down the RPC server.
155	pub async fn sync_backward(&self) -> Result<(), ClientError> {
156		log::info!(target: LOG_TARGET,
157			"๐Ÿ”„ Historical block sync enabled. \
158			 For a complete sync, the connected node should be an archive node.");
159		match self.sync_backward_inner().await {
160			Ok(()) => Ok(()),
161			Err(err) if err.is_chain_validation_error() => Err(err),
162			Err(err) => {
163				log::error!(target: LOG_TARGET, "๐Ÿ—„๏ธ Sync stopped due to {err}.");
164				Ok(())
165			},
166		}
167	}
168
169	async fn sync_backward_inner(&self) -> Result<(), ClientError> {
170		let genesis_hash = self.validate_chain_identity().await?;
171		let latest_finalized_block = self.latest_finalized_block().await;
172		let latest_finalized =
173			SyncCheckpoint::new(latest_finalized_block.number(), latest_finalized_block.hash());
174
175		// Store genesis (idempotent).
176		self.receipt_provider()
177			.set_sync_label(ChainMetadata::Genesis, SyncCheckpoint::new(0, genesis_hash))
178			.await?;
179
180		let (head, tail) = tokio::try_join!(
181			self.receipt_provider().get_sync_label(SyncLabel::Head),
182			self.receipt_provider().get_sync_label(SyncLabel::Tail),
183		)?;
184
185		match (tail, head) {
186			(Some(tail), Some(head)) => {
187				// Verify boundary hashes still match the finalized chain.
188				tokio::try_join!(self.verify_boundary(&tail), self.verify_boundary(&head),)?;
189				self.sync_backward_resume(tail, head, latest_finalized).await?;
190			},
191			(Some(_), None) => {
192				log::warn!(target: LOG_TARGET,
193					"๐Ÿ—„๏ธ Tail exists without Head โ€” possible partial corruption, \
194					 starting fresh sync from #{}", latest_finalized.block_number);
195				self.sync_backward_fresh(latest_finalized.block_number).await?;
196			},
197			_ => {
198				log::info!(target: LOG_TARGET,
199					"๐Ÿ—„๏ธ Fresh sync: syncing backward from #{}", latest_finalized.block_number);
200				self.sync_backward_fresh(latest_finalized.block_number).await?;
201			},
202		}
203
204		self.mark_backfill_complete();
205
206		log::info!(target: LOG_TARGET, "๐Ÿ—„๏ธ Historic sync complete");
207		Ok(())
208	}
209
210	/// Backward sync from `latest_finalized` down to the first EVM block.
211	async fn sync_backward_fresh(
212		&self,
213		latest_finalized: SubstrateBlockNumber,
214	) -> Result<(), ClientError> {
215		let first_evm = self.receipt_provider().first_evm_block().unwrap_or(0);
216		self.sync_backward_range(BackwardSyncRange {
217			from: latest_finalized,
218			to: first_evm,
219			set_head: true,
220			checkpoint_tail: true,
221			persist_first_evm_block: true,
222		})
223		.await
224	}
225
226	/// Resume backward sync by filling the top gap (new blocks) and bottom gap (backfill).
227	async fn sync_backward_resume(
228		&self,
229		tail: SyncCheckpoint,
230		head: SyncCheckpoint,
231		latest_finalized: SyncCheckpoint,
232	) -> Result<(), ClientError> {
233		log::info!(target: LOG_TARGET,
234			"๐Ÿ—„๏ธ Resuming sync: DB has blocks #{}..#{}, chain head is #{}",
235			tail.block_number, head.block_number, latest_finalized.block_number);
236
237		let top_gap = async {
238			// Top gap: sync from latest_finalized down to head + 1.
239			if head.block_number < latest_finalized.block_number {
240				self.sync_backward_range(BackwardSyncRange {
241					from: latest_finalized.block_number,
242					to: head.block_number.saturating_add(1),
243					set_head: false,
244					checkpoint_tail: false,
245					persist_first_evm_block: false,
246				})
247				.await?;
248
249				// Mark top gap complete so a restart won't redo it.
250				self.receipt_provider()
251					.advance_sync_label(SyncLabel::Head, latest_finalized)
252					.await?;
253			}
254			Ok::<_, ClientError>(())
255		};
256
257		let bottom_gap = async {
258			// Bottom gap: sync from tail - 1 down to the first EVM block.
259			let first_evm = self.receipt_provider().first_evm_block().unwrap_or(0);
260			if tail.block_number > first_evm {
261				self.sync_backward_range(BackwardSyncRange {
262					from: tail.block_number.saturating_sub(1),
263					to: first_evm,
264					set_head: false,
265					checkpoint_tail: true,
266					persist_first_evm_block: true,
267				})
268				.await?;
269			} else {
270				log::debug!(target: LOG_TARGET, "๐Ÿ—„๏ธ No backward gap to fill");
271			}
272			Ok::<_, ClientError>(())
273		};
274
275		tokio::try_join!(top_gap, bottom_gap)?;
276
277		Ok(())
278	}
279
	/// Backward sync from block `from` down to block `to` (inclusive).
	/// Stops early if a non-EVM block is discovered (auto-discovery of first EVM block).
	///
	/// The walk starts at block `from` (looked up by number) and then follows each header's
	/// parent hash. For every block the runtime API is asked for the Ethereum block hash:
	/// - when one exists, the block's receipts are inserted and the `Head`/`Tail` labels are
	///   checkpointed according to `set_head` / `checkpoint_tail`;
	/// - when none exists, the walk ends successfully, optionally persisting
	///   `block_number + 1` as the first EVM block.
	///
	/// Returns `Ok(())` without doing anything when `from < to`.
	///
	/// # Errors
	///
	/// Returns [`ClientError::BlockNotFound`] if the starting block or a parent block cannot be
	/// found, and propagates failures from the block provider, from `eth_block_hash`, and from
	/// the receipt insert. After an error the final `Tail` checkpoint is skipped.
	async fn sync_backward_range(
		&self,
		BackwardSyncRange {
			from,
			to,
			set_head,
			checkpoint_tail,
			persist_first_evm_block,
		}: BackwardSyncRange,
	) -> Result<(), ClientError> {
		// An inverted range is treated as empty rather than as an error.
		if from < to {
			log::debug!(target: LOG_TARGET,	"โฌ‡๏ธ Backward sync: nothing to sync (#{from}..#{to})");
			return Ok(());
		}

		log::info!(target: LOG_TARGET, "โฌ‡๏ธ Backward sync: #{from} down to #{to}");

		// Only the starting block is looked up by number; the rest are reached via parent hashes.
		let mut block = self
			.block_provider()
			.block_by_number(from)
			.await?
			.ok_or(ClientError::BlockNotFound)?;

		let mut blocks_synced = 0u64;
		// Most recent block whose receipts were inserted successfully.
		let mut last_synced: Option<(SubstrateBlockNumber, H256)> = None;
		// True for counts 0 and 1 and for every multiple of `BLOCK_INTERVAL`.
		let at_checkpoint =
			|synced: u64| synced <= 1 || synced.is_multiple_of(u64::from(BLOCK_INTERVAL));

		// The loop yields its outcome through `break` so the summary log below runs on every
		// exit path.
		let loop_result: Result<(), ClientError> = loop {
			let block_number = block.number();
			let block_hash = block.hash();

			let ethereum_hash = match self
				.runtime_api(block_hash)
				.eth_block_hash(pallet_revive::evm::U256::from(block_number))
				.await
			{
				Ok(h) => h,
				Err(err) => {
					log::error!(target: LOG_TARGET,	"โš ๏ธ eth_block_hash failed for #{block_number}: {err:?}, stopping");
					break Err(err.into());
				},
			};

			match ethereum_hash {
				Some(hash) => {
					if let Err(err) =
						self.receipt_provider().insert_block_receipts_past(&block, &hash).await
					{
						log::error!(target: LOG_TARGET,
							"โš ๏ธ Insert failed for #{block_number}: {err:?}, stopping");
						break Err(err);
					}

					// Progress is recorded only after the insert succeeded.
					last_synced = Some((block_number, block_hash));
					blocks_synced += 1;

					// The first block synced is the highest one of this pass.
					if blocks_synced == 1 && set_head {
						self.checkpoint_sync_label(SyncLabel::Head, block_number, block_hash).await;
					}

					if at_checkpoint(blocks_synced) {
						log::debug!(target: LOG_TARGET,
							"โฌ‡๏ธ Backward sync progress: #{block_number} ({blocks_synced} blocks synced)");
						if checkpoint_tail {
							self.checkpoint_sync_label(SyncLabel::Tail, block_number, block_hash)
								.await;
						}
					}
				},
				None => {
					// No Ethereum hash for this block: stop here, and optionally record the
					// block above it as the first EVM block.
					if persist_first_evm_block {
						let first_evm_block = block_number.saturating_add(1);
						log::debug!(target: LOG_TARGET,
							"๐Ÿ” No EVM hash at #{block_number}, setting first_evm_block to #{first_evm_block}");
						// A failed write is only logged; the pass still ends with `Ok`.
						if let Err(err) =
							self.receipt_provider().set_first_evm_block(first_evm_block).await
						{
							log::warn!(target: LOG_TARGET, "Failed to persist first-evm-block: {err:?}");
						}
					} else {
						log::debug!(target: LOG_TARGET,
							"๐Ÿ” No EVM hash at #{block_number}, skipping first EVM block update");
					}

					break Ok(());
				},
			}

			// Step to the parent until the lower bound `to` has been processed.
			if block_number > to {
				let parent_hash = block.header().parent_hash;
				match self
					.block_provider()
					.block_by_hash(&parent_hash)
					.await
					.map_err(Into::into)
					.and_then(|opt| opt.ok_or(ClientError::BlockNotFound))
				{
					Ok(b) => block = b,
					Err(err) => {
						log::error!(target: LOG_TARGET,
							"โš ๏ธ Could not fetch parent of #{block_number}: {err:?}, stopping");
						break Err(err);
					},
				}
			} else {
				break Ok(());
			}
		};

		// Checkpoint the last synced block if it wasn't already at a checkpoint interval.
		// Skipped when the loop ended with an error.
		if loop_result.is_ok() && checkpoint_tail && !at_checkpoint(blocks_synced) {
			if let Some((num, hash)) = last_synced {
				self.checkpoint_sync_label(SyncLabel::Tail, num, hash).await;
			}
		}

		log::info!(target: LOG_TARGET,
			"โฌ‡๏ธ Backward sync: {blocks_synced} blocks synced \
			 (requested #{from}..#{to})");

		loop_result
	}
405
406	/// Run the background subscription gap filler, processing requests sequentially.
407	pub(crate) async fn run_subscription_gap_filler(&self, mut rx: mpsc::Receiver<GapFillRequest>) {
408		log::info!(target: LOG_TARGET, "๐Ÿ”„ Subscription gap filler started");
409
410		while let Some(GapFillRequest { from_inclusive, to_inclusive }) = rx.recv().await {
411			log::info!(target: LOG_TARGET, "๐Ÿ”„ Subscription gap filler: processing #{from_inclusive} down to #{to_inclusive}");
412			if let Err(err) = self
413				.sync_backward_range(BackwardSyncRange {
414					from: from_inclusive,
415					to: to_inclusive,
416					set_head: false,
417					checkpoint_tail: false,
418					persist_first_evm_block: false,
419				})
420				.await
421			{
422				log::error!(target: LOG_TARGET, "๐Ÿ”„ Subscription gap fill failed for #{from_inclusive}..#{to_inclusive}: {err:?}");
423			} else {
424				log::info!(target: LOG_TARGET, "๐Ÿ”„ Subscription gap filler: done with #{from_inclusive}..#{to_inclusive}");
425			}
426			// Mark done unconditionally โ€” mirrors how subscribe_new_blocks handles
427			// callback errors: log and move on.
428			self.subscription_gap_queue().mark_done();
429		}
430
431		log::info!(target: LOG_TARGET, "๐Ÿ”„ Subscription gap filler stopped");
432	}
433}