referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
client.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17//! The client connects to the source substrate chain
18//! and is used by the rpc server to query and send transactions to the substrate chain.
19
20pub(crate) mod runtime_api;
21pub(crate) mod storage_api;
22
23use crate::{
24	BlockInfoProvider, BlockTag, FeeHistoryProvider, ReceiptProvider, SubxtBlockInfoProvider,
25	SyncLabel, TracerType, TransactionInfo,
26	block_sync::SyncCheckpoint,
27	subxt_client::{self, SrcChainConfig, revive::calls::types::EthTransact},
28};
29use futures::TryStreamExt;
30use jsonrpsee::types::{ErrorObjectOwned, error::CALL_EXECUTION_FAILED_CODE};
31use pallet_revive::{
32	EthTransactError,
33	evm::{
34		Block, BlockNumberOrTag, BlockNumberOrTagOrHash, FeeHistoryResult, Filter,
35		GenericTransaction, H256, HashesOrTransactionInfos, Log, ReceiptInfo, StateOverrideSet,
36		SyncingProgress, SyncingStatus, Trace, TransactionSigned, TransactionTrace, U256,
37		decode_revert_reason,
38	},
39};
40use runtime_api::RuntimeApi;
41use sp_runtime::traits::Block as BlockT;
42use sp_weights::Weight;
43use std::{
44	sync::{
45		Arc,
46		atomic::{AtomicBool, AtomicUsize, Ordering},
47	},
48	time::Duration,
49};
50use storage_api::StorageApi;
51use subxt::{
52	Config, OnlineClient,
53	backend::{
54		StreamOf, StreamOfResults,
55		legacy::{
56			LegacyRpcMethods,
57			rpc_methods::{SystemHealth, TransactionStatus},
58		},
59		rpc::{
60			RpcClient,
61			reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
62		},
63	},
64	config::{HashFor, Header},
65	ext::subxt_rpcs::rpc_params,
66};
67use thiserror::Error;
68use tokio::sync::{Mutex, mpsc};
69
70/// The substrate block type.
71pub type SubstrateBlock = subxt::blocks::Block<SrcChainConfig, OnlineClient<SrcChainConfig>>;
72
73/// The substrate block header.
74pub type SubstrateBlockHeader = <SrcChainConfig as Config>::Header;
75
76/// The substrate block number type.
77pub type SubstrateBlockNumber = <SubstrateBlockHeader as Header>::Number;
78
79/// The substrate block hash type.
80pub type SubstrateBlockHash = HashFor<SrcChainConfig>;
81
82/// The runtime balance type.
83pub type Balance = u128;
84
85/// The subscription type used to listen to new blocks.
86#[derive(Debug, Clone, Copy, PartialEq)]
87pub enum SubscriptionType {
88	/// Subscribe to best blocks.
89	BestBlocks,
90	/// Subscribe to finalized blocks.
91	FinalizedBlocks,
92}
93
94/// Submit Error reason.
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
96pub enum SubmitError {
97	/// Transaction was usurped by another with the same nonce.
98	#[error("Transaction was usurped by another with the same nonce")]
99	Usurped,
100	/// Transaction was dropped from the pool.
101	#[error("Transaction was dropped")]
102	Dropped,
103	/// Transaction is invalid (e.g. bad nonce, signature, etc).
104	#[error("Transaction is invalid (e.g. bad nonce, signature, etc)")]
105	Invalid,
106	/// Transaction stream ended without a terminal status.
107	#[error("Transaction stream ended without status")]
108	StreamEnded,
109	/// Unknown transaction status.
110	#[error("Unknown transaction status")]
111	Unknown,
112}
113
114impl From<TransactionStatus<SubstrateBlockHash>> for SubmitError {
115	fn from(status: TransactionStatus<SubstrateBlockHash>) -> Self {
116		match status {
117			TransactionStatus::Usurped(_) => SubmitError::Usurped,
118			TransactionStatus::Dropped => SubmitError::Dropped,
119			TransactionStatus::Invalid => SubmitError::Invalid,
120			_ => SubmitError::Unknown,
121		}
122	}
123}
124
125/// The error type for the client.
126#[derive(Error, Debug)]
127pub enum ClientError {
128	/// A [`jsonrpsee::core::ClientError`] wrapper error.
129	#[error(transparent)]
130	Jsonrpsee(#[from] jsonrpsee::core::ClientError),
131	/// A [`subxt::Error`] wrapper error.
132	#[error(transparent)]
133	SubxtError(#[from] subxt::Error),
134	#[error(transparent)]
135	RpcError(#[from] subxt::ext::subxt_rpcs::Error),
136	/// A [`sqlx::Error`] wrapper error.
137	#[error(transparent)]
138	SqlxError(#[from] sqlx::Error),
139	/// A [`codec::Error`] wrapper error.
140	#[error(transparent)]
141	CodecError(#[from] codec::Error),
142	/// author_submitExtrinsic failed.
143	#[error("Invalid transaction: {0}")]
144	SubmitError(SubmitError),
145	/// Transact call failed.
146	#[error("contract reverted: {0:?}")]
147	TransactError(EthTransactError),
148	/// A decimal conversion failed.
149	#[error("conversion failed")]
150	ConversionFailed,
151	/// The block hash was not found.
152	#[error("hash not found")]
153	BlockNotFound,
154	/// The contract was not found.
155	#[error("Contract not found")]
156	ContractNotFound,
157	#[error("No Ethereum extrinsic found")]
158	EthExtrinsicNotFound,
159	/// The transaction fee could not be found
160	#[error("transactionFeePaid event not found")]
161	TxFeeNotFound,
162	/// Failed to decode a raw payload into a signed transaction.
163	#[error("Failed to decode a raw payload into a signed transaction")]
164	TxDecodingFailed,
165	/// Failed to recover eth address.
166	#[error("failed to recover eth address")]
167	RecoverEthAddressFailed,
168	/// Failed to filter logs.
169	#[error("Failed to filter logs")]
170	LogFilterFailed(#[from] anyhow::Error),
171	/// Receipt storage was not found.
172	#[error("Receipt storage not found")]
173	ReceiptDataNotFound,
174	/// Ethereum block was not found.
175	#[error("Ethereum block not found")]
176	EthereumBlockNotFound,
177	/// Receipt data length mismatch.
178	#[error("Receipt data length mismatch")]
179	ReceiptDataLengthMismatch,
180	/// Transaction submission timeout.
181	#[error("Transaction submission timeout")]
182	Timeout,
183	/// All of the estimation methods `eth_estimate`, `eth_transact_with_config`, and
184	/// `eth_transact` were not found and therefore none of the estimation methods succeeded.
185	#[error("None of the estimation methods were found")]
186	NoEstimationMethodSucceeded,
187	/// Chain identity mismatch between stored genesis and connected node.
188	#[error("Genesis hash mismatch")]
189	ChainMismatch,
190	/// Stored sync boundary does not match the connected node.
191	#[error("Sync boundary mismatch")]
192	SyncBoundaryMismatch,
193}
194
195impl ClientError {
196	/// Errors that indicate a mismatch between the stored sync state and the connected node.
197	pub(crate) fn is_chain_validation_error(&self) -> bool {
198		matches!(self, Self::ChainMismatch | Self::SyncBoundaryMismatch)
199	}
200}
201
202const LOG_TARGET: &str = "eth-rpc::client";
203const LOG_TARGET_SUBSCRIPTION: &str = "eth-rpc::subscription";
204
205const REVERT_CODE: i32 = 3;
206
207const NOTIFIER_CAPACITY: usize = 16;
208
209impl From<ClientError> for ErrorObjectOwned {
210	fn from(err: ClientError) -> Self {
211		match err {
212			ClientError::SubxtError(subxt::Error::Rpc(subxt::error::RpcError::ClientError(
213				subxt::ext::subxt_rpcs::Error::User(err),
214			))) |
215			ClientError::RpcError(subxt::ext::subxt_rpcs::Error::User(err)) => {
216				ErrorObjectOwned::owned::<Vec<u8>>(err.code, err.message, None)
217			},
218			ClientError::TransactError(EthTransactError::Data(data)) => {
219				let msg = match decode_revert_reason(&data) {
220					Some(reason) => format!("execution reverted: {reason}"),
221					None => "execution reverted".to_string(),
222				};
223
224				let data = format!("0x{}", hex::encode(data));
225				ErrorObjectOwned::owned::<String>(REVERT_CODE, msg, Some(data))
226			},
227			ClientError::TransactError(EthTransactError::Message(msg)) => {
228				ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, msg, None)
229			},
230			_ => {
231				ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, err.to_string(), None)
232			},
233		}
234	}
235}
236
237/// A client that connects to a substrate node and provides Ethereum-compatible RPC functionality.
238#[derive(Clone)]
239pub struct Client {
240	api: OnlineClient<SrcChainConfig>,
241	rpc_client: RpcClient,
242	rpc: LegacyRpcMethods<SrcChainConfig>,
243	receipt_provider: ReceiptProvider,
244	block_provider: SubxtBlockInfoProvider,
245	fee_history_provider: FeeHistoryProvider,
246	chain_id: u64,
247	max_block_weight: Weight,
248	/// Whether the node has automine enabled.
249	automine: bool,
250	/// A notifier, that informs subscribers of new best blocks.
251	block_notifier: Option<tokio::sync::broadcast::Sender<H256>>,
252	/// A lock to ensure only one subscription can perform write operations at a time.
253	subscription_lock: Arc<Mutex<()>>,
254
255	/// Block subscription sender side.
256	block_subscription_tx: tokio::sync::broadcast::Sender<Block>,
257	/// Log subscription sender side.
258	log_subscription_tx: tokio::sync::broadcast::Sender<Log>,
259	/// Whether archive mode is enabled
260	is_archive: bool,
261	/// Whether historic backfill has completed. `false` if not started or in progress.
262	backfill_complete: Arc<AtomicBool>,
263	/// Queue for backfilling blocks missed during subscription reconnects.
264	subscription_gap_queue: SubscriptionGapQueue,
265}
266
267/// A request to backfill a range of missed blocks (both bounds inclusive).
268pub(crate) struct GapFillRequest {
269	pub from_inclusive: SubstrateBlockNumber,
270	pub to_inclusive: SubstrateBlockNumber,
271}
272
273/// Queues gap-fill requests for blocks missed during subscription reconnects.
274#[derive(Clone)]
275pub(crate) struct SubscriptionGapQueue {
276	/// Sender half of the gap-fill queue.
277	tx: mpsc::Sender<GapFillRequest>,
278	/// Queued + in-flight gap fills. Channel length alone is insufficient
279	/// because it drops to zero as soon as the receiver dequeues the item.
280	pending: Arc<AtomicUsize>,
281}
282
283impl SubscriptionGapQueue {
284	pub(crate) fn new() -> (Self, mpsc::Receiver<GapFillRequest>) {
285		// Each reconnect produces one gap-fill request for the entire missed range,
286		// so 32 allows for 32 rapid disconnects before the consumer processes any.
287		let (tx, rx) = mpsc::channel(32);
288		(Self { tx, pending: Arc::new(AtomicUsize::new(0)) }, rx)
289	}
290
291	/// If `current` is not consecutive to `last`, queue a gap-fill for the missing range.
292	pub fn detect_and_queue(&self, current: SubstrateBlockNumber, last: SubstrateBlockNumber) {
293		if current.saturating_sub(last) <= 1 {
294			return;
295		}
296
297		let from_inclusive = current.saturating_sub(1);
298		let to_inclusive = last.saturating_add(1);
299		let gap_len = from_inclusive.saturating_sub(to_inclusive) + 1;
300		self.pending.fetch_add(1, Ordering::Release);
301		match self.tx.try_send(GapFillRequest { from_inclusive, to_inclusive }) {
302			Ok(_) => {
303				log::info!(target: LOG_TARGET,
304					"๐Ÿ”„ Subscription gap queue: queued #{from_inclusive} down to #{to_inclusive} ({gap_len} blocks)");
305			},
306			Err(err) => {
307				self.pending.fetch_sub(1, Ordering::Release);
308				log::warn!(target: LOG_TARGET,
309					"๐Ÿ”„ Subscription gap queue error, dropping #{from_inclusive}..#{to_inclusive} ({gap_len} blocks): {err}");
310			},
311		}
312	}
313
314	/// Mark one request as processed.
315	pub fn mark_done(&self) {
316		let res = self
317			.pending
318			.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| v.checked_sub(1));
319		if res.is_err() {
320			debug_assert!(false, "subscription gap queue pending counter underflowed");
321			log::error!(target: LOG_TARGET,
322				"๐Ÿ”„ Subscription gap queue pending counter underflow, delete the database and restart with --eth-pruning=archive to resync");
323		}
324	}
325
326	/// Returns `true` if there are pending gap-fill requests.
327	pub fn has_pending(&self) -> bool {
328		self.pending.load(Ordering::Acquire) > 0
329	}
330}
331
332/// Returns the first EVM block number for main and test nets, `None` otherwise.
333fn known_first_evm_block_for_chain(chain_id: u64) -> Option<u32> {
334	match chain_id {
335		420420417 => Some(4_367_914),  // Paseo Asset Hub
336		420420418 => Some(12_234_156), // Kusama Asset Hub
337		420420419 => Some(11_405_259), // Polkadot Asset Hub
338		420420421 => Some(13_169_391), // Westend Asset Hub
339		_ => None,
340	}
341}
342
343/// Fetch the chain ID from the substrate chain.
344async fn chain_id(api: &OnlineClient<SrcChainConfig>) -> Result<u64, ClientError> {
345	let query = subxt_client::constants().revive().chain_id().unvalidated();
346	api.constants().at(&query).map_err(|err| err.into())
347}
348
349/// Fetch the max block weight from the substrate chain.
350async fn max_block_weight(api: &OnlineClient<SrcChainConfig>) -> Result<Weight, ClientError> {
351	let query = subxt_client::constants().system().block_weights().unvalidated();
352	let weights = api.constants().at(&query)?;
353	let max_block = weights.per_class.normal.max_extrinsic.unwrap_or(weights.max_block);
354	Ok(max_block.0)
355}
356
357/// Get the automine status from the node.
358async fn get_automine(rpc_client: &RpcClient) -> bool {
359	match rpc_client.request::<bool>("getAutomine", rpc_params![]).await {
360		Ok(val) => val,
361		Err(err) => {
362			log::info!(target: LOG_TARGET, "Node does not have getAutomine RPC. Defaulting to automine=false. error: {err:?}");
363			false
364		},
365	}
366}
367
368/// Connect to a node at the given URL, and return the underlying API, RPC client, and legacy RPC
369/// clients.
370pub async fn connect(
371	node_rpc_url: &str,
372	max_request_size: u32,
373	max_response_size: u32,
374) -> Result<(OnlineClient<SrcChainConfig>, RpcClient, LegacyRpcMethods<SrcChainConfig>), ClientError>
375{
376	log::info!(target: LOG_TARGET, "๐ŸŒ Connecting to node at: {node_rpc_url} ...");
377	let rpc_client = ReconnectingRpcClient::builder()
378		.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
379		.max_request_size(max_request_size)
380		.max_response_size(max_response_size)
381		.build(node_rpc_url.to_string())
382		.await?;
383	let rpc_client = RpcClient::new(rpc_client);
384	log::info!(target: LOG_TARGET, "๐ŸŒŸ Connected to node at: {node_rpc_url}");
385
386	let api = OnlineClient::<SrcChainConfig>::from_rpc_client(rpc_client.clone()).await?;
387	let rpc = LegacyRpcMethods::<SrcChainConfig>::new(rpc_client.clone());
388	Ok((api, rpc_client, rpc))
389}
390
391impl Client {
392	/// Create a new client instance.
393	pub(crate) async fn new(
394		api: OnlineClient<SrcChainConfig>,
395		rpc_client: RpcClient,
396		rpc: LegacyRpcMethods<SrcChainConfig>,
397		block_provider: SubxtBlockInfoProvider,
398		receipt_provider: ReceiptProvider,
399		is_archive: bool,
400		subscription_gap_queue: SubscriptionGapQueue,
401	) -> Result<Self, ClientError> {
402		let (chain_id, max_block_weight, automine) =
403			tokio::try_join!(chain_id(&api), max_block_weight(&api), async {
404				Ok(get_automine(&rpc_client).await)
405			},)?;
406
407		// Fall back to 0 when the hardcoded value exceeds the current best block (e.g. zombienet
408		// reusing a known chain ID) and backward sync is disabled.
409		if !is_archive {
410			if let Some(known) = known_first_evm_block_for_chain(chain_id) {
411				let best = block_provider.latest_block_number().await;
412				if known > best {
413					log::debug!(
414						target: LOG_TARGET,
415						"Hardcoded first EVM block {known} exceeds best block {best} \
416						 for chain {chain_id}, defaulting to 0"
417					);
418					receipt_provider.set_first_evm_block(0).await?;
419				}
420			}
421		}
422
423		let client = Self {
424			api,
425			rpc_client,
426			rpc,
427			receipt_provider,
428			block_provider,
429			fee_history_provider: FeeHistoryProvider::default(),
430			chain_id,
431			max_block_weight,
432			automine,
433			block_notifier: automine
434				.then(|| tokio::sync::broadcast::channel::<H256>(NOTIFIER_CAPACITY).0),
435			subscription_lock: Arc::new(Mutex::new(())),
436			block_subscription_tx: tokio::sync::broadcast::channel(256).0,
437			log_subscription_tx: tokio::sync::broadcast::channel(1000).0,
438			is_archive,
439			backfill_complete: Arc::new(AtomicBool::new(false)),
440			subscription_gap_queue,
441		};
442
443		Ok(client)
444	}
445
446	/// Mark historic backfill as complete.
447	pub(crate) fn mark_backfill_complete(&self) {
448		self.backfill_complete.store(true, Ordering::Release);
449	}
450
451	/// Advance the sync_state head label if safe to do so.
452	/// Requires: archive mode, historic backfill complete, and no pending gap fills.
453	async fn advance_sync_head(&self, block_number: SubstrateBlockNumber, hash: H256) {
454		if !self.is_archive ||
455			!self.backfill_complete.load(Ordering::Acquire) ||
456			self.subscription_gap_queue.has_pending()
457		{
458			return;
459		}
460
461		if let Err(err) = self
462			.receipt_provider
463			.advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(block_number, hash))
464			.await
465		{
466			log::warn!(target: LOG_TARGET, "Failed to advance sync head: {err:?}");
467		}
468	}
469
470	/// Creates a block notifier instance.
471	pub fn create_block_notifier(&mut self) {
472		self.block_notifier = Some(tokio::sync::broadcast::channel::<H256>(NOTIFIER_CAPACITY).0);
473	}
474
475	/// Sets a block notifier
476	pub fn set_block_notifier(&mut self, notifier: Option<tokio::sync::broadcast::Sender<H256>>) {
477		self.block_notifier = notifier;
478	}
479
480	pub(crate) fn api(&self) -> &OnlineClient<SrcChainConfig> {
481		&self.api
482	}
483
484	pub(crate) fn receipt_provider(&self) -> &ReceiptProvider {
485		&self.receipt_provider
486	}
487
488	pub(crate) fn block_provider(&self) -> &SubxtBlockInfoProvider {
489		&self.block_provider
490	}
491
492	pub(crate) fn subscription_gap_queue(&self) -> &SubscriptionGapQueue {
493		&self.subscription_gap_queue
494	}
495
496	/// The earliest block number where the ReviveApi is available.
497	/// Resolution order: in-memory value > known-networks table > 0.
498	fn earliest_block_number(&self) -> u32 {
499		self.receipt_provider
500			.first_evm_block()
501			.or_else(|| known_first_evm_block_for_chain(self.chain_id))
502			.unwrap_or(0)
503	}
504
505	/// Subscribe to new blocks, and execute the async closure for each block.
506	async fn subscribe_new_blocks<F, Fut>(
507		&self,
508		subscription_type: SubscriptionType,
509		callback: F,
510	) -> Result<(), ClientError>
511	where
512		F: Fn(SubstrateBlock) -> Fut + Send + Sync,
513		Fut: std::future::Future<Output = Result<(), ClientError>> + Send,
514	{
515		let mut block_stream = match subscription_type {
516			SubscriptionType::BestBlocks => self.api.blocks().subscribe_best().await,
517			SubscriptionType::FinalizedBlocks => self.api.blocks().subscribe_finalized().await,
518		}
519		.inspect_err(|err| {
520			log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}");
521		})?;
522
523		let mut last_finalized_seen: Option<SubstrateBlockNumber> = None;
524
525		while let Some(block) = block_stream.next().await {
526			let block = match block {
527				Ok(block) => block,
528				Err(err) => {
529					if err.is_disconnected_will_reconnect() {
530						log::warn!(
531							target: LOG_TARGET,
532							"The RPC connection was lost and we may have missed a few blocks \
533							({subscription_type:?}, last finalized: {last_finalized_seen:?}): {err:?}"
534						);
535						continue;
536					}
537
538					log::error!(target: LOG_TARGET, "Failed to fetch block ({subscription_type:?}): {err:?}");
539					return Err(err.into());
540				},
541			};
542
543			// Acquire lock to ensure only one subscription can perform write operations at a time
544			let _guard = self.subscription_lock.lock().await;
545
546			let block_number = block.number();
547
548			// Only check finalized blocks for gaps.
549			if subscription_type == SubscriptionType::FinalizedBlocks {
550				if let Some(last) = last_finalized_seen {
551					self.subscription_gap_queue.detect_and_queue(block_number, last);
552				}
553				// Update unconditionally โ€” a callback failure doesn't mean the block was missed.
554				last_finalized_seen = Some(block_number);
555			}
556
557			log::trace!(target: LOG_TARGET_SUBSCRIPTION, "โณ Processing {subscription_type:?} block: {block_number}");
558			if let Err(err) = callback(block).await {
559				log::error!(target: LOG_TARGET, "Failed to process block {block_number}: {err:?}");
560			} else {
561				log::trace!(target: LOG_TARGET_SUBSCRIPTION, "โœ… Processed {subscription_type:?} block: {block_number}");
562			}
563		}
564
565		log::info!(target: LOG_TARGET, "Block subscription ended");
566		Ok(())
567	}
568
569	/// Extract receipts from a block, persist them and update fee history.
570	async fn process_block(
571		&self,
572		block: &SubstrateBlock,
573	) -> Result<(Block, Vec<ReceiptInfo>), ClientError> {
574		let block_number = block.number();
575		let hash = block.hash();
576
577		macro_rules! time {
578			($label:expr, $expr:expr) => {{
579				let t = std::time::Instant::now();
580				let r = $expr;
581				log::trace!(
582					target: LOG_TARGET,
583					"โฑ๏ธ #{block_number} {}: {:?}",
584					$label, t.elapsed(),
585				);
586				r
587			}};
588		}
589
590		let eth_block = time!("eth_block", self.runtime_api(hash).eth_block().await?);
591		let receipts = time!(
592			"receipts_from_block",
593			self.receipt_provider.receipts_from_block(block, eth_block.hash).await?
594		);
595		time!(
596			"insert_block_receipts",
597			self.receipt_provider
598				.insert_block_receipts(block, &receipts, &eth_block.hash)
599				.await?
600		);
601
602		let (_, receipt_infos): (Vec<_>, Vec<_>) = receipts.into_iter().unzip();
603		self.fee_history_provider.update_fee_history(&eth_block, &receipt_infos).await;
604
605		Ok((eth_block, receipt_infos))
606	}
607
608	/// Start the block subscription, and populate the block cache.
609	pub async fn subscribe_and_cache_new_blocks(
610		&self,
611		subscription_type: SubscriptionType,
612	) -> Result<(), ClientError> {
613		log::info!(target: LOG_TARGET, "๐Ÿ”Œ Subscribing to new blocks ({subscription_type:?})");
614		self.subscribe_new_blocks(subscription_type, |block| async {
615			let hash = block.hash();
616
617			match subscription_type {
618				SubscriptionType::BestBlocks => {
619					let (eth_block, _) = self.process_block(&block).await?;
620					self.block_provider.update_latest(Arc::new(block), subscription_type).await;
621
622					if let Some(sender) = &self.block_notifier {
623						if sender.receiver_count() > 0 {
624							let _ = sender.send(hash);
625						}
626					}
627					if self.block_subscription_tx.receiver_count() > 0 {
628						let _ = self.block_subscription_tx.send(eth_block);
629					}
630				},
631				SubscriptionType::FinalizedBlocks => {
632					let block_number = block.number();
633					let (receipt_infos, eth_hash) = match self
634						.receipt_provider
635						.get_processed_eth_block_hash(block_number, hash)
636						.await
637					{
638						Some(eth_hash) => {
639							log::trace!(target: LOG_TARGET_SUBSCRIPTION,
640									"โฉ Finalized block #{block_number} already processed, \
641									 skipping extraction");
642							(None, eth_hash)
643						},
644						None => {
645							let (eth_block, infos) = self.process_block(&block).await?;
646							(Some(infos), eth_block.hash)
647						},
648					};
649
650					self.block_provider.update_latest(Arc::new(block), subscription_type).await;
651					self.advance_sync_head(block_number, hash).await;
652
653					if self.log_subscription_tx.receiver_count() > 0 {
654						let logs = match receipt_infos {
655							Some(infos) => infos.into_iter().flat_map(|r| r.logs).collect(),
656							None => {
657								self.receipt_provider
658									.logs_by_block_number(block_number, eth_hash)
659									.await?
660							},
661						};
662						for log in logs {
663							let _ = self.log_subscription_tx.send(log);
664						}
665					}
666				},
667			}
668
669			Ok(())
670		})
671		.await
672	}
673
674	/// Get the block hash for the given block number or tag.
675	pub async fn block_hash_for_tag(
676		&self,
677		at: BlockNumberOrTagOrHash,
678	) -> Result<SubstrateBlockHash, ClientError> {
679		match at {
680			BlockNumberOrTagOrHash::BlockHash(hash) => self
681				.resolve_substrate_hash(&hash)
682				.await
683				.ok_or(ClientError::EthereumBlockNotFound),
684			BlockNumberOrTagOrHash::BlockNumber(block_number) => {
685				let n: SubstrateBlockNumber =
686					(block_number).try_into().map_err(|_| ClientError::ConversionFailed)?;
687				let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?;
688				Ok(hash)
689			},
690			BlockNumberOrTagOrHash::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
691				let block = self.latest_finalized_block().await;
692				Ok(block.hash())
693			},
694			BlockNumberOrTagOrHash::BlockTag(BlockTag::Earliest) => {
695				let hash = self
696					.get_block_hash(self.earliest_block_number())
697					.await?
698					.ok_or(ClientError::BlockNotFound)?;
699				Ok(hash)
700			},
701			BlockNumberOrTagOrHash::BlockTag(_) => {
702				let block = self.latest_block().await;
703				Ok(block.hash())
704			},
705		}
706	}
707
708	/// Get the storage API for the given block.
709	pub fn storage_api(&self, block_hash: H256) -> StorageApi {
710		StorageApi::new(self.api.storage().at(block_hash))
711	}
712
713	/// Get the runtime API for the given block.
714	pub fn runtime_api(&self, block_hash: H256) -> RuntimeApi {
715		RuntimeApi::new(self.api.runtime_api().at(block_hash))
716	}
717
718	/// Get the latest finalized block.
719	pub async fn latest_finalized_block(&self) -> Arc<SubstrateBlock> {
720		self.block_provider.latest_finalized_block().await
721	}
722
723	/// Get the latest best block.
724	pub async fn latest_block(&self) -> Arc<SubstrateBlock> {
725		self.block_provider.latest_block().await
726	}
727
728	/// Submit an ethereum transaction and return a stream of transaction status updates.
729	async fn submit_transaction(
730		&self,
731		call: subxt::tx::DefaultPayload<EthTransact>,
732	) -> Result<StreamOfResults<TransactionStatus<SubstrateBlockHash>>, ClientError> {
733		let ext = self.api.tx().create_unsigned(&call.unvalidated()).map_err(ClientError::from)?;
734
735		let sub = self
736			.rpc_client
737			.subscribe(
738				"author_submitAndWatchExtrinsic",
739				rpc_params![to_hex(ext.encoded())],
740				"author_unwatchExtrinsic",
741			)
742			.await?;
743
744		let sub = sub.map_err(|e| e.into());
745		Ok(StreamOf::new(Box::pin(sub)))
746	}
747
748	/// Expose the transaction API.
749	pub async fn submit(
750		&self,
751		call: subxt::tx::DefaultPayload<EthTransact>,
752	) -> Result<TransactionStatus<SubstrateBlockHash>, ClientError> {
753		let mut progress = self.submit_transaction(call).await.inspect_err(|err| {
754			log::debug!(target: LOG_TARGET, "Failed to submit transaction: {err:?}");
755		})?;
756
757		tokio::time::timeout(Duration::from_secs(5), async {
758			if let Some(status) = progress.next().await {
759				match status {
760					Ok(
761						tx @ (TransactionStatus::Future |
762						TransactionStatus::Ready |
763						// Add other events that follow Ready here for completeness,
764						// but they can be ignored.
765						TransactionStatus::Broadcast(_) |
766						TransactionStatus::InBlock(_) |
767						TransactionStatus::FinalityTimeout(_) |
768						TransactionStatus::Retracted(_) |
769						TransactionStatus::Finalized(_)),
770					) => {
771						return Ok(tx);
772					},
773					Ok(
774						tx @ (TransactionStatus::Usurped(_) |
775						TransactionStatus::Dropped |
776						TransactionStatus::Invalid),
777					) => {
778						return Err(ClientError::SubmitError(tx.into()));
779					},
780					Err(err) => {
781						log::debug!(target: LOG_TARGET, "Transaction submission failed: {err:?}");
782						return Err(ClientError::from(err));
783					},
784				}
785			}
786			return Err(ClientError::SubmitError(SubmitError::StreamEnded));
787		})
788		.await
789		.map_err(|_| ClientError::Timeout)?
790	}
791
792	/// Get an EVM transaction receipt by hash.
793	pub async fn receipt(&self, tx_hash: &H256) -> Option<ReceiptInfo> {
794		self.receipt_provider.receipt_by_hash(tx_hash).await
795	}
796
797	/// Get The post dispatch weight associated with this Ethereum transaction hash.
798	pub async fn post_dispatch_weight(&self, tx_hash: &H256) -> Option<Weight> {
799		use crate::subxt_client::system::events::ExtrinsicSuccess;
800		let ReceiptInfo { block_hash, transaction_index, .. } = self.receipt(tx_hash).await?;
801		let block_hash = self.resolve_substrate_hash(&block_hash).await?;
802		let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
803		let ext = block.extrinsics().await.ok()?.iter().nth(transaction_index.as_u32() as _)?;
804		let event = ext.events().await.ok()?.find_first::<ExtrinsicSuccess>().ok()??;
805		Some(event.dispatch_info.weight.0)
806	}
807
808	pub async fn sync_state(
809		&self,
810	) -> Result<sc_rpc::system::SyncState<SubstrateBlockNumber>, ClientError> {
811		let client = self.rpc_client.clone();
812		let sync_state: sc_rpc::system::SyncState<SubstrateBlockNumber> =
813			client.request("system_syncState", Default::default()).await?;
814		Ok(sync_state)
815	}
816
817	/// Get the syncing status of the chain.
818	pub async fn syncing(&self) -> Result<SyncingStatus, ClientError> {
819		let health = self.rpc.system_health().await?;
820
821		let status = if health.is_syncing {
822			let sync_state = self.sync_state().await?;
823			SyncingProgress {
824				current_block: Some(sync_state.current_block.into()),
825				highest_block: Some(sync_state.highest_block.into()),
826				starting_block: Some(sync_state.starting_block.into()),
827			}
828			.into()
829		} else {
830			SyncingStatus::Bool(false)
831		};
832
833		Ok(status)
834	}
835
836	/// Get an EVM transaction receipt by hash.
837	pub async fn receipt_by_hash_and_index(
838		&self,
839		block_hash: &H256,
840		transaction_index: usize,
841	) -> Option<ReceiptInfo> {
842		self.receipt_provider
843			.receipt_by_block_hash_and_index(block_hash, transaction_index)
844			.await
845	}
846
847	pub async fn signed_tx_by_hash(&self, tx_hash: &H256) -> Option<TransactionSigned> {
848		self.receipt_provider.signed_tx_by_hash(tx_hash).await
849	}
850
851	/// Get receipts count per block.
852	pub async fn receipts_count_per_block(&self, block_hash: &SubstrateBlockHash) -> Option<usize> {
853		self.receipt_provider.receipts_count_per_block(block_hash).await
854	}
855
856	/// Get an EVM transaction receipt by specified Ethereum block hash.
857	pub async fn receipt_by_ethereum_hash_and_index(
858		&self,
859		ethereum_hash: &H256,
860		transaction_index: usize,
861	) -> Option<ReceiptInfo> {
862		// Fallback: use hash as Substrate hash if Ethereum hash cannot be resolved
863		let substrate_hash =
864			self.resolve_substrate_hash(ethereum_hash).await.unwrap_or_else(|| {
865				log::trace!(target: LOG_TARGET,
866					"receipt_by_ethereum_hash_and_index: no ETH-to-substrate mapping for \
867					 {ethereum_hash:?}, falling back to substrate hash lookup");
868				*ethereum_hash
869			});
870		self.receipt_by_hash_and_index(&substrate_hash, transaction_index).await
871	}
872
873	/// Get the system health.
874	pub async fn system_health(&self) -> Result<SystemHealth, ClientError> {
875		let health = self.rpc.system_health().await?;
876		Ok(health)
877	}
878
879	/// Get the block number of the latest block.
880	pub async fn block_number(&self) -> Result<SubstrateBlockNumber, ClientError> {
881		let latest_block = self.block_provider.latest_block().await;
882		Ok(latest_block.number())
883	}
884
885	/// Get a block hash for the given block number.
886	pub async fn get_block_hash(
887		&self,
888		block_number: SubstrateBlockNumber,
889	) -> Result<Option<SubstrateBlockHash>, ClientError> {
890		let maybe_block = self.block_provider.block_by_number(block_number).await?;
891		Ok(maybe_block.map(|block| block.hash()))
892	}
893
894	/// Get a block for the specified hash or number.
895	pub async fn block_by_number_or_tag(
896		&self,
897		block: &BlockNumberOrTag,
898	) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
899		match block {
900			BlockNumberOrTag::U256(n) => {
901				let n = (*n).try_into().map_err(|_| ClientError::ConversionFailed)?;
902				self.block_by_number(n).await
903			},
904			BlockNumberOrTag::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
905				let block = self.block_provider.latest_finalized_block().await;
906				Ok(Some(block))
907			},
908			BlockNumberOrTag::BlockTag(BlockTag::Earliest) => {
909				self.block_by_number(self.earliest_block_number()).await
910			},
911			BlockNumberOrTag::BlockTag(_) => {
912				let block = self.block_provider.latest_block().await;
913				Ok(Some(block))
914			},
915		}
916	}
917
918	/// Get a block by hash
919	pub async fn block_by_hash(
920		&self,
921		hash: &SubstrateBlockHash,
922	) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
923		self.block_provider.block_by_hash(hash).await
924	}
925
926	/// Resolve Ethereum block hash to Substrate block hash, then get the block.
927	/// This method provides the abstraction layer needed by the RPC APIs.
928	pub async fn resolve_substrate_hash(&self, ethereum_hash: &H256) -> Option<H256> {
929		self.receipt_provider.get_substrate_hash(ethereum_hash).await
930	}
931
932	/// Resolve Substrate block hash to Ethereum block hash, then get the block.
933	/// This method provides the abstraction layer needed by the RPC APIs.
934	pub async fn resolve_ethereum_hash(&self, substrate_hash: &H256) -> Option<H256> {
935		self.receipt_provider.get_ethereum_hash(substrate_hash).await
936	}
937
938	/// Get a block by Ethereum hash with automatic resolution to Substrate hash.
939	/// Falls back to treating the hash as a Substrate hash if no mapping exists.
940	pub async fn block_by_ethereum_hash(
941		&self,
942		ethereum_hash: &H256,
943	) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
944		// First try to resolve the Ethereum hash to a Substrate hash
945		if let Some(substrate_hash) = self.resolve_substrate_hash(ethereum_hash).await {
946			return self.block_by_hash(&substrate_hash).await;
947		}
948
949		// Fallback: treat the provided hash as a Substrate hash (backward compatibility)
950		log::trace!(target: LOG_TARGET,
951			"block_by_ethereum_hash: no ETH-to-substrate mapping for {ethereum_hash:?}, \
952			 falling back to substrate hash lookup");
953		self.block_by_hash(ethereum_hash).await
954	}
955
956	/// Get a block by number
957	pub async fn block_by_number(
958		&self,
959		block_number: SubstrateBlockNumber,
960	) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
961		self.block_provider.block_by_number(block_number).await
962	}
963
964	async fn tracing_block(
965		&self,
966		block_hash: H256,
967	) -> Result<
968		sp_runtime::generic::Block<
969			sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
970			sp_runtime::OpaqueExtrinsic,
971		>,
972		ClientError,
973	> {
974		let signed_block: Option<
975			sp_runtime::generic::SignedBlock<
976				sp_runtime::generic::Block<
977					sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
978					sp_runtime::OpaqueExtrinsic,
979				>,
980			>,
981		> = self.rpc_client.request("chain_getBlock", rpc_params![block_hash]).await?;
982
983		Ok(signed_block.ok_or(ClientError::BlockNotFound)?.block)
984	}
985
986	/// Get the transaction traces for the given block.
987	pub async fn trace_block_by_number(
988		&self,
989		at: BlockNumberOrTag,
990		config: TracerType,
991	) -> Result<Vec<TransactionTrace>, ClientError> {
992		if self.receipt_provider.is_before_earliest_block(&at) {
993			return Ok(vec![]);
994		}
995
996		let block_hash = self.block_hash_for_tag(at.into()).await?;
997		let block = self.tracing_block(block_hash).await?;
998		let parent_hash = block.header().parent_hash;
999		// Block 0 has no parent โ€” there is nothing to trace.
1000		if parent_hash == Default::default() {
1001			return Ok(vec![]);
1002		}
1003		let runtime_api = RuntimeApi::new(self.api.runtime_api().at(parent_hash));
1004		let traces = runtime_api.trace_block(block, config.clone()).await?;
1005
1006		let mut hashes = self
1007			.receipt_provider
1008			.block_transaction_hashes(&block_hash)
1009			.await
1010			.ok_or(ClientError::EthExtrinsicNotFound)?;
1011
1012		let traces = traces.into_iter().filter_map(|(index, trace)| {
1013			Some(TransactionTrace { tx_hash: hashes.remove(&(index as usize))?, trace })
1014		});
1015
1016		Ok(traces.collect())
1017	}
1018
1019	/// Get the transaction traces for the given transaction.
1020	pub async fn trace_transaction(
1021		&self,
1022		transaction_hash: H256,
1023		config: TracerType,
1024	) -> Result<Trace, ClientError> {
1025		let (block_hash, transaction_index) = self
1026			.receipt_provider
1027			.find_transaction(&transaction_hash)
1028			.await
1029			.ok_or(ClientError::EthExtrinsicNotFound)?;
1030
1031		let block = self.tracing_block(block_hash).await?;
1032		let parent_hash = block.header.parent_hash;
1033		let runtime_api = self.runtime_api(parent_hash);
1034
1035		runtime_api.trace_tx(block, transaction_index as u32, config).await
1036	}
1037
1038	/// Get the transaction traces for the given block.
1039	pub async fn trace_call(
1040		&self,
1041		transaction: GenericTransaction,
1042		block: BlockNumberOrTagOrHash,
1043		config: TracerType,
1044		state_overrides: Option<StateOverrideSet>,
1045	) -> Result<Trace, ClientError> {
1046		let block_hash = self.block_hash_for_tag(block).await?;
1047		let runtime_api = self.runtime_api(block_hash);
1048		runtime_api.trace_call(transaction, config, state_overrides).await
1049	}
1050
1051	/// Get the EVM block for the given Substrate block.
1052	pub async fn evm_block(
1053		&self,
1054		block: Arc<SubstrateBlock>,
1055		hydrated_transactions: bool,
1056	) -> Option<Block> {
1057		log::trace!(target: LOG_TARGET, "Get Ethereum block for hash {:?}", block.hash());
1058
1059		if self
1060			.receipt_provider
1061			.is_before_earliest_block(&BlockNumberOrTag::U256(U256::from(block.number())))
1062		{
1063			log::trace!(target: LOG_TARGET,
1064				"Block #{} is before receipt floor, skipping", block.number());
1065			return None;
1066		}
1067
1068		// This could potentially fail under below circumstances:
1069		//  - state has been pruned
1070		//  - the block author cannot be obtained from the digest logs (highly unlikely)
1071		//  - the node we are targeting has an outdated revive pallet (or ETH block functionality is
1072		//    disabled)
1073		match self.runtime_api(block.hash()).eth_block().await {
1074			Ok(mut eth_block) => {
1075				log::trace!(target: LOG_TARGET, "Ethereum block from runtime API hash {:?}", eth_block.hash);
1076
1077				if hydrated_transactions {
1078					// Hydrate the block.
1079					let tx_infos = self
1080						.receipt_provider
1081						.receipts_from_block(&block, eth_block.hash)
1082						.await
1083						.inspect_err(|err| {
1084							log::trace!(target: LOG_TARGET,
1085								"Failed to extract receipts for block #{}: {err:?}",
1086								block.number());
1087						})
1088						.unwrap_or_default()
1089						.into_iter()
1090						.map(|(signed_tx, receipt)| TransactionInfo::new(&receipt, signed_tx))
1091						.collect::<Vec<_>>();
1092
1093					eth_block.transactions = HashesOrTransactionInfos::TransactionInfos(tx_infos);
1094				}
1095
1096				Some(eth_block)
1097			},
1098			Err(err) => {
1099				log::error!(target: LOG_TARGET, "Failed to get Ethereum block for hash {:?}: {err:?}", block.hash());
1100				None
1101			},
1102		}
1103	}
1104
1105	/// Get the chain ID.
1106	pub fn chain_id(&self) -> u64 {
1107		self.chain_id
1108	}
1109
1110	/// Get the Max Block Weight.
1111	pub fn max_block_weight(&self) -> Weight {
1112		self.max_block_weight
1113	}
1114
1115	/// Get the block notifier, if automine is enabled or Self::create_block_notifier was called.
1116	pub fn block_notifier(&self) -> Option<tokio::sync::broadcast::Sender<H256>> {
1117		self.block_notifier.clone()
1118	}
1119
1120	/// Get the logs matching the given filter.
1121	pub async fn logs(&self, filter: Option<Filter>) -> Result<Vec<Log>, ClientError> {
1122		let earliest = U256::from(self.earliest_block_number());
1123		let latest = U256::from(self.latest_block().await.number());
1124		let resolve_block_number = |block: BlockNumberOrTag| match block {
1125			BlockNumberOrTag::U256(v) => Ok(v),
1126			BlockNumberOrTag::BlockTag(BlockTag::Earliest) => Ok(earliest),
1127			BlockNumberOrTag::BlockTag(BlockTag::Latest) => Ok(latest),
1128			BlockNumberOrTag::BlockTag(tag) => anyhow::bail!("Unsupported tag: {tag:?}"),
1129		};
1130
1131		let logs = self
1132			.receipt_provider
1133			.logs(filter, &resolve_block_number)
1134			.await
1135			.map_err(ClientError::LogFilterFailed)?;
1136
1137		Ok(logs)
1138	}
1139
1140	pub async fn fee_history(
1141		&self,
1142		block_count: u32,
1143		latest_block: BlockNumberOrTag,
1144		reward_percentiles: Option<Vec<f64>>,
1145	) -> Result<FeeHistoryResult, ClientError> {
1146		let Some(latest_block) = self.block_by_number_or_tag(&latest_block).await? else {
1147			return Err(ClientError::BlockNotFound);
1148		};
1149
1150		self.fee_history_provider
1151			.fee_history(block_count, latest_block.number(), reward_percentiles)
1152			.await
1153	}
1154
1155	/// Check if automine is enabled.
1156	pub fn is_automine(&self) -> bool {
1157		self.automine
1158	}
1159
1160	/// Get the automine status from the node.
1161	pub async fn get_automine(&self) -> bool {
1162		get_automine(&self.rpc_client).await
1163	}
1164
1165	/// Gets the block subscription rx side of the channel.
1166	pub fn get_block_subscription_rx(&self) -> tokio::sync::broadcast::Receiver<Block> {
1167		self.block_subscription_tx.subscribe()
1168	}
1169
1170	/// Gets the log subscription rx side of the channel.
1171	pub fn get_log_subscription_rx(&self) -> tokio::sync::broadcast::Receiver<Log> {
1172		self.log_subscription_tx.subscribe()
1173	}
1174}
1175
1176fn to_hex(bytes: impl AsRef<[u8]>) -> String {
1177	format!("0x{}", hex::encode(bytes.as_ref()))
1178}