1pub(crate) mod runtime_api;
21pub(crate) mod storage_api;
22
23use crate::{
24 BlockInfoProvider, BlockTag, FeeHistoryProvider, ReceiptProvider, SubxtBlockInfoProvider,
25 SyncLabel, TransactionInfo,
26 block_sync::SyncCheckpoint,
27 subxt_client::{self, SrcChainConfig, revive::calls::types::EthTransact},
28};
29use futures::TryStreamExt;
30use jsonrpsee::types::{ErrorObjectOwned, error::CALL_EXECUTION_FAILED_CODE};
31use pallet_revive::{
32 EthTransactError,
33 evm::{
34 Block, BlockNumberOrTag, BlockNumberOrTagOrHash, FeeHistoryResult, Filter,
35 GenericTransaction, H256, HashesOrTransactionInfos, Log, ReceiptInfo, StateOverrideSet,
36 SyncingProgress, SyncingStatus, TransactionSigned, TransactionTrace, U256,
37 decode_revert_reason,
38 },
39};
40use pallet_revive_types::runtime_api::*;
41use runtime_api::RuntimeApi;
42use sp_runtime::traits::Block as BlockT;
43use sp_weights::Weight;
44use std::{
45 sync::{
46 Arc,
47 atomic::{AtomicBool, AtomicUsize, Ordering},
48 },
49 time::Duration,
50};
51use storage_api::StorageApi;
52use subxt::{
53 Config, OnlineClient,
54 backend::{
55 StreamOf, StreamOfResults,
56 legacy::{
57 LegacyRpcMethods,
58 rpc_methods::{SystemHealth, TransactionStatus},
59 },
60 rpc::{
61 RpcClient,
62 reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
63 },
64 },
65 config::{HashFor, Header},
66 ext::subxt_rpcs::rpc_params,
67};
68use thiserror::Error;
69use tokio::sync::{Mutex, mpsc};
70
/// The subxt block type, specialised to [`SrcChainConfig`] and an [`OnlineClient`].
pub type SubstrateBlock = subxt::blocks::Block<SrcChainConfig, OnlineClient<SrcChainConfig>>;

/// The block header type declared by [`SrcChainConfig`].
pub type SubstrateBlockHeader = <SrcChainConfig as Config>::Header;

/// The block number type carried by [`SubstrateBlockHeader`].
pub type SubstrateBlockNumber = <SubstrateBlockHeader as Header>::Number;

/// The block hash type of [`SrcChainConfig`].
pub type SubstrateBlockHash = HashFor<SrcChainConfig>;

/// Balance amounts, represented as a `u128`.
pub type Balance = u128;
85
/// Selects which block stream a subscription follows.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SubscriptionType {
    /// Follow the best-block stream (`subscribe_best`).
    BestBlocks,
    /// Follow the finalized-block stream (`subscribe_finalized`).
    FinalizedBlocks,
}
94
/// Reasons a submitted transaction was rejected or could not be tracked.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
pub enum SubmitError {
    /// Another transaction with the same nonce replaced this one.
    #[error("Transaction was usurped by another with the same nonce")]
    Usurped,
    /// The transaction was dropped.
    #[error("Transaction was dropped")]
    Dropped,
    /// The transaction was reported as invalid.
    #[error("Transaction is invalid (e.g. bad nonce, signature, etc)")]
    Invalid,
    /// The status stream finished before yielding any status.
    #[error("Transaction stream ended without status")]
    StreamEnded,
    /// A status that does not map to any of the other variants.
    #[error("Unknown transaction status")]
    Unknown,
}
114
115impl From<TransactionStatus<SubstrateBlockHash>> for SubmitError {
116 fn from(status: TransactionStatus<SubstrateBlockHash>) -> Self {
117 match status {
118 TransactionStatus::Usurped(_) => SubmitError::Usurped,
119 TransactionStatus::Dropped => SubmitError::Dropped,
120 TransactionStatus::Invalid => SubmitError::Invalid,
121 _ => SubmitError::Unknown,
122 }
123 }
124}
125
/// Errors produced by the [`Client`].
#[derive(Error, Debug)]
pub enum ClientError {
    /// An error from the jsonrpsee client.
    #[error(transparent)]
    Jsonrpsee(#[from] jsonrpsee::core::ClientError),
    /// An error from subxt.
    #[error(transparent)]
    SubxtError(#[from] subxt::Error),
    /// An error from the subxt RPC layer.
    #[error(transparent)]
    RpcError(#[from] subxt::ext::subxt_rpcs::Error),
    /// An error from sqlx.
    #[error(transparent)]
    SqlxError(#[from] sqlx::Error),
    /// A SCALE codec error.
    #[error(transparent)]
    CodecError(#[from] codec::Error),
    /// The transaction submission was rejected; see [`SubmitError`].
    #[error("Invalid transaction: {0}")]
    SubmitError(SubmitError),
    /// The runtime reported an `EthTransactError` for the call.
    #[error("contract reverted: {0:?}")]
    TransactError(EthTransactError),
    /// A numeric or type conversion failed.
    #[error("conversion failed")]
    ConversionFailed,
    /// The requested block was not found.
    #[error("hash not found")]
    BlockNotFound,
    /// The contract was not found.
    #[error("Contract not found")]
    ContractNotFound,
    /// No Ethereum extrinsic was found.
    #[error("No Ethereum extrinsic found")]
    EthExtrinsicNotFound,
    /// The `transactionFeePaid` event was not found.
    #[error("transactionFeePaid event not found")]
    TxFeeNotFound,
    /// A raw payload could not be decoded into a signed transaction.
    #[error("Failed to decode a raw payload into a signed transaction")]
    TxDecodingFailed,
    /// The Ethereum address could not be recovered.
    #[error("failed to recover eth address")]
    RecoverEthAddressFailed,
    /// Filtering logs failed.
    #[error("Failed to filter logs")]
    LogFilterFailed(#[from] anyhow::Error),
    /// The receipt storage was not found.
    #[error("Receipt storage not found")]
    ReceiptDataNotFound,
    /// The Ethereum block was not found.
    #[error("Ethereum block not found")]
    EthereumBlockNotFound,
    /// The receipt data length did not match.
    #[error("Receipt data length mismatch")]
    ReceiptDataLengthMismatch,
    /// Waiting for the transaction submission status timed out.
    #[error("Transaction submission timeout")]
    Timeout,
    /// No estimation method succeeded.
    #[error("None of the estimation methods were found")]
    NoEstimationMethodSucceeded,
    /// The genesis hash did not match.
    #[error("Genesis hash mismatch")]
    ChainMismatch,
    /// The sync boundary did not match.
    #[error("Sync boundary mismatch")]
    SyncBoundaryMismatch,
}
195
196impl ClientError {
197 pub(crate) fn is_chain_validation_error(&self) -> bool {
199 matches!(self, Self::ChainMismatch | Self::SyncBoundaryMismatch)
200 }
201}
202
/// Log target used by the client.
const LOG_TARGET: &str = "eth-rpc::client";
/// Log target used for per-block subscription tracing.
const LOG_TARGET_SUBSCRIPTION: &str = "eth-rpc::subscription";

/// JSON-RPC error code returned when execution reverted with data.
const REVERT_CODE: i32 = 3;

/// Capacity of the broadcast channel used as the block notifier.
const NOTIFIER_CAPACITY: usize = 16;
209
210impl From<ClientError> for ErrorObjectOwned {
211 fn from(err: ClientError) -> Self {
212 match err {
213 ClientError::SubxtError(subxt::Error::Rpc(subxt::error::RpcError::ClientError(
214 subxt::ext::subxt_rpcs::Error::User(err),
215 ))) |
216 ClientError::RpcError(subxt::ext::subxt_rpcs::Error::User(err)) => {
217 ErrorObjectOwned::owned::<Vec<u8>>(err.code, err.message, None)
218 },
219 ClientError::TransactError(EthTransactError::Data(data)) => {
220 let msg = match decode_revert_reason(&data) {
221 Some(reason) => format!("execution reverted: {reason}"),
222 None => "execution reverted".to_string(),
223 };
224
225 let data = format!("0x{}", hex::encode(data));
226 ErrorObjectOwned::owned::<String>(REVERT_CODE, msg, Some(data))
227 },
228 ClientError::TransactError(EthTransactError::Message(msg)) => {
229 ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, msg, None)
230 },
231 _ => {
232 ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, err.to_string(), None)
233 },
234 }
235 }
236}
237
/// A client connected to a node, used to serve the Ethereum RPC.
#[derive(Clone)]
pub struct Client {
    /// The subxt online client.
    api: OnlineClient<SrcChainConfig>,
    /// The raw RPC client, used for direct requests and subscriptions.
    rpc_client: RpcClient,
    /// Legacy RPC methods (e.g. `system_health`).
    rpc: LegacyRpcMethods<SrcChainConfig>,
    /// Provider for receipts, logs and hash mappings.
    receipt_provider: ReceiptProvider,
    /// Provider for substrate blocks.
    block_provider: SubxtBlockInfoProvider,
    /// Provider for fee history, updated as blocks are processed.
    fee_history_provider: FeeHistoryProvider,
    /// The chain id read from the runtime constants at construction.
    chain_id: u64,
    /// The maximum block weight read from the runtime constants at construction.
    max_block_weight: Weight,
    /// Whether the node reported automine at construction.
    automine: bool,
    /// Optional sender notified with the hash of each processed best block.
    block_notifier: Option<tokio::sync::broadcast::Sender<H256>>,
    /// Lock held while a subscribed block is being processed.
    subscription_lock: Arc<Mutex<()>>,

    /// Broadcasts the Ethereum block of each processed best block.
    block_subscription_tx: tokio::sync::broadcast::Sender<Block>,
    /// Broadcasts the logs of each processed finalized block.
    log_subscription_tx: tokio::sync::broadcast::Sender<Log>,
    /// Archive-mode flag; the sync head is only advanced when this is set.
    is_archive: bool,
    /// Set once backfill has been marked complete.
    backfill_complete: Arc<AtomicBool>,
    /// Queue of block ranges missed by the finalized-block subscription.
    subscription_gap_queue: SubscriptionGapQueue,
}
267
/// A range of blocks missed by the subscription that still has to be filled.
///
/// As built by `SubscriptionGapQueue::detect_and_queue`, `from_inclusive` is the upper end of
/// the range and `to_inclusive` the lower end.
pub(crate) struct GapFillRequest {
    /// Highest missed block (the block just before the newly seen one).
    pub from_inclusive: SubstrateBlockNumber,
    /// Lowest missed block (the block just after the previously seen one).
    pub to_inclusive: SubstrateBlockNumber,
}
273
/// Sending half of the gap-fill queue plus a counter of outstanding requests.
#[derive(Clone)]
pub(crate) struct SubscriptionGapQueue {
    /// Channel on which detected gaps are queued.
    tx: mpsc::Sender<GapFillRequest>,
    /// Number of queued requests not yet marked done.
    pending: Arc<AtomicUsize>,
}
283
284impl SubscriptionGapQueue {
285 pub(crate) fn new() -> (Self, mpsc::Receiver<GapFillRequest>) {
286 let (tx, rx) = mpsc::channel(32);
289 (Self { tx, pending: Arc::new(AtomicUsize::new(0)) }, rx)
290 }
291
292 pub fn detect_and_queue(&self, current: SubstrateBlockNumber, last: SubstrateBlockNumber) {
294 if current.saturating_sub(last) <= 1 {
295 return;
296 }
297
298 let from_inclusive = current.saturating_sub(1);
299 let to_inclusive = last.saturating_add(1);
300 let gap_len = from_inclusive.saturating_sub(to_inclusive) + 1;
301 self.pending.fetch_add(1, Ordering::Release);
302 match self.tx.try_send(GapFillRequest { from_inclusive, to_inclusive }) {
303 Ok(_) => {
304 log::info!(target: LOG_TARGET,
305 "๐ Subscription gap queue: queued #{from_inclusive} down to #{to_inclusive} ({gap_len} blocks)");
306 },
307 Err(err) => {
308 self.pending.fetch_sub(1, Ordering::Release);
309 log::warn!(target: LOG_TARGET,
310 "๐ Subscription gap queue error, dropping #{from_inclusive}..#{to_inclusive} ({gap_len} blocks): {err}");
311 },
312 }
313 }
314
315 pub fn mark_done(&self) {
317 let res = self
318 .pending
319 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| v.checked_sub(1));
320 if res.is_err() {
321 debug_assert!(false, "subscription gap queue pending counter underflowed");
322 log::error!(target: LOG_TARGET,
323 "๐ Subscription gap queue pending counter underflow, delete the database and restart with --eth-pruning=archive to resync");
324 }
325 }
326
327 pub fn has_pending(&self) -> bool {
329 self.pending.load(Ordering::Acquire) > 0
330 }
331}
332
/// Returns the hard-coded first EVM block for the chain ids this client knows about,
/// or `None` for any other chain id.
fn known_first_evm_block_for_chain(chain_id: u64) -> Option<u32> {
    // (chain id, first EVM block) pairs.
    const KNOWN_FIRST_EVM_BLOCKS: [(u64, u32); 4] = [
        (420420417, 4_367_914),
        (420420418, 12_234_156),
        (420420419, 11_405_259),
        (420420421, 13_169_391),
    ];

    KNOWN_FIRST_EVM_BLOCKS
        .iter()
        .find(|(known_id, _)| *known_id == chain_id)
        .map(|&(_, first_block)| first_block)
}
343
344async fn chain_id(api: &OnlineClient<SrcChainConfig>) -> Result<u64, ClientError> {
346 let query = subxt_client::constants().revive().chain_id().unvalidated();
347 api.constants().at(&query).map_err(|err| err.into())
348}
349
350async fn max_block_weight(api: &OnlineClient<SrcChainConfig>) -> Result<Weight, ClientError> {
352 let query = subxt_client::constants().system().block_weights().unvalidated();
353 let weights = api.constants().at(&query)?;
354 let max_block = weights.per_class.normal.max_extrinsic.unwrap_or(weights.max_block);
355 Ok(max_block.0)
356}
357
358async fn get_automine(rpc_client: &RpcClient) -> bool {
360 match rpc_client.request::<bool>("getAutomine", rpc_params![]).await {
361 Ok(val) => val,
362 Err(err) => {
363 log::info!(target: LOG_TARGET, "Node does not have getAutomine RPC. Defaulting to automine=false. error: {err:?}");
364 false
365 },
366 }
367}
368
369pub async fn connect(
372 node_rpc_url: &str,
373 max_request_size: u32,
374 max_response_size: u32,
375) -> Result<(OnlineClient<SrcChainConfig>, RpcClient, LegacyRpcMethods<SrcChainConfig>), ClientError>
376{
377 log::info!(target: LOG_TARGET, "๐ Connecting to node at: {node_rpc_url} ...");
378 let rpc_client = ReconnectingRpcClient::builder()
379 .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
380 .max_request_size(max_request_size)
381 .max_response_size(max_response_size)
382 .build(node_rpc_url.to_string())
383 .await?;
384 let rpc_client = RpcClient::new(rpc_client);
385 log::info!(target: LOG_TARGET, "๐ Connected to node at: {node_rpc_url}");
386
387 let api = OnlineClient::<SrcChainConfig>::from_rpc_client(rpc_client.clone()).await?;
388 let rpc = LegacyRpcMethods::<SrcChainConfig>::new(rpc_client.clone());
389 Ok((api, rpc_client, rpc))
390}
391
392impl Client {
393 pub(crate) async fn new(
395 api: OnlineClient<SrcChainConfig>,
396 rpc_client: RpcClient,
397 rpc: LegacyRpcMethods<SrcChainConfig>,
398 block_provider: SubxtBlockInfoProvider,
399 receipt_provider: ReceiptProvider,
400 is_archive: bool,
401 subscription_gap_queue: SubscriptionGapQueue,
402 ) -> Result<Self, ClientError> {
403 let (chain_id, max_block_weight, automine) =
404 tokio::try_join!(chain_id(&api), max_block_weight(&api), async {
405 Ok(get_automine(&rpc_client).await)
406 },)?;
407
408 if !is_archive {
411 if let Some(known) = known_first_evm_block_for_chain(chain_id) {
412 let best = block_provider.latest_block_number().await;
413 if known > best {
414 log::debug!(
415 target: LOG_TARGET,
416 "Hardcoded first EVM block {known} exceeds best block {best} \
417 for chain {chain_id}, defaulting to 0"
418 );
419 receipt_provider.set_first_evm_block(0).await?;
420 }
421 }
422 }
423
424 let client = Self {
425 api,
426 rpc_client,
427 rpc,
428 receipt_provider,
429 block_provider,
430 fee_history_provider: FeeHistoryProvider::default(),
431 chain_id,
432 max_block_weight,
433 automine,
434 block_notifier: automine
435 .then(|| tokio::sync::broadcast::channel::<H256>(NOTIFIER_CAPACITY).0),
436 subscription_lock: Arc::new(Mutex::new(())),
437 block_subscription_tx: tokio::sync::broadcast::channel(256).0,
438 log_subscription_tx: tokio::sync::broadcast::channel(1000).0,
439 is_archive,
440 backfill_complete: Arc::new(AtomicBool::new(false)),
441 subscription_gap_queue,
442 };
443
444 Ok(client)
445 }
446
    /// Records that backfill has finished by setting the `backfill_complete` flag.
    pub(crate) fn mark_backfill_complete(&self) {
        self.backfill_complete.store(true, Ordering::Release);
    }
451
452 async fn advance_sync_head(&self, block_number: SubstrateBlockNumber, hash: H256) {
455 if !self.is_archive ||
456 !self.backfill_complete.load(Ordering::Acquire) ||
457 self.subscription_gap_queue.has_pending()
458 {
459 return;
460 }
461
462 if let Err(err) = self
463 .receipt_provider
464 .advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(block_number, hash))
465 .await
466 {
467 log::warn!(target: LOG_TARGET, "Failed to advance sync head: {err:?}");
468 }
469 }
470
    /// Installs a fresh block notifier channel with capacity [`NOTIFIER_CAPACITY`],
    /// replacing any existing one.
    pub fn create_block_notifier(&mut self) {
        self.block_notifier = Some(tokio::sync::broadcast::channel::<H256>(NOTIFIER_CAPACITY).0);
    }
475
    /// Replaces the block notifier with `notifier`; `None` removes it.
    pub fn set_block_notifier(&mut self, notifier: Option<tokio::sync::broadcast::Sender<H256>>) {
        self.block_notifier = notifier;
    }
480
    /// Returns the underlying subxt online client.
    pub(crate) fn api(&self) -> &OnlineClient<SrcChainConfig> {
        &self.api
    }
484
    /// Returns the receipt provider.
    pub(crate) fn receipt_provider(&self) -> &ReceiptProvider {
        &self.receipt_provider
    }
488
    /// Returns the block provider.
    pub(crate) fn block_provider(&self) -> &SubxtBlockInfoProvider {
        &self.block_provider
    }
492
    /// Returns the subscription gap queue.
    pub(crate) fn subscription_gap_queue(&self) -> &SubscriptionGapQueue {
        &self.subscription_gap_queue
    }
496
497 fn earliest_block_number(&self) -> u32 {
500 self.receipt_provider
501 .first_evm_block()
502 .or_else(|| known_first_evm_block_for_chain(self.chain_id))
503 .unwrap_or(0)
504 }
505
506 async fn subscribe_new_blocks<F, Fut>(
508 &self,
509 subscription_type: SubscriptionType,
510 callback: F,
511 ) -> Result<(), ClientError>
512 where
513 F: Fn(SubstrateBlock) -> Fut + Send + Sync,
514 Fut: std::future::Future<Output = Result<(), ClientError>> + Send,
515 {
516 let mut block_stream = match subscription_type {
517 SubscriptionType::BestBlocks => self.api.blocks().subscribe_best().await,
518 SubscriptionType::FinalizedBlocks => self.api.blocks().subscribe_finalized().await,
519 }
520 .inspect_err(|err| {
521 log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}");
522 })?;
523
524 let mut last_finalized_seen: Option<SubstrateBlockNumber> = None;
525
526 while let Some(block) = block_stream.next().await {
527 let block = match block {
528 Ok(block) => block,
529 Err(err) => {
530 if err.is_disconnected_will_reconnect() {
531 log::warn!(
532 target: LOG_TARGET,
533 "The RPC connection was lost and we may have missed a few blocks \
534 ({subscription_type:?}, last finalized: {last_finalized_seen:?}): {err:?}"
535 );
536 continue;
537 }
538
539 log::error!(target: LOG_TARGET, "Failed to fetch block ({subscription_type:?}): {err:?}");
540 return Err(err.into());
541 },
542 };
543
544 let _guard = self.subscription_lock.lock().await;
546
547 let block_number = block.number();
548
549 if subscription_type == SubscriptionType::FinalizedBlocks {
551 if let Some(last) = last_finalized_seen {
552 self.subscription_gap_queue.detect_and_queue(block_number, last);
553 }
554 last_finalized_seen = Some(block_number);
556 }
557
558 log::trace!(target: LOG_TARGET_SUBSCRIPTION, "โณ Processing {subscription_type:?} block: {block_number}");
559 if let Err(err) = callback(block).await {
560 log::error!(target: LOG_TARGET, "Failed to process block {block_number}: {err:?}");
561 } else {
562 log::trace!(target: LOG_TARGET_SUBSCRIPTION, "โ
Processed {subscription_type:?} block: {block_number}");
563 }
564 }
565
566 log::info!(target: LOG_TARGET, "Block subscription ended");
567 Ok(())
568 }
569
570 async fn process_block(
572 &self,
573 block: &SubstrateBlock,
574 ) -> Result<(Block, Vec<ReceiptInfo>), ClientError> {
575 let block_number = block.number();
576 let hash = block.hash();
577
578 macro_rules! time {
579 ($label:expr, $expr:expr) => {{
580 let t = std::time::Instant::now();
581 let r = $expr;
582 log::trace!(
583 target: LOG_TARGET,
584 "โฑ๏ธ #{block_number} {}: {:?}",
585 $label, t.elapsed(),
586 );
587 r
588 }};
589 }
590
591 let eth_block = time!("eth_block", self.runtime_api(hash).eth_block().await?);
592 let receipts = time!(
593 "receipts_from_block",
594 self.receipt_provider.receipts_from_block(block, eth_block.hash).await?
595 );
596 time!(
597 "insert_block_receipts",
598 self.receipt_provider
599 .insert_block_receipts(block, &receipts, ð_block.hash)
600 .await?
601 );
602
603 let (_, receipt_infos): (Vec<_>, Vec<_>) = receipts.into_iter().unzip();
604 self.fee_history_provider.update_fee_history(ð_block, &receipt_infos).await;
605
606 Ok((eth_block, receipt_infos))
607 }
608
609 pub async fn subscribe_and_cache_new_blocks(
611 &self,
612 subscription_type: SubscriptionType,
613 ) -> Result<(), ClientError> {
614 log::info!(target: LOG_TARGET, "๐ Subscribing to new blocks ({subscription_type:?})");
615 self.subscribe_new_blocks(subscription_type, |block| async {
616 let hash = block.hash();
617
618 match subscription_type {
619 SubscriptionType::BestBlocks => {
620 let (eth_block, _) = self.process_block(&block).await?;
621 self.block_provider.update_latest(Arc::new(block), subscription_type).await;
622
623 if let Some(sender) = &self.block_notifier {
624 if sender.receiver_count() > 0 {
625 let _ = sender.send(hash);
626 }
627 }
628 if self.block_subscription_tx.receiver_count() > 0 {
629 let _ = self.block_subscription_tx.send(eth_block);
630 }
631 },
632 SubscriptionType::FinalizedBlocks => {
633 let block_number = block.number();
634 let (receipt_infos, eth_hash) = match self
635 .receipt_provider
636 .get_processed_eth_block_hash(block_number, hash)
637 .await
638 {
639 Some(eth_hash) => {
640 log::trace!(target: LOG_TARGET_SUBSCRIPTION,
641 "โฉ Finalized block #{block_number} already processed, \
642 skipping extraction");
643 (None, eth_hash)
644 },
645 None => {
646 let (eth_block, infos) = self.process_block(&block).await?;
647 (Some(infos), eth_block.hash)
648 },
649 };
650
651 self.block_provider.update_latest(Arc::new(block), subscription_type).await;
652 self.advance_sync_head(block_number, hash).await;
653
654 if self.log_subscription_tx.receiver_count() > 0 {
655 let logs = match receipt_infos {
656 Some(infos) => infos.into_iter().flat_map(|r| r.logs).collect(),
657 None => {
658 self.receipt_provider
659 .logs_by_block_number(block_number, eth_hash)
660 .await?
661 },
662 };
663 for log in logs {
664 let _ = self.log_subscription_tx.send(log);
665 }
666 }
667 },
668 }
669
670 Ok(())
671 })
672 .await
673 }
674
675 pub async fn block_hash_for_tag(
677 &self,
678 at: BlockNumberOrTagOrHash,
679 ) -> Result<SubstrateBlockHash, ClientError> {
680 match at {
681 BlockNumberOrTagOrHash::BlockHash(hash) => self
682 .resolve_substrate_hash(&hash)
683 .await
684 .ok_or(ClientError::EthereumBlockNotFound),
685 BlockNumberOrTagOrHash::BlockNumber(block_number) => {
686 let n: SubstrateBlockNumber =
687 (block_number).try_into().map_err(|_| ClientError::ConversionFailed)?;
688 let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?;
689 Ok(hash)
690 },
691 BlockNumberOrTagOrHash::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
692 let block = self.latest_finalized_block().await;
693 Ok(block.hash())
694 },
695 BlockNumberOrTagOrHash::BlockTag(BlockTag::Earliest) => {
696 let hash = self
697 .get_block_hash(self.earliest_block_number())
698 .await?
699 .ok_or(ClientError::BlockNotFound)?;
700 Ok(hash)
701 },
702 BlockNumberOrTagOrHash::BlockTag(_) => {
703 let block = self.latest_block().await;
704 Ok(block.hash())
705 },
706 }
707 }
708
    /// Returns a [`StorageApi`] bound to the storage at `block_hash`.
    pub fn storage_api(&self, block_hash: H256) -> StorageApi {
        StorageApi::new(self.api.storage().at(block_hash))
    }
713
    /// Returns a [`RuntimeApi`] bound to the runtime at `block_hash`.
    pub fn runtime_api(&self, block_hash: H256) -> RuntimeApi {
        RuntimeApi::new(self.api.runtime_api().at(block_hash))
    }
718
    /// Returns the latest finalized block known to the block provider.
    pub async fn latest_finalized_block(&self) -> Arc<SubstrateBlock> {
        self.block_provider.latest_finalized_block().await
    }
723
    /// Returns the latest block known to the block provider.
    pub async fn latest_block(&self) -> Arc<SubstrateBlock> {
        self.block_provider.latest_block().await
    }
728
729 async fn submit_transaction(
731 &self,
732 call: subxt::tx::DefaultPayload<EthTransact>,
733 ) -> Result<StreamOfResults<TransactionStatus<SubstrateBlockHash>>, ClientError> {
734 let ext = self.api.tx().create_unsigned(&call.unvalidated()).map_err(ClientError::from)?;
735
736 let sub = self
737 .rpc_client
738 .subscribe(
739 "author_submitAndWatchExtrinsic",
740 rpc_params![to_hex(ext.encoded())],
741 "author_unwatchExtrinsic",
742 )
743 .await?;
744
745 let sub = sub.map_err(|e| e.into());
746 Ok(StreamOf::new(Box::pin(sub)))
747 }
748
749 pub async fn submit(
751 &self,
752 call: subxt::tx::DefaultPayload<EthTransact>,
753 ) -> Result<TransactionStatus<SubstrateBlockHash>, ClientError> {
754 let mut progress = self.submit_transaction(call).await.inspect_err(|err| {
755 log::debug!(target: LOG_TARGET, "Failed to submit transaction: {err:?}");
756 })?;
757
758 tokio::time::timeout(Duration::from_secs(5), async {
759 if let Some(status) = progress.next().await {
760 match status {
761 Ok(
762 tx @ (TransactionStatus::Future |
763 TransactionStatus::Ready |
764 TransactionStatus::Broadcast(_) |
767 TransactionStatus::InBlock(_) |
768 TransactionStatus::FinalityTimeout(_) |
769 TransactionStatus::Retracted(_) |
770 TransactionStatus::Finalized(_)),
771 ) => {
772 return Ok(tx);
773 },
774 Ok(
775 tx @ (TransactionStatus::Usurped(_) |
776 TransactionStatus::Dropped |
777 TransactionStatus::Invalid),
778 ) => {
779 return Err(ClientError::SubmitError(tx.into()));
780 },
781 Err(err) => {
782 log::debug!(target: LOG_TARGET, "Transaction submission failed: {err:?}");
783 return Err(ClientError::from(err));
784 },
785 }
786 }
787 return Err(ClientError::SubmitError(SubmitError::StreamEnded));
788 })
789 .await
790 .map_err(|_| ClientError::Timeout)?
791 }
792
    /// Looks up the receipt of the transaction with hash `tx_hash`.
    pub async fn receipt(&self, tx_hash: &H256) -> Option<ReceiptInfo> {
        self.receipt_provider.receipt_by_hash(tx_hash).await
    }
797
798 pub async fn post_dispatch_weight(&self, tx_hash: &H256) -> Option<Weight> {
800 use crate::subxt_client::system::events::ExtrinsicSuccess;
801 let ReceiptInfo { block_hash, transaction_index, .. } = self.receipt(tx_hash).await?;
802 let block_hash = self.resolve_substrate_hash(&block_hash).await?;
803 let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
804 let ext = block.extrinsics().await.ok()?.iter().nth(transaction_index.as_u32() as _)?;
805 let event = ext.events().await.ok()?.find_first::<ExtrinsicSuccess>().ok()??;
806 Some(event.dispatch_info.weight.0)
807 }
808
809 pub async fn sync_state(
810 &self,
811 ) -> Result<sc_rpc::system::SyncState<SubstrateBlockNumber>, ClientError> {
812 let client = self.rpc_client.clone();
813 let sync_state: sc_rpc::system::SyncState<SubstrateBlockNumber> =
814 client.request("system_syncState", Default::default()).await?;
815 Ok(sync_state)
816 }
817
818 pub async fn syncing(&self) -> Result<SyncingStatus, ClientError> {
820 let health = self.rpc.system_health().await?;
821
822 let status = if health.is_syncing {
823 let sync_state = self.sync_state().await?;
824 SyncingProgress {
825 current_block: Some(sync_state.current_block.into()),
826 highest_block: Some(sync_state.highest_block.into()),
827 starting_block: Some(sync_state.starting_block.into()),
828 }
829 .into()
830 } else {
831 SyncingStatus::Bool(false)
832 };
833
834 Ok(status)
835 }
836
    /// Looks up the receipt at `transaction_index` within the block `block_hash`.
    pub async fn receipt_by_hash_and_index(
        &self,
        block_hash: &H256,
        transaction_index: usize,
    ) -> Option<ReceiptInfo> {
        self.receipt_provider
            .receipt_by_block_hash_and_index(block_hash, transaction_index)
            .await
    }
847
    /// Looks up the signed transaction with hash `tx_hash`.
    pub async fn signed_tx_by_hash(&self, tx_hash: &H256) -> Option<TransactionSigned> {
        self.receipt_provider.signed_tx_by_hash(tx_hash).await
    }
851
    /// Returns the number of receipts stored for the block `block_hash`, if known.
    pub async fn receipts_count_per_block(&self, block_hash: &SubstrateBlockHash) -> Option<usize> {
        self.receipt_provider.receipts_count_per_block(block_hash).await
    }
856
857 pub async fn receipt_by_ethereum_hash_and_index(
859 &self,
860 ethereum_hash: &H256,
861 transaction_index: usize,
862 ) -> Option<ReceiptInfo> {
863 let substrate_hash =
865 self.resolve_substrate_hash(ethereum_hash).await.unwrap_or_else(|| {
866 log::trace!(target: LOG_TARGET,
867 "receipt_by_ethereum_hash_and_index: no ETH-to-substrate mapping for \
868 {ethereum_hash:?}, falling back to substrate hash lookup");
869 *ethereum_hash
870 });
871 self.receipt_by_hash_and_index(&substrate_hash, transaction_index).await
872 }
873
    /// Fetches the node's system health over RPC.
    pub async fn system_health(&self) -> Result<SystemHealth, ClientError> {
        let health = self.rpc.system_health().await?;
        Ok(health)
    }
879
880 pub async fn block_number(&self) -> Result<SubstrateBlockNumber, ClientError> {
882 let latest_block = self.block_provider.latest_block().await;
883 Ok(latest_block.number())
884 }
885
886 pub async fn get_block_hash(
888 &self,
889 block_number: SubstrateBlockNumber,
890 ) -> Result<Option<SubstrateBlockHash>, ClientError> {
891 let maybe_block = self.block_provider.block_by_number(block_number).await?;
892 Ok(maybe_block.map(|block| block.hash()))
893 }
894
895 pub async fn block_by_number_or_tag(
897 &self,
898 block: &BlockNumberOrTag,
899 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
900 match block {
901 BlockNumberOrTag::U256(n) => {
902 let n = (*n).try_into().map_err(|_| ClientError::ConversionFailed)?;
903 self.block_by_number(n).await
904 },
905 BlockNumberOrTag::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
906 let block = self.block_provider.latest_finalized_block().await;
907 Ok(Some(block))
908 },
909 BlockNumberOrTag::BlockTag(BlockTag::Earliest) => {
910 self.block_by_number(self.earliest_block_number()).await
911 },
912 BlockNumberOrTag::BlockTag(_) => {
913 let block = self.block_provider.latest_block().await;
914 Ok(Some(block))
915 },
916 }
917 }
918
    /// Looks up a substrate block by its substrate hash.
    pub async fn block_by_hash(
        &self,
        hash: &SubstrateBlockHash,
    ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
        self.block_provider.block_by_hash(hash).await
    }
926
    /// Maps an Ethereum block hash to its substrate block hash via the receipt provider.
    pub async fn resolve_substrate_hash(&self, ethereum_hash: &H256) -> Option<H256> {
        self.receipt_provider.get_substrate_hash(ethereum_hash).await
    }
932
    /// Maps a substrate block hash to its Ethereum block hash via the receipt provider.
    pub async fn resolve_ethereum_hash(&self, substrate_hash: &H256) -> Option<H256> {
        self.receipt_provider.get_ethereum_hash(substrate_hash).await
    }
938
939 pub async fn block_by_ethereum_hash(
942 &self,
943 ethereum_hash: &H256,
944 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
945 if let Some(substrate_hash) = self.resolve_substrate_hash(ethereum_hash).await {
947 return self.block_by_hash(&substrate_hash).await;
948 }
949
950 log::trace!(target: LOG_TARGET,
952 "block_by_ethereum_hash: no ETH-to-substrate mapping for {ethereum_hash:?}, \
953 falling back to substrate hash lookup");
954 self.block_by_hash(ethereum_hash).await
955 }
956
    /// Looks up a substrate block by its number.
    pub async fn block_by_number(
        &self,
        block_number: SubstrateBlockNumber,
    ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
        self.block_provider.block_by_number(block_number).await
    }
964
965 async fn tracing_block(
966 &self,
967 block_hash: H256,
968 ) -> Result<
969 sp_runtime::generic::Block<
970 sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
971 sp_runtime::OpaqueExtrinsic,
972 >,
973 ClientError,
974 > {
975 let signed_block: Option<
976 sp_runtime::generic::SignedBlock<
977 sp_runtime::generic::Block<
978 sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
979 sp_runtime::OpaqueExtrinsic,
980 >,
981 >,
982 > = self.rpc_client.request("chain_getBlock", rpc_params![block_hash]).await?;
983
984 Ok(signed_block.ok_or(ClientError::BlockNotFound)?.block)
985 }
986
987 pub async fn trace_block_by_number(
989 &self,
990 at: BlockNumberOrTag,
991 config: TracerTypeV1,
992 ) -> Result<Vec<TransactionTrace>, ClientError> {
993 if self.receipt_provider.is_before_earliest_block(&at) {
994 return Ok(vec![]);
995 }
996
997 let block_hash = self.block_hash_for_tag(at.into()).await?;
998 let block = self.tracing_block(block_hash).await?;
999 let parent_hash = block.header().parent_hash;
1000 if parent_hash == Default::default() {
1002 return Ok(vec![]);
1003 }
1004 let runtime_api = RuntimeApi::new(self.api.runtime_api().at(parent_hash));
1005 let traces = runtime_api.trace_block(block, config.clone()).await?;
1006
1007 let mut hashes = self
1008 .receipt_provider
1009 .block_transaction_hashes(&block_hash)
1010 .await
1011 .ok_or(ClientError::EthExtrinsicNotFound)?;
1012
1013 let traces = traces.into_iter().filter_map(|(index, trace)| {
1014 Some(TransactionTrace { tx_hash: hashes.remove(&(index as usize))?, trace })
1015 });
1016
1017 Ok(traces.collect())
1018 }
1019
1020 pub async fn trace_transaction(
1022 &self,
1023 transaction_hash: H256,
1024 config: TracerTypeV1,
1025 ) -> Result<TraceV1, ClientError> {
1026 let (block_hash, transaction_index) = self
1027 .receipt_provider
1028 .find_transaction(&transaction_hash)
1029 .await
1030 .ok_or(ClientError::EthExtrinsicNotFound)?;
1031
1032 let block = self.tracing_block(block_hash).await?;
1033 let parent_hash = block.header.parent_hash;
1034 let runtime_api = self.runtime_api(parent_hash);
1035
1036 runtime_api.trace_tx(block, transaction_index as u32, config).await
1037 }
1038
1039 pub async fn trace_call(
1041 &self,
1042 transaction: GenericTransaction,
1043 block: BlockNumberOrTagOrHash,
1044 config: TracerTypeV1,
1045 state_overrides: Option<StateOverrideSet>,
1046 ) -> Result<TraceV1, ClientError> {
1047 let block_hash = self.block_hash_for_tag(block).await?;
1048 let runtime_api = self.runtime_api(block_hash);
1049 runtime_api.trace_call(transaction, config, state_overrides).await
1050 }
1051
1052 pub async fn evm_block(
1054 &self,
1055 block: Arc<SubstrateBlock>,
1056 hydrated_transactions: bool,
1057 ) -> Option<Block> {
1058 log::trace!(target: LOG_TARGET, "Get Ethereum block for hash {:?}", block.hash());
1059
1060 if self
1061 .receipt_provider
1062 .is_before_earliest_block(&BlockNumberOrTag::U256(U256::from(block.number())))
1063 {
1064 log::trace!(target: LOG_TARGET,
1065 "Block #{} is before receipt floor, skipping", block.number());
1066 return None;
1067 }
1068
1069 match self.runtime_api(block.hash()).eth_block().await {
1075 Ok(mut eth_block) => {
1076 log::trace!(target: LOG_TARGET, "Ethereum block from runtime API hash {:?}", eth_block.hash);
1077
1078 if hydrated_transactions {
1079 let tx_infos = self
1081 .receipt_provider
1082 .receipts_from_block(&block, eth_block.hash)
1083 .await
1084 .inspect_err(|err| {
1085 log::trace!(target: LOG_TARGET,
1086 "Failed to extract receipts for block #{}: {err:?}",
1087 block.number());
1088 })
1089 .unwrap_or_default()
1090 .into_iter()
1091 .map(|(signed_tx, receipt)| TransactionInfo::new(&receipt, signed_tx))
1092 .collect::<Vec<_>>();
1093
1094 eth_block.transactions = HashesOrTransactionInfos::TransactionInfos(tx_infos);
1095 }
1096
1097 Some(eth_block)
1098 },
1099 Err(err) => {
1100 log::error!(target: LOG_TARGET, "Failed to get Ethereum block for hash {:?}: {err:?}", block.hash());
1101 None
1102 },
1103 }
1104 }
1105
    /// Returns the chain id read at construction.
    pub fn chain_id(&self) -> u64 {
        self.chain_id
    }
1110
    /// Returns the maximum block weight read at construction.
    pub fn max_block_weight(&self) -> Weight {
        self.max_block_weight
    }
1115
    /// Returns a clone of the block notifier sender, if one is installed.
    pub fn block_notifier(&self) -> Option<tokio::sync::broadcast::Sender<H256>> {
        self.block_notifier.clone()
    }
1120
1121 pub async fn logs(&self, filter: Option<Filter>) -> Result<Vec<Log>, ClientError> {
1123 let earliest = U256::from(self.earliest_block_number());
1124 let latest = U256::from(self.latest_block().await.number());
1125 let resolve_block_number = |block: BlockNumberOrTag| match block {
1126 BlockNumberOrTag::U256(v) => Ok(v),
1127 BlockNumberOrTag::BlockTag(BlockTag::Earliest) => Ok(earliest),
1128 BlockNumberOrTag::BlockTag(BlockTag::Latest) => Ok(latest),
1129 BlockNumberOrTag::BlockTag(tag) => anyhow::bail!("Unsupported tag: {tag:?}"),
1130 };
1131
1132 let logs = self
1133 .receipt_provider
1134 .logs(filter, &resolve_block_number)
1135 .await
1136 .map_err(ClientError::LogFilterFailed)?;
1137
1138 Ok(logs)
1139 }
1140
1141 pub async fn fee_history(
1142 &self,
1143 block_count: u32,
1144 latest_block: BlockNumberOrTag,
1145 reward_percentiles: Option<Vec<f64>>,
1146 ) -> Result<FeeHistoryResult, ClientError> {
1147 let Some(latest_block) = self.block_by_number_or_tag(&latest_block).await? else {
1148 return Err(ClientError::BlockNotFound);
1149 };
1150
1151 self.fee_history_provider
1152 .fee_history(block_count, latest_block.number(), reward_percentiles)
1153 .await
1154 }
1155
    /// Returns the automine flag cached at construction.
    pub fn is_automine(&self) -> bool {
        self.automine
    }
1160
    /// Queries the node for its current automine setting, bypassing the cached flag.
    pub async fn get_automine(&self) -> bool {
        get_automine(&self.rpc_client).await
    }
1165
    /// Returns a new receiver on the block subscription broadcast channel.
    pub fn get_block_subscription_rx(&self) -> tokio::sync::broadcast::Receiver<Block> {
        self.block_subscription_tx.subscribe()
    }
1170
    /// Returns a new receiver on the log subscription broadcast channel.
    pub fn get_log_subscription_rx(&self) -> tokio::sync::broadcast::Receiver<Log> {
        self.log_subscription_tx.subscribe()
    }
1175}
1176
1177fn to_hex(bytes: impl AsRef<[u8]>) -> String {
1178 format!("0x{}", hex::encode(bytes.as_ref()))
1179}