referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
client.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17//! The client connects to the source substrate chain
18//! and is used by the rpc server to query and send transactions to the substrate chain.
19
20pub(crate) mod runtime_api;
21pub(crate) mod storage_api;
22
23use crate::{
24	BlockInfoProvider, BlockTag, FeeHistoryProvider, ReceiptProvider, SubxtBlockInfoProvider,
25	SyncLabel, TransactionInfo,
26	block_sync::SyncCheckpoint,
27	subxt_client::{self, SrcChainConfig, revive::calls::types::EthTransact},
28};
29use futures::TryStreamExt;
30use jsonrpsee::types::{ErrorObjectOwned, error::CALL_EXECUTION_FAILED_CODE};
31use pallet_revive::{
32	EthTransactError,
33	evm::{
34		Block, BlockNumberOrTag, BlockNumberOrTagOrHash, FeeHistoryResult, Filter,
35		GenericTransaction, H256, HashesOrTransactionInfos, Log, ReceiptInfo, StateOverrideSet,
36		SyncingProgress, SyncingStatus, TransactionSigned, TransactionTrace, U256,
37		decode_revert_reason,
38	},
39};
40use pallet_revive_types::runtime_api::*;
41use runtime_api::RuntimeApi;
42use sp_runtime::traits::Block as BlockT;
43use sp_weights::Weight;
44use std::{
45	sync::{
46		Arc,
47		atomic::{AtomicBool, AtomicUsize, Ordering},
48	},
49	time::Duration,
50};
51use storage_api::StorageApi;
52use subxt::{
53	Config, OnlineClient,
54	backend::{
55		StreamOf, StreamOfResults,
56		legacy::{
57			LegacyRpcMethods,
58			rpc_methods::{SystemHealth, TransactionStatus},
59		},
60		rpc::{
61			RpcClient,
62			reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
63		},
64	},
65	config::{HashFor, Header},
66	ext::subxt_rpcs::rpc_params,
67};
68use thiserror::Error;
69use tokio::sync::{Mutex, mpsc};
70
71/// The substrate block type.
72pub type SubstrateBlock = subxt::blocks::Block<SrcChainConfig, OnlineClient<SrcChainConfig>>;
73
74/// The substrate block header.
75pub type SubstrateBlockHeader = <SrcChainConfig as Config>::Header;
76
77/// The substrate block number type.
78pub type SubstrateBlockNumber = <SubstrateBlockHeader as Header>::Number;
79
80/// The substrate block hash type.
81pub type SubstrateBlockHash = HashFor<SrcChainConfig>;
82
83/// The runtime balance type.
84pub type Balance = u128;
85
86/// The subscription type used to listen to new blocks.
87#[derive(Debug, Clone, Copy, PartialEq)]
88pub enum SubscriptionType {
89	/// Subscribe to best blocks.
90	BestBlocks,
91	/// Subscribe to finalized blocks.
92	FinalizedBlocks,
93}
94
95/// Submit Error reason.
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
97pub enum SubmitError {
98	/// Transaction was usurped by another with the same nonce.
99	#[error("Transaction was usurped by another with the same nonce")]
100	Usurped,
101	/// Transaction was dropped from the pool.
102	#[error("Transaction was dropped")]
103	Dropped,
104	/// Transaction is invalid (e.g. bad nonce, signature, etc).
105	#[error("Transaction is invalid (e.g. bad nonce, signature, etc)")]
106	Invalid,
107	/// Transaction stream ended without a terminal status.
108	#[error("Transaction stream ended without status")]
109	StreamEnded,
110	/// Unknown transaction status.
111	#[error("Unknown transaction status")]
112	Unknown,
113}
114
115impl From<TransactionStatus<SubstrateBlockHash>> for SubmitError {
116	fn from(status: TransactionStatus<SubstrateBlockHash>) -> Self {
117		match status {
118			TransactionStatus::Usurped(_) => SubmitError::Usurped,
119			TransactionStatus::Dropped => SubmitError::Dropped,
120			TransactionStatus::Invalid => SubmitError::Invalid,
121			_ => SubmitError::Unknown,
122		}
123	}
124}
125
126/// The error type for the client.
127#[derive(Error, Debug)]
128pub enum ClientError {
129	/// A [`jsonrpsee::core::ClientError`] wrapper error.
130	#[error(transparent)]
131	Jsonrpsee(#[from] jsonrpsee::core::ClientError),
132	/// A [`subxt::Error`] wrapper error.
133	#[error(transparent)]
134	SubxtError(#[from] subxt::Error),
135	#[error(transparent)]
136	RpcError(#[from] subxt::ext::subxt_rpcs::Error),
137	/// A [`sqlx::Error`] wrapper error.
138	#[error(transparent)]
139	SqlxError(#[from] sqlx::Error),
140	/// A [`codec::Error`] wrapper error.
141	#[error(transparent)]
142	CodecError(#[from] codec::Error),
143	/// author_submitExtrinsic failed.
144	#[error("Invalid transaction: {0}")]
145	SubmitError(SubmitError),
146	/// Transact call failed.
147	#[error("contract reverted: {0:?}")]
148	TransactError(EthTransactError),
149	/// A decimal conversion failed.
150	#[error("conversion failed")]
151	ConversionFailed,
152	/// The block hash was not found.
153	#[error("hash not found")]
154	BlockNotFound,
155	/// The contract was not found.
156	#[error("Contract not found")]
157	ContractNotFound,
158	#[error("No Ethereum extrinsic found")]
159	EthExtrinsicNotFound,
160	/// The transaction fee could not be found
161	#[error("transactionFeePaid event not found")]
162	TxFeeNotFound,
163	/// Failed to decode a raw payload into a signed transaction.
164	#[error("Failed to decode a raw payload into a signed transaction")]
165	TxDecodingFailed,
166	/// Failed to recover eth address.
167	#[error("failed to recover eth address")]
168	RecoverEthAddressFailed,
169	/// Failed to filter logs.
170	#[error("Failed to filter logs")]
171	LogFilterFailed(#[from] anyhow::Error),
172	/// Receipt storage was not found.
173	#[error("Receipt storage not found")]
174	ReceiptDataNotFound,
175	/// Ethereum block was not found.
176	#[error("Ethereum block not found")]
177	EthereumBlockNotFound,
178	/// Receipt data length mismatch.
179	#[error("Receipt data length mismatch")]
180	ReceiptDataLengthMismatch,
181	/// Transaction submission timeout.
182	#[error("Transaction submission timeout")]
183	Timeout,
184	/// All of the estimation methods `eth_estimate`, `eth_transact_with_config`, and
185	/// `eth_transact` were not found and therefore none of the estimation methods succeeded.
186	#[error("None of the estimation methods were found")]
187	NoEstimationMethodSucceeded,
188	/// Chain identity mismatch between stored genesis and connected node.
189	#[error("Genesis hash mismatch")]
190	ChainMismatch,
191	/// Stored sync boundary does not match the connected node.
192	#[error("Sync boundary mismatch")]
193	SyncBoundaryMismatch,
194}
195
196impl ClientError {
197	/// Errors that indicate a mismatch between the stored sync state and the connected node.
198	pub(crate) fn is_chain_validation_error(&self) -> bool {
199		matches!(self, Self::ChainMismatch | Self::SyncBoundaryMismatch)
200	}
201}
202
203const LOG_TARGET: &str = "eth-rpc::client";
204const LOG_TARGET_SUBSCRIPTION: &str = "eth-rpc::subscription";
205
206const REVERT_CODE: i32 = 3;
207
208const NOTIFIER_CAPACITY: usize = 16;
209
210impl From<ClientError> for ErrorObjectOwned {
211	fn from(err: ClientError) -> Self {
212		match err {
213			ClientError::SubxtError(subxt::Error::Rpc(subxt::error::RpcError::ClientError(
214				subxt::ext::subxt_rpcs::Error::User(err),
215			))) |
216			ClientError::RpcError(subxt::ext::subxt_rpcs::Error::User(err)) => {
217				ErrorObjectOwned::owned::<Vec<u8>>(err.code, err.message, None)
218			},
219			ClientError::TransactError(EthTransactError::Data(data)) => {
220				let msg = match decode_revert_reason(&data) {
221					Some(reason) => format!("execution reverted: {reason}"),
222					None => "execution reverted".to_string(),
223				};
224
225				let data = format!("0x{}", hex::encode(data));
226				ErrorObjectOwned::owned::<String>(REVERT_CODE, msg, Some(data))
227			},
228			ClientError::TransactError(EthTransactError::Message(msg)) => {
229				ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, msg, None)
230			},
231			_ => {
232				ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, err.to_string(), None)
233			},
234		}
235	}
236}
237
238/// A client that connects to a substrate node and provides Ethereum-compatible RPC functionality.
239#[derive(Clone)]
240pub struct Client {
241	api: OnlineClient<SrcChainConfig>,
242	rpc_client: RpcClient,
243	rpc: LegacyRpcMethods<SrcChainConfig>,
244	receipt_provider: ReceiptProvider,
245	block_provider: SubxtBlockInfoProvider,
246	fee_history_provider: FeeHistoryProvider,
247	chain_id: u64,
248	max_block_weight: Weight,
249	/// Whether the node has automine enabled.
250	automine: bool,
251	/// A notifier, that informs subscribers of new best blocks.
252	block_notifier: Option<tokio::sync::broadcast::Sender<H256>>,
253	/// A lock to ensure only one subscription can perform write operations at a time.
254	subscription_lock: Arc<Mutex<()>>,
255
256	/// Block subscription sender side.
257	block_subscription_tx: tokio::sync::broadcast::Sender<Block>,
258	/// Log subscription sender side.
259	log_subscription_tx: tokio::sync::broadcast::Sender<Log>,
260	/// Whether archive mode is enabled
261	is_archive: bool,
262	/// Whether historic backfill has completed. `false` if not started or in progress.
263	backfill_complete: Arc<AtomicBool>,
264	/// Queue for backfilling blocks missed during subscription reconnects.
265	subscription_gap_queue: SubscriptionGapQueue,
266}
267
268/// A request to backfill a range of missed blocks (both bounds inclusive).
269pub(crate) struct GapFillRequest {
270	pub from_inclusive: SubstrateBlockNumber,
271	pub to_inclusive: SubstrateBlockNumber,
272}
273
274/// Queues gap-fill requests for blocks missed during subscription reconnects.
275#[derive(Clone)]
276pub(crate) struct SubscriptionGapQueue {
277	/// Sender half of the gap-fill queue.
278	tx: mpsc::Sender<GapFillRequest>,
279	/// Queued + in-flight gap fills. Channel length alone is insufficient
280	/// because it drops to zero as soon as the receiver dequeues the item.
281	pending: Arc<AtomicUsize>,
282}
283
284impl SubscriptionGapQueue {
285	pub(crate) fn new() -> (Self, mpsc::Receiver<GapFillRequest>) {
286		// Each reconnect produces one gap-fill request for the entire missed range,
287		// so 32 allows for 32 rapid disconnects before the consumer processes any.
288		let (tx, rx) = mpsc::channel(32);
289		(Self { tx, pending: Arc::new(AtomicUsize::new(0)) }, rx)
290	}
291
292	/// If `current` is not consecutive to `last`, queue a gap-fill for the missing range.
293	pub fn detect_and_queue(&self, current: SubstrateBlockNumber, last: SubstrateBlockNumber) {
294		if current.saturating_sub(last) <= 1 {
295			return;
296		}
297
298		let from_inclusive = current.saturating_sub(1);
299		let to_inclusive = last.saturating_add(1);
300		let gap_len = from_inclusive.saturating_sub(to_inclusive) + 1;
301		self.pending.fetch_add(1, Ordering::Release);
302		match self.tx.try_send(GapFillRequest { from_inclusive, to_inclusive }) {
303			Ok(_) => {
304				log::info!(target: LOG_TARGET,
305					"๐Ÿ”„ Subscription gap queue: queued #{from_inclusive} down to #{to_inclusive} ({gap_len} blocks)");
306			},
307			Err(err) => {
308				self.pending.fetch_sub(1, Ordering::Release);
309				log::warn!(target: LOG_TARGET,
310					"๐Ÿ”„ Subscription gap queue error, dropping #{from_inclusive}..#{to_inclusive} ({gap_len} blocks): {err}");
311			},
312		}
313	}
314
315	/// Mark one request as processed.
316	pub fn mark_done(&self) {
317		let res = self
318			.pending
319			.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| v.checked_sub(1));
320		if res.is_err() {
321			debug_assert!(false, "subscription gap queue pending counter underflowed");
322			log::error!(target: LOG_TARGET,
323				"๐Ÿ”„ Subscription gap queue pending counter underflow, delete the database and restart with --eth-pruning=archive to resync");
324		}
325	}
326
327	/// Returns `true` if there are pending gap-fill requests.
328	pub fn has_pending(&self) -> bool {
329		self.pending.load(Ordering::Acquire) > 0
330	}
331}
332
333/// Returns the first EVM block number for main and test nets, `None` otherwise.
334fn known_first_evm_block_for_chain(chain_id: u64) -> Option<u32> {
335	match chain_id {
336		420420417 => Some(4_367_914),  // Paseo Asset Hub
337		420420418 => Some(12_234_156), // Kusama Asset Hub
338		420420419 => Some(11_405_259), // Polkadot Asset Hub
339		420420421 => Some(13_169_391), // Westend Asset Hub
340		_ => None,
341	}
342}
343
344/// Fetch the chain ID from the substrate chain.
345async fn chain_id(api: &OnlineClient<SrcChainConfig>) -> Result<u64, ClientError> {
346	let query = subxt_client::constants().revive().chain_id().unvalidated();
347	api.constants().at(&query).map_err(|err| err.into())
348}
349
350/// Fetch the max block weight from the substrate chain.
351async fn max_block_weight(api: &OnlineClient<SrcChainConfig>) -> Result<Weight, ClientError> {
352	let query = subxt_client::constants().system().block_weights().unvalidated();
353	let weights = api.constants().at(&query)?;
354	let max_block = weights.per_class.normal.max_extrinsic.unwrap_or(weights.max_block);
355	Ok(max_block.0)
356}
357
358/// Get the automine status from the node.
359async fn get_automine(rpc_client: &RpcClient) -> bool {
360	match rpc_client.request::<bool>("getAutomine", rpc_params![]).await {
361		Ok(val) => val,
362		Err(err) => {
363			log::info!(target: LOG_TARGET, "Node does not have getAutomine RPC. Defaulting to automine=false. error: {err:?}");
364			false
365		},
366	}
367}
368
369/// Connect to a node at the given URL, and return the underlying API, RPC client, and legacy RPC
370/// clients.
371pub async fn connect(
372	node_rpc_url: &str,
373	max_request_size: u32,
374	max_response_size: u32,
375) -> Result<(OnlineClient<SrcChainConfig>, RpcClient, LegacyRpcMethods<SrcChainConfig>), ClientError>
376{
377	log::info!(target: LOG_TARGET, "๐ŸŒ Connecting to node at: {node_rpc_url} ...");
378	let rpc_client = ReconnectingRpcClient::builder()
379		.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
380		.max_request_size(max_request_size)
381		.max_response_size(max_response_size)
382		.build(node_rpc_url.to_string())
383		.await?;
384	let rpc_client = RpcClient::new(rpc_client);
385	log::info!(target: LOG_TARGET, "๐ŸŒŸ Connected to node at: {node_rpc_url}");
386
387	let api = OnlineClient::<SrcChainConfig>::from_rpc_client(rpc_client.clone()).await?;
388	let rpc = LegacyRpcMethods::<SrcChainConfig>::new(rpc_client.clone());
389	Ok((api, rpc_client, rpc))
390}
391
392impl Client {
393	/// Create a new client instance.
394	pub(crate) async fn new(
395		api: OnlineClient<SrcChainConfig>,
396		rpc_client: RpcClient,
397		rpc: LegacyRpcMethods<SrcChainConfig>,
398		block_provider: SubxtBlockInfoProvider,
399		receipt_provider: ReceiptProvider,
400		is_archive: bool,
401		subscription_gap_queue: SubscriptionGapQueue,
402	) -> Result<Self, ClientError> {
403		let (chain_id, max_block_weight, automine) =
404			tokio::try_join!(chain_id(&api), max_block_weight(&api), async {
405				Ok(get_automine(&rpc_client).await)
406			},)?;
407
408		// Fall back to 0 when the hardcoded value exceeds the current best block (e.g. zombienet
409		// reusing a known chain ID) and backward sync is disabled.
410		if !is_archive {
411			if let Some(known) = known_first_evm_block_for_chain(chain_id) {
412				let best = block_provider.latest_block_number().await;
413				if known > best {
414					log::debug!(
415						target: LOG_TARGET,
416						"Hardcoded first EVM block {known} exceeds best block {best} \
417						 for chain {chain_id}, defaulting to 0"
418					);
419					receipt_provider.set_first_evm_block(0).await?;
420				}
421			}
422		}
423
424		let client = Self {
425			api,
426			rpc_client,
427			rpc,
428			receipt_provider,
429			block_provider,
430			fee_history_provider: FeeHistoryProvider::default(),
431			chain_id,
432			max_block_weight,
433			automine,
434			block_notifier: automine
435				.then(|| tokio::sync::broadcast::channel::<H256>(NOTIFIER_CAPACITY).0),
436			subscription_lock: Arc::new(Mutex::new(())),
437			block_subscription_tx: tokio::sync::broadcast::channel(256).0,
438			log_subscription_tx: tokio::sync::broadcast::channel(1000).0,
439			is_archive,
440			backfill_complete: Arc::new(AtomicBool::new(false)),
441			subscription_gap_queue,
442		};
443
444		Ok(client)
445	}
446
447	/// Mark historic backfill as complete.
448	pub(crate) fn mark_backfill_complete(&self) {
449		self.backfill_complete.store(true, Ordering::Release);
450	}
451
452	/// Advance the sync_state head label if safe to do so.
453	/// Requires: archive mode, historic backfill complete, and no pending gap fills.
454	async fn advance_sync_head(&self, block_number: SubstrateBlockNumber, hash: H256) {
455		if !self.is_archive ||
456			!self.backfill_complete.load(Ordering::Acquire) ||
457			self.subscription_gap_queue.has_pending()
458		{
459			return;
460		}
461
462		if let Err(err) = self
463			.receipt_provider
464			.advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(block_number, hash))
465			.await
466		{
467			log::warn!(target: LOG_TARGET, "Failed to advance sync head: {err:?}");
468		}
469	}
470
471	/// Creates a block notifier instance.
472	pub fn create_block_notifier(&mut self) {
473		self.block_notifier = Some(tokio::sync::broadcast::channel::<H256>(NOTIFIER_CAPACITY).0);
474	}
475
476	/// Sets a block notifier
477	pub fn set_block_notifier(&mut self, notifier: Option<tokio::sync::broadcast::Sender<H256>>) {
478		self.block_notifier = notifier;
479	}
480
481	pub(crate) fn api(&self) -> &OnlineClient<SrcChainConfig> {
482		&self.api
483	}
484
485	pub(crate) fn receipt_provider(&self) -> &ReceiptProvider {
486		&self.receipt_provider
487	}
488
489	pub(crate) fn block_provider(&self) -> &SubxtBlockInfoProvider {
490		&self.block_provider
491	}
492
493	pub(crate) fn subscription_gap_queue(&self) -> &SubscriptionGapQueue {
494		&self.subscription_gap_queue
495	}
496
497	/// The earliest block number where the ReviveApi is available.
498	/// Resolution order: in-memory value > known-networks table > 0.
499	fn earliest_block_number(&self) -> u32 {
500		self.receipt_provider
501			.first_evm_block()
502			.or_else(|| known_first_evm_block_for_chain(self.chain_id))
503			.unwrap_or(0)
504	}
505
506	/// Subscribe to new blocks, and execute the async closure for each block.
507	async fn subscribe_new_blocks<F, Fut>(
508		&self,
509		subscription_type: SubscriptionType,
510		callback: F,
511	) -> Result<(), ClientError>
512	where
513		F: Fn(SubstrateBlock) -> Fut + Send + Sync,
514		Fut: std::future::Future<Output = Result<(), ClientError>> + Send,
515	{
516		let mut block_stream = match subscription_type {
517			SubscriptionType::BestBlocks => self.api.blocks().subscribe_best().await,
518			SubscriptionType::FinalizedBlocks => self.api.blocks().subscribe_finalized().await,
519		}
520		.inspect_err(|err| {
521			log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}");
522		})?;
523
524		let mut last_finalized_seen: Option<SubstrateBlockNumber> = None;
525
526		while let Some(block) = block_stream.next().await {
527			let block = match block {
528				Ok(block) => block,
529				Err(err) => {
530					if err.is_disconnected_will_reconnect() {
531						log::warn!(
532							target: LOG_TARGET,
533							"The RPC connection was lost and we may have missed a few blocks \
534							({subscription_type:?}, last finalized: {last_finalized_seen:?}): {err:?}"
535						);
536						continue;
537					}
538
539					log::error!(target: LOG_TARGET, "Failed to fetch block ({subscription_type:?}): {err:?}");
540					return Err(err.into());
541				},
542			};
543
544			// Acquire lock to ensure only one subscription can perform write operations at a time
545			let _guard = self.subscription_lock.lock().await;
546
547			let block_number = block.number();
548
549			// Only check finalized blocks for gaps.
550			if subscription_type == SubscriptionType::FinalizedBlocks {
551				if let Some(last) = last_finalized_seen {
552					self.subscription_gap_queue.detect_and_queue(block_number, last);
553				}
554				// Update unconditionally โ€” a callback failure doesn't mean the block was missed.
555				last_finalized_seen = Some(block_number);
556			}
557
558			log::trace!(target: LOG_TARGET_SUBSCRIPTION, "โณ Processing {subscription_type:?} block: {block_number}");
559			if let Err(err) = callback(block).await {
560				log::error!(target: LOG_TARGET, "Failed to process block {block_number}: {err:?}");
561			} else {
562				log::trace!(target: LOG_TARGET_SUBSCRIPTION, "โœ… Processed {subscription_type:?} block: {block_number}");
563			}
564		}
565
566		log::info!(target: LOG_TARGET, "Block subscription ended");
567		Ok(())
568	}
569
570	/// Extract receipts from a block, persist them and update fee history.
571	async fn process_block(
572		&self,
573		block: &SubstrateBlock,
574	) -> Result<(Block, Vec<ReceiptInfo>), ClientError> {
575		let block_number = block.number();
576		let hash = block.hash();
577
578		macro_rules! time {
579			($label:expr, $expr:expr) => {{
580				let t = std::time::Instant::now();
581				let r = $expr;
582				log::trace!(
583					target: LOG_TARGET,
584					"โฑ๏ธ #{block_number} {}: {:?}",
585					$label, t.elapsed(),
586				);
587				r
588			}};
589		}
590
591		let eth_block = time!("eth_block", self.runtime_api(hash).eth_block().await?);
592		let receipts = time!(
593			"receipts_from_block",
594			self.receipt_provider.receipts_from_block(block, eth_block.hash).await?
595		);
596		time!(
597			"insert_block_receipts",
598			self.receipt_provider
599				.insert_block_receipts(block, &receipts, &eth_block.hash)
600				.await?
601		);
602
603		let (_, receipt_infos): (Vec<_>, Vec<_>) = receipts.into_iter().unzip();
604		self.fee_history_provider.update_fee_history(&eth_block, &receipt_infos).await;
605
606		Ok((eth_block, receipt_infos))
607	}
608
609	/// Start the block subscription, and populate the block cache.
610	pub async fn subscribe_and_cache_new_blocks(
611		&self,
612		subscription_type: SubscriptionType,
613	) -> Result<(), ClientError> {
614		log::info!(target: LOG_TARGET, "๐Ÿ”Œ Subscribing to new blocks ({subscription_type:?})");
615		self.subscribe_new_blocks(subscription_type, |block| async {
616			let hash = block.hash();
617
618			match subscription_type {
619				SubscriptionType::BestBlocks => {
620					let (eth_block, _) = self.process_block(&block).await?;
621					self.block_provider.update_latest(Arc::new(block), subscription_type).await;
622
623					if let Some(sender) = &self.block_notifier {
624						if sender.receiver_count() > 0 {
625							let _ = sender.send(hash);
626						}
627					}
628					if self.block_subscription_tx.receiver_count() > 0 {
629						let _ = self.block_subscription_tx.send(eth_block);
630					}
631				},
632				SubscriptionType::FinalizedBlocks => {
633					let block_number = block.number();
634					let (receipt_infos, eth_hash) = match self
635						.receipt_provider
636						.get_processed_eth_block_hash(block_number, hash)
637						.await
638					{
639						Some(eth_hash) => {
640							log::trace!(target: LOG_TARGET_SUBSCRIPTION,
641									"โฉ Finalized block #{block_number} already processed, \
642									 skipping extraction");
643							(None, eth_hash)
644						},
645						None => {
646							let (eth_block, infos) = self.process_block(&block).await?;
647							(Some(infos), eth_block.hash)
648						},
649					};
650
651					self.block_provider.update_latest(Arc::new(block), subscription_type).await;
652					self.advance_sync_head(block_number, hash).await;
653
654					if self.log_subscription_tx.receiver_count() > 0 {
655						let logs = match receipt_infos {
656							Some(infos) => infos.into_iter().flat_map(|r| r.logs).collect(),
657							None => {
658								self.receipt_provider
659									.logs_by_block_number(block_number, eth_hash)
660									.await?
661							},
662						};
663						for log in logs {
664							let _ = self.log_subscription_tx.send(log);
665						}
666					}
667				},
668			}
669
670			Ok(())
671		})
672		.await
673	}
674
675	/// Get the block hash for the given block number or tag.
676	pub async fn block_hash_for_tag(
677		&self,
678		at: BlockNumberOrTagOrHash,
679	) -> Result<SubstrateBlockHash, ClientError> {
680		match at {
681			BlockNumberOrTagOrHash::BlockHash(hash) => self
682				.resolve_substrate_hash(&hash)
683				.await
684				.ok_or(ClientError::EthereumBlockNotFound),
685			BlockNumberOrTagOrHash::BlockNumber(block_number) => {
686				let n: SubstrateBlockNumber =
687					(block_number).try_into().map_err(|_| ClientError::ConversionFailed)?;
688				let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?;
689				Ok(hash)
690			},
691			BlockNumberOrTagOrHash::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
692				let block = self.latest_finalized_block().await;
693				Ok(block.hash())
694			},
695			BlockNumberOrTagOrHash::BlockTag(BlockTag::Earliest) => {
696				let hash = self
697					.get_block_hash(self.earliest_block_number())
698					.await?
699					.ok_or(ClientError::BlockNotFound)?;
700				Ok(hash)
701			},
702			BlockNumberOrTagOrHash::BlockTag(_) => {
703				let block = self.latest_block().await;
704				Ok(block.hash())
705			},
706		}
707	}
708
709	/// Get the storage API for the given block.
710	pub fn storage_api(&self, block_hash: H256) -> StorageApi {
711		StorageApi::new(self.api.storage().at(block_hash))
712	}
713
714	/// Get the runtime API for the given block.
715	pub fn runtime_api(&self, block_hash: H256) -> RuntimeApi {
716		RuntimeApi::new(self.api.runtime_api().at(block_hash))
717	}
718
719	/// Get the latest finalized block.
720	pub async fn latest_finalized_block(&self) -> Arc<SubstrateBlock> {
721		self.block_provider.latest_finalized_block().await
722	}
723
724	/// Get the latest best block.
725	pub async fn latest_block(&self) -> Arc<SubstrateBlock> {
726		self.block_provider.latest_block().await
727	}
728
729	/// Submit an ethereum transaction and return a stream of transaction status updates.
730	async fn submit_transaction(
731		&self,
732		call: subxt::tx::DefaultPayload<EthTransact>,
733	) -> Result<StreamOfResults<TransactionStatus<SubstrateBlockHash>>, ClientError> {
734		let ext = self.api.tx().create_unsigned(&call.unvalidated()).map_err(ClientError::from)?;
735
736		let sub = self
737			.rpc_client
738			.subscribe(
739				"author_submitAndWatchExtrinsic",
740				rpc_params![to_hex(ext.encoded())],
741				"author_unwatchExtrinsic",
742			)
743			.await?;
744
745		let sub = sub.map_err(|e| e.into());
746		Ok(StreamOf::new(Box::pin(sub)))
747	}
748
749	/// Expose the transaction API.
750	pub async fn submit(
751		&self,
752		call: subxt::tx::DefaultPayload<EthTransact>,
753	) -> Result<TransactionStatus<SubstrateBlockHash>, ClientError> {
754		let mut progress = self.submit_transaction(call).await.inspect_err(|err| {
755			log::debug!(target: LOG_TARGET, "Failed to submit transaction: {err:?}");
756		})?;
757
758		tokio::time::timeout(Duration::from_secs(5), async {
759			if let Some(status) = progress.next().await {
760				match status {
761					Ok(
762						tx @ (TransactionStatus::Future |
763						TransactionStatus::Ready |
764						// Add other events that follow Ready here for completeness,
765						// but they can be ignored.
766						TransactionStatus::Broadcast(_) |
767						TransactionStatus::InBlock(_) |
768						TransactionStatus::FinalityTimeout(_) |
769						TransactionStatus::Retracted(_) |
770						TransactionStatus::Finalized(_)),
771					) => {
772						return Ok(tx);
773					},
774					Ok(
775						tx @ (TransactionStatus::Usurped(_) |
776						TransactionStatus::Dropped |
777						TransactionStatus::Invalid),
778					) => {
779						return Err(ClientError::SubmitError(tx.into()));
780					},
781					Err(err) => {
782						log::debug!(target: LOG_TARGET, "Transaction submission failed: {err:?}");
783						return Err(ClientError::from(err));
784					},
785				}
786			}
787			return Err(ClientError::SubmitError(SubmitError::StreamEnded));
788		})
789		.await
790		.map_err(|_| ClientError::Timeout)?
791	}
792
793	/// Get an EVM transaction receipt by hash.
794	pub async fn receipt(&self, tx_hash: &H256) -> Option<ReceiptInfo> {
795		self.receipt_provider.receipt_by_hash(tx_hash).await
796	}
797
798	/// Get The post dispatch weight associated with this Ethereum transaction hash.
799	pub async fn post_dispatch_weight(&self, tx_hash: &H256) -> Option<Weight> {
800		use crate::subxt_client::system::events::ExtrinsicSuccess;
801		let ReceiptInfo { block_hash, transaction_index, .. } = self.receipt(tx_hash).await?;
802		let block_hash = self.resolve_substrate_hash(&block_hash).await?;
803		let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
804		let ext = block.extrinsics().await.ok()?.iter().nth(transaction_index.as_u32() as _)?;
805		let event = ext.events().await.ok()?.find_first::<ExtrinsicSuccess>().ok()??;
806		Some(event.dispatch_info.weight.0)
807	}
808
809	pub async fn sync_state(
810		&self,
811	) -> Result<sc_rpc::system::SyncState<SubstrateBlockNumber>, ClientError> {
812		let client = self.rpc_client.clone();
813		let sync_state: sc_rpc::system::SyncState<SubstrateBlockNumber> =
814			client.request("system_syncState", Default::default()).await?;
815		Ok(sync_state)
816	}
817
818	/// Get the syncing status of the chain.
819	pub async fn syncing(&self) -> Result<SyncingStatus, ClientError> {
820		let health = self.rpc.system_health().await?;
821
822		let status = if health.is_syncing {
823			let sync_state = self.sync_state().await?;
824			SyncingProgress {
825				current_block: Some(sync_state.current_block.into()),
826				highest_block: Some(sync_state.highest_block.into()),
827				starting_block: Some(sync_state.starting_block.into()),
828			}
829			.into()
830		} else {
831			SyncingStatus::Bool(false)
832		};
833
834		Ok(status)
835	}
836
837	/// Get an EVM transaction receipt by hash.
838	pub async fn receipt_by_hash_and_index(
839		&self,
840		block_hash: &H256,
841		transaction_index: usize,
842	) -> Option<ReceiptInfo> {
843		self.receipt_provider
844			.receipt_by_block_hash_and_index(block_hash, transaction_index)
845			.await
846	}
847
848	pub async fn signed_tx_by_hash(&self, tx_hash: &H256) -> Option<TransactionSigned> {
849		self.receipt_provider.signed_tx_by_hash(tx_hash).await
850	}
851
852	/// Get receipts count per block.
853	pub async fn receipts_count_per_block(&self, block_hash: &SubstrateBlockHash) -> Option<usize> {
854		self.receipt_provider.receipts_count_per_block(block_hash).await
855	}
856
857	/// Get an EVM transaction receipt by specified Ethereum block hash.
858	pub async fn receipt_by_ethereum_hash_and_index(
859		&self,
860		ethereum_hash: &H256,
861		transaction_index: usize,
862	) -> Option<ReceiptInfo> {
863		// Fallback: use hash as Substrate hash if Ethereum hash cannot be resolved
864		let substrate_hash =
865			self.resolve_substrate_hash(ethereum_hash).await.unwrap_or_else(|| {
866				log::trace!(target: LOG_TARGET,
867					"receipt_by_ethereum_hash_and_index: no ETH-to-substrate mapping for \
868					 {ethereum_hash:?}, falling back to substrate hash lookup");
869				*ethereum_hash
870			});
871		self.receipt_by_hash_and_index(&substrate_hash, transaction_index).await
872	}
873
874	/// Get the system health.
875	pub async fn system_health(&self) -> Result<SystemHealth, ClientError> {
876		let health = self.rpc.system_health().await?;
877		Ok(health)
878	}
879
880	/// Get the block number of the latest block.
881	pub async fn block_number(&self) -> Result<SubstrateBlockNumber, ClientError> {
882		let latest_block = self.block_provider.latest_block().await;
883		Ok(latest_block.number())
884	}
885
886	/// Get a block hash for the given block number.
887	pub async fn get_block_hash(
888		&self,
889		block_number: SubstrateBlockNumber,
890	) -> Result<Option<SubstrateBlockHash>, ClientError> {
891		let maybe_block = self.block_provider.block_by_number(block_number).await?;
892		Ok(maybe_block.map(|block| block.hash()))
893	}
894
895	/// Get a block for the specified hash or number.
896	pub async fn block_by_number_or_tag(
897		&self,
898		block: &BlockNumberOrTag,
899	) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
900		match block {
901			BlockNumberOrTag::U256(n) => {
902				let n = (*n).try_into().map_err(|_| ClientError::ConversionFailed)?;
903				self.block_by_number(n).await
904			},
905			BlockNumberOrTag::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
906				let block = self.block_provider.latest_finalized_block().await;
907				Ok(Some(block))
908			},
909			BlockNumberOrTag::BlockTag(BlockTag::Earliest) => {
910				self.block_by_number(self.earliest_block_number()).await
911			},
912			BlockNumberOrTag::BlockTag(_) => {
913				let block = self.block_provider.latest_block().await;
914				Ok(Some(block))
915			},
916		}
917	}
918
919	/// Get a block by hash
920	pub async fn block_by_hash(
921		&self,
922		hash: &SubstrateBlockHash,
923	) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
924		self.block_provider.block_by_hash(hash).await
925	}
926
927	/// Resolve Ethereum block hash to Substrate block hash, then get the block.
928	/// This method provides the abstraction layer needed by the RPC APIs.
929	pub async fn resolve_substrate_hash(&self, ethereum_hash: &H256) -> Option<H256> {
930		self.receipt_provider.get_substrate_hash(ethereum_hash).await
931	}
932
933	/// Resolve Substrate block hash to Ethereum block hash, then get the block.
934	/// This method provides the abstraction layer needed by the RPC APIs.
935	pub async fn resolve_ethereum_hash(&self, substrate_hash: &H256) -> Option<H256> {
936		self.receipt_provider.get_ethereum_hash(substrate_hash).await
937	}
938
939	/// Get a block by Ethereum hash with automatic resolution to Substrate hash.
940	/// Falls back to treating the hash as a Substrate hash if no mapping exists.
941	pub async fn block_by_ethereum_hash(
942		&self,
943		ethereum_hash: &H256,
944	) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
945		// First try to resolve the Ethereum hash to a Substrate hash
946		if let Some(substrate_hash) = self.resolve_substrate_hash(ethereum_hash).await {
947			return self.block_by_hash(&substrate_hash).await;
948		}
949
950		// Fallback: treat the provided hash as a Substrate hash (backward compatibility)
951		log::trace!(target: LOG_TARGET,
952			"block_by_ethereum_hash: no ETH-to-substrate mapping for {ethereum_hash:?}, \
953			 falling back to substrate hash lookup");
954		self.block_by_hash(ethereum_hash).await
955	}
956
957	/// Get a block by number
958	pub async fn block_by_number(
959		&self,
960		block_number: SubstrateBlockNumber,
961	) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
962		self.block_provider.block_by_number(block_number).await
963	}
964
965	async fn tracing_block(
966		&self,
967		block_hash: H256,
968	) -> Result<
969		sp_runtime::generic::Block<
970			sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
971			sp_runtime::OpaqueExtrinsic,
972		>,
973		ClientError,
974	> {
975		let signed_block: Option<
976			sp_runtime::generic::SignedBlock<
977				sp_runtime::generic::Block<
978					sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
979					sp_runtime::OpaqueExtrinsic,
980				>,
981			>,
982		> = self.rpc_client.request("chain_getBlock", rpc_params![block_hash]).await?;
983
984		Ok(signed_block.ok_or(ClientError::BlockNotFound)?.block)
985	}
986
987	/// Get the transaction traces for the given block.
988	pub async fn trace_block_by_number(
989		&self,
990		at: BlockNumberOrTag,
991		config: TracerTypeV1,
992	) -> Result<Vec<TransactionTrace>, ClientError> {
993		if self.receipt_provider.is_before_earliest_block(&at) {
994			return Ok(vec![]);
995		}
996
997		let block_hash = self.block_hash_for_tag(at.into()).await?;
998		let block = self.tracing_block(block_hash).await?;
999		let parent_hash = block.header().parent_hash;
1000		// Block 0 has no parent โ€” there is nothing to trace.
1001		if parent_hash == Default::default() {
1002			return Ok(vec![]);
1003		}
1004		let runtime_api = RuntimeApi::new(self.api.runtime_api().at(parent_hash));
1005		let traces = runtime_api.trace_block(block, config.clone()).await?;
1006
1007		let mut hashes = self
1008			.receipt_provider
1009			.block_transaction_hashes(&block_hash)
1010			.await
1011			.ok_or(ClientError::EthExtrinsicNotFound)?;
1012
1013		let traces = traces.into_iter().filter_map(|(index, trace)| {
1014			Some(TransactionTrace { tx_hash: hashes.remove(&(index as usize))?, trace })
1015		});
1016
1017		Ok(traces.collect())
1018	}
1019
1020	/// Get the transaction traces for the given transaction.
1021	pub async fn trace_transaction(
1022		&self,
1023		transaction_hash: H256,
1024		config: TracerTypeV1,
1025	) -> Result<TraceV1, ClientError> {
1026		let (block_hash, transaction_index) = self
1027			.receipt_provider
1028			.find_transaction(&transaction_hash)
1029			.await
1030			.ok_or(ClientError::EthExtrinsicNotFound)?;
1031
1032		let block = self.tracing_block(block_hash).await?;
1033		let parent_hash = block.header.parent_hash;
1034		let runtime_api = self.runtime_api(parent_hash);
1035
1036		runtime_api.trace_tx(block, transaction_index as u32, config).await
1037	}
1038
1039	/// Get the transaction traces for the given block.
1040	pub async fn trace_call(
1041		&self,
1042		transaction: GenericTransaction,
1043		block: BlockNumberOrTagOrHash,
1044		config: TracerTypeV1,
1045		state_overrides: Option<StateOverrideSet>,
1046	) -> Result<TraceV1, ClientError> {
1047		let block_hash = self.block_hash_for_tag(block).await?;
1048		let runtime_api = self.runtime_api(block_hash);
1049		runtime_api.trace_call(transaction, config, state_overrides).await
1050	}
1051
1052	/// Get the EVM block for the given Substrate block.
1053	pub async fn evm_block(
1054		&self,
1055		block: Arc<SubstrateBlock>,
1056		hydrated_transactions: bool,
1057	) -> Option<Block> {
1058		log::trace!(target: LOG_TARGET, "Get Ethereum block for hash {:?}", block.hash());
1059
1060		if self
1061			.receipt_provider
1062			.is_before_earliest_block(&BlockNumberOrTag::U256(U256::from(block.number())))
1063		{
1064			log::trace!(target: LOG_TARGET,
1065				"Block #{} is before receipt floor, skipping", block.number());
1066			return None;
1067		}
1068
1069		// This could potentially fail under below circumstances:
1070		//  - state has been pruned
1071		//  - the block author cannot be obtained from the digest logs (highly unlikely)
1072		//  - the node we are targeting has an outdated revive pallet (or ETH block functionality is
1073		//    disabled)
1074		match self.runtime_api(block.hash()).eth_block().await {
1075			Ok(mut eth_block) => {
1076				log::trace!(target: LOG_TARGET, "Ethereum block from runtime API hash {:?}", eth_block.hash);
1077
1078				if hydrated_transactions {
1079					// Hydrate the block.
1080					let tx_infos = self
1081						.receipt_provider
1082						.receipts_from_block(&block, eth_block.hash)
1083						.await
1084						.inspect_err(|err| {
1085							log::trace!(target: LOG_TARGET,
1086								"Failed to extract receipts for block #{}: {err:?}",
1087								block.number());
1088						})
1089						.unwrap_or_default()
1090						.into_iter()
1091						.map(|(signed_tx, receipt)| TransactionInfo::new(&receipt, signed_tx))
1092						.collect::<Vec<_>>();
1093
1094					eth_block.transactions = HashesOrTransactionInfos::TransactionInfos(tx_infos);
1095				}
1096
1097				Some(eth_block)
1098			},
1099			Err(err) => {
1100				log::error!(target: LOG_TARGET, "Failed to get Ethereum block for hash {:?}: {err:?}", block.hash());
1101				None
1102			},
1103		}
1104	}
1105
1106	/// Get the chain ID.
1107	pub fn chain_id(&self) -> u64 {
1108		self.chain_id
1109	}
1110
1111	/// Get the Max Block Weight.
1112	pub fn max_block_weight(&self) -> Weight {
1113		self.max_block_weight
1114	}
1115
1116	/// Get the block notifier, if automine is enabled or Self::create_block_notifier was called.
1117	pub fn block_notifier(&self) -> Option<tokio::sync::broadcast::Sender<H256>> {
1118		self.block_notifier.clone()
1119	}
1120
1121	/// Get the logs matching the given filter.
1122	pub async fn logs(&self, filter: Option<Filter>) -> Result<Vec<Log>, ClientError> {
1123		let earliest = U256::from(self.earliest_block_number());
1124		let latest = U256::from(self.latest_block().await.number());
1125		let resolve_block_number = |block: BlockNumberOrTag| match block {
1126			BlockNumberOrTag::U256(v) => Ok(v),
1127			BlockNumberOrTag::BlockTag(BlockTag::Earliest) => Ok(earliest),
1128			BlockNumberOrTag::BlockTag(BlockTag::Latest) => Ok(latest),
1129			BlockNumberOrTag::BlockTag(tag) => anyhow::bail!("Unsupported tag: {tag:?}"),
1130		};
1131
1132		let logs = self
1133			.receipt_provider
1134			.logs(filter, &resolve_block_number)
1135			.await
1136			.map_err(ClientError::LogFilterFailed)?;
1137
1138		Ok(logs)
1139	}
1140
1141	pub async fn fee_history(
1142		&self,
1143		block_count: u32,
1144		latest_block: BlockNumberOrTag,
1145		reward_percentiles: Option<Vec<f64>>,
1146	) -> Result<FeeHistoryResult, ClientError> {
1147		let Some(latest_block) = self.block_by_number_or_tag(&latest_block).await? else {
1148			return Err(ClientError::BlockNotFound);
1149		};
1150
1151		self.fee_history_provider
1152			.fee_history(block_count, latest_block.number(), reward_percentiles)
1153			.await
1154	}
1155
1156	/// Check if automine is enabled.
1157	pub fn is_automine(&self) -> bool {
1158		self.automine
1159	}
1160
1161	/// Get the automine status from the node.
1162	pub async fn get_automine(&self) -> bool {
1163		get_automine(&self.rpc_client).await
1164	}
1165
1166	/// Gets the block subscription rx side of the channel.
1167	pub fn get_block_subscription_rx(&self) -> tokio::sync::broadcast::Receiver<Block> {
1168		self.block_subscription_tx.subscribe()
1169	}
1170
1171	/// Gets the log subscription rx side of the channel.
1172	pub fn get_log_subscription_rx(&self) -> tokio::sync::broadcast::Receiver<Log> {
1173		self.log_subscription_tx.subscribe()
1174	}
1175}
1176
1177fn to_hex(bytes: impl AsRef<[u8]>) -> String {
1178	format!("0x{}", hex::encode(bytes.as_ref()))
1179}