1pub(crate) mod runtime_api;
21pub(crate) mod storage_api;
22
23use crate::{
24 BlockInfoProvider, BlockTag, FeeHistoryProvider, ReceiptProvider, SubxtBlockInfoProvider,
25 SyncLabel, TracerType, TransactionInfo,
26 block_sync::SyncCheckpoint,
27 subxt_client::{self, SrcChainConfig, revive::calls::types::EthTransact},
28};
29use futures::TryStreamExt;
30use jsonrpsee::types::{ErrorObjectOwned, error::CALL_EXECUTION_FAILED_CODE};
31use pallet_revive::{
32 EthTransactError,
33 evm::{
34 Block, BlockNumberOrTag, BlockNumberOrTagOrHash, FeeHistoryResult, Filter,
35 GenericTransaction, H256, HashesOrTransactionInfos, Log, ReceiptInfo, StateOverrideSet,
36 SyncingProgress, SyncingStatus, Trace, TransactionSigned, TransactionTrace, U256,
37 decode_revert_reason,
38 },
39};
40use runtime_api::RuntimeApi;
41use sp_runtime::traits::Block as BlockT;
42use sp_weights::Weight;
43use std::{
44 sync::{
45 Arc,
46 atomic::{AtomicBool, AtomicUsize, Ordering},
47 },
48 time::Duration,
49};
50use storage_api::StorageApi;
51use subxt::{
52 Config, OnlineClient,
53 backend::{
54 StreamOf, StreamOfResults,
55 legacy::{
56 LegacyRpcMethods,
57 rpc_methods::{SystemHealth, TransactionStatus},
58 },
59 rpc::{
60 RpcClient,
61 reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
62 },
63 },
64 config::{HashFor, Header},
65 ext::subxt_rpcs::rpc_params,
66};
67use thiserror::Error;
68use tokio::sync::{Mutex, mpsc};
69
70pub type SubstrateBlock = subxt::blocks::Block<SrcChainConfig, OnlineClient<SrcChainConfig>>;
72
73pub type SubstrateBlockHeader = <SrcChainConfig as Config>::Header;
75
76pub type SubstrateBlockNumber = <SubstrateBlockHeader as Header>::Number;
78
79pub type SubstrateBlockHash = HashFor<SrcChainConfig>;
81
82pub type Balance = u128;
84
85#[derive(Debug, Clone, Copy, PartialEq)]
87pub enum SubscriptionType {
88 BestBlocks,
90 FinalizedBlocks,
92}
93
94#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
96pub enum SubmitError {
97 #[error("Transaction was usurped by another with the same nonce")]
99 Usurped,
100 #[error("Transaction was dropped")]
102 Dropped,
103 #[error("Transaction is invalid (e.g. bad nonce, signature, etc)")]
105 Invalid,
106 #[error("Transaction stream ended without status")]
108 StreamEnded,
109 #[error("Unknown transaction status")]
111 Unknown,
112}
113
114impl From<TransactionStatus<SubstrateBlockHash>> for SubmitError {
115 fn from(status: TransactionStatus<SubstrateBlockHash>) -> Self {
116 match status {
117 TransactionStatus::Usurped(_) => SubmitError::Usurped,
118 TransactionStatus::Dropped => SubmitError::Dropped,
119 TransactionStatus::Invalid => SubmitError::Invalid,
120 _ => SubmitError::Unknown,
121 }
122 }
123}
124
125#[derive(Error, Debug)]
127pub enum ClientError {
128 #[error(transparent)]
130 Jsonrpsee(#[from] jsonrpsee::core::ClientError),
131 #[error(transparent)]
133 SubxtError(#[from] subxt::Error),
134 #[error(transparent)]
135 RpcError(#[from] subxt::ext::subxt_rpcs::Error),
136 #[error(transparent)]
138 SqlxError(#[from] sqlx::Error),
139 #[error(transparent)]
141 CodecError(#[from] codec::Error),
142 #[error("Invalid transaction: {0}")]
144 SubmitError(SubmitError),
145 #[error("contract reverted: {0:?}")]
147 TransactError(EthTransactError),
148 #[error("conversion failed")]
150 ConversionFailed,
151 #[error("hash not found")]
153 BlockNotFound,
154 #[error("Contract not found")]
156 ContractNotFound,
157 #[error("No Ethereum extrinsic found")]
158 EthExtrinsicNotFound,
159 #[error("transactionFeePaid event not found")]
161 TxFeeNotFound,
162 #[error("Failed to decode a raw payload into a signed transaction")]
164 TxDecodingFailed,
165 #[error("failed to recover eth address")]
167 RecoverEthAddressFailed,
168 #[error("Failed to filter logs")]
170 LogFilterFailed(#[from] anyhow::Error),
171 #[error("Receipt storage not found")]
173 ReceiptDataNotFound,
174 #[error("Ethereum block not found")]
176 EthereumBlockNotFound,
177 #[error("Receipt data length mismatch")]
179 ReceiptDataLengthMismatch,
180 #[error("Transaction submission timeout")]
182 Timeout,
183 #[error("None of the estimation methods were found")]
186 NoEstimationMethodSucceeded,
187 #[error("Genesis hash mismatch")]
189 ChainMismatch,
190 #[error("Sync boundary mismatch")]
192 SyncBoundaryMismatch,
193}
194
195impl ClientError {
196 pub(crate) fn is_chain_validation_error(&self) -> bool {
198 matches!(self, Self::ChainMismatch | Self::SyncBoundaryMismatch)
199 }
200}
201
202const LOG_TARGET: &str = "eth-rpc::client";
203const LOG_TARGET_SUBSCRIPTION: &str = "eth-rpc::subscription";
204
205const REVERT_CODE: i32 = 3;
206
207const NOTIFIER_CAPACITY: usize = 16;
208
209impl From<ClientError> for ErrorObjectOwned {
210 fn from(err: ClientError) -> Self {
211 match err {
212 ClientError::SubxtError(subxt::Error::Rpc(subxt::error::RpcError::ClientError(
213 subxt::ext::subxt_rpcs::Error::User(err),
214 ))) |
215 ClientError::RpcError(subxt::ext::subxt_rpcs::Error::User(err)) => {
216 ErrorObjectOwned::owned::<Vec<u8>>(err.code, err.message, None)
217 },
218 ClientError::TransactError(EthTransactError::Data(data)) => {
219 let msg = match decode_revert_reason(&data) {
220 Some(reason) => format!("execution reverted: {reason}"),
221 None => "execution reverted".to_string(),
222 };
223
224 let data = format!("0x{}", hex::encode(data));
225 ErrorObjectOwned::owned::<String>(REVERT_CODE, msg, Some(data))
226 },
227 ClientError::TransactError(EthTransactError::Message(msg)) => {
228 ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, msg, None)
229 },
230 _ => {
231 ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, err.to_string(), None)
232 },
233 }
234 }
235}
236
237#[derive(Clone)]
239pub struct Client {
240 api: OnlineClient<SrcChainConfig>,
241 rpc_client: RpcClient,
242 rpc: LegacyRpcMethods<SrcChainConfig>,
243 receipt_provider: ReceiptProvider,
244 block_provider: SubxtBlockInfoProvider,
245 fee_history_provider: FeeHistoryProvider,
246 chain_id: u64,
247 max_block_weight: Weight,
248 automine: bool,
250 block_notifier: Option<tokio::sync::broadcast::Sender<H256>>,
252 subscription_lock: Arc<Mutex<()>>,
254
255 block_subscription_tx: tokio::sync::broadcast::Sender<Block>,
257 log_subscription_tx: tokio::sync::broadcast::Sender<Log>,
259 is_archive: bool,
261 backfill_complete: Arc<AtomicBool>,
263 subscription_gap_queue: SubscriptionGapQueue,
265}
266
267pub(crate) struct GapFillRequest {
269 pub from_inclusive: SubstrateBlockNumber,
270 pub to_inclusive: SubstrateBlockNumber,
271}
272
273#[derive(Clone)]
275pub(crate) struct SubscriptionGapQueue {
276 tx: mpsc::Sender<GapFillRequest>,
278 pending: Arc<AtomicUsize>,
281}
282
283impl SubscriptionGapQueue {
284 pub(crate) fn new() -> (Self, mpsc::Receiver<GapFillRequest>) {
285 let (tx, rx) = mpsc::channel(32);
288 (Self { tx, pending: Arc::new(AtomicUsize::new(0)) }, rx)
289 }
290
291 pub fn detect_and_queue(&self, current: SubstrateBlockNumber, last: SubstrateBlockNumber) {
293 if current.saturating_sub(last) <= 1 {
294 return;
295 }
296
297 let from_inclusive = current.saturating_sub(1);
298 let to_inclusive = last.saturating_add(1);
299 let gap_len = from_inclusive.saturating_sub(to_inclusive) + 1;
300 self.pending.fetch_add(1, Ordering::Release);
301 match self.tx.try_send(GapFillRequest { from_inclusive, to_inclusive }) {
302 Ok(_) => {
303 log::info!(target: LOG_TARGET,
304 "๐ Subscription gap queue: queued #{from_inclusive} down to #{to_inclusive} ({gap_len} blocks)");
305 },
306 Err(err) => {
307 self.pending.fetch_sub(1, Ordering::Release);
308 log::warn!(target: LOG_TARGET,
309 "๐ Subscription gap queue error, dropping #{from_inclusive}..#{to_inclusive} ({gap_len} blocks): {err}");
310 },
311 }
312 }
313
314 pub fn mark_done(&self) {
316 let res = self
317 .pending
318 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| v.checked_sub(1));
319 if res.is_err() {
320 debug_assert!(false, "subscription gap queue pending counter underflowed");
321 log::error!(target: LOG_TARGET,
322 "๐ Subscription gap queue pending counter underflow, delete the database and restart with --eth-pruning=archive to resync");
323 }
324 }
325
326 pub fn has_pending(&self) -> bool {
328 self.pending.load(Ordering::Acquire) > 0
329 }
330}
331
332fn known_first_evm_block_for_chain(chain_id: u64) -> Option<u32> {
334 match chain_id {
335 420420417 => Some(4_367_914), 420420418 => Some(12_234_156), 420420419 => Some(11_405_259), 420420421 => Some(13_169_391), _ => None,
340 }
341}
342
343async fn chain_id(api: &OnlineClient<SrcChainConfig>) -> Result<u64, ClientError> {
345 let query = subxt_client::constants().revive().chain_id().unvalidated();
346 api.constants().at(&query).map_err(|err| err.into())
347}
348
349async fn max_block_weight(api: &OnlineClient<SrcChainConfig>) -> Result<Weight, ClientError> {
351 let query = subxt_client::constants().system().block_weights().unvalidated();
352 let weights = api.constants().at(&query)?;
353 let max_block = weights.per_class.normal.max_extrinsic.unwrap_or(weights.max_block);
354 Ok(max_block.0)
355}
356
357async fn get_automine(rpc_client: &RpcClient) -> bool {
359 match rpc_client.request::<bool>("getAutomine", rpc_params![]).await {
360 Ok(val) => val,
361 Err(err) => {
362 log::info!(target: LOG_TARGET, "Node does not have getAutomine RPC. Defaulting to automine=false. error: {err:?}");
363 false
364 },
365 }
366}
367
368pub async fn connect(
371 node_rpc_url: &str,
372 max_request_size: u32,
373 max_response_size: u32,
374) -> Result<(OnlineClient<SrcChainConfig>, RpcClient, LegacyRpcMethods<SrcChainConfig>), ClientError>
375{
376 log::info!(target: LOG_TARGET, "๐ Connecting to node at: {node_rpc_url} ...");
377 let rpc_client = ReconnectingRpcClient::builder()
378 .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
379 .max_request_size(max_request_size)
380 .max_response_size(max_response_size)
381 .build(node_rpc_url.to_string())
382 .await?;
383 let rpc_client = RpcClient::new(rpc_client);
384 log::info!(target: LOG_TARGET, "๐ Connected to node at: {node_rpc_url}");
385
386 let api = OnlineClient::<SrcChainConfig>::from_rpc_client(rpc_client.clone()).await?;
387 let rpc = LegacyRpcMethods::<SrcChainConfig>::new(rpc_client.clone());
388 Ok((api, rpc_client, rpc))
389}
390
391impl Client {
392 pub(crate) async fn new(
394 api: OnlineClient<SrcChainConfig>,
395 rpc_client: RpcClient,
396 rpc: LegacyRpcMethods<SrcChainConfig>,
397 block_provider: SubxtBlockInfoProvider,
398 receipt_provider: ReceiptProvider,
399 is_archive: bool,
400 subscription_gap_queue: SubscriptionGapQueue,
401 ) -> Result<Self, ClientError> {
402 let (chain_id, max_block_weight, automine) =
403 tokio::try_join!(chain_id(&api), max_block_weight(&api), async {
404 Ok(get_automine(&rpc_client).await)
405 },)?;
406
407 if !is_archive {
410 if let Some(known) = known_first_evm_block_for_chain(chain_id) {
411 let best = block_provider.latest_block_number().await;
412 if known > best {
413 log::debug!(
414 target: LOG_TARGET,
415 "Hardcoded first EVM block {known} exceeds best block {best} \
416 for chain {chain_id}, defaulting to 0"
417 );
418 receipt_provider.set_first_evm_block(0).await?;
419 }
420 }
421 }
422
423 let client = Self {
424 api,
425 rpc_client,
426 rpc,
427 receipt_provider,
428 block_provider,
429 fee_history_provider: FeeHistoryProvider::default(),
430 chain_id,
431 max_block_weight,
432 automine,
433 block_notifier: automine
434 .then(|| tokio::sync::broadcast::channel::<H256>(NOTIFIER_CAPACITY).0),
435 subscription_lock: Arc::new(Mutex::new(())),
436 block_subscription_tx: tokio::sync::broadcast::channel(256).0,
437 log_subscription_tx: tokio::sync::broadcast::channel(1000).0,
438 is_archive,
439 backfill_complete: Arc::new(AtomicBool::new(false)),
440 subscription_gap_queue,
441 };
442
443 Ok(client)
444 }
445
446 pub(crate) fn mark_backfill_complete(&self) {
448 self.backfill_complete.store(true, Ordering::Release);
449 }
450
451 async fn advance_sync_head(&self, block_number: SubstrateBlockNumber, hash: H256) {
454 if !self.is_archive ||
455 !self.backfill_complete.load(Ordering::Acquire) ||
456 self.subscription_gap_queue.has_pending()
457 {
458 return;
459 }
460
461 if let Err(err) = self
462 .receipt_provider
463 .advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(block_number, hash))
464 .await
465 {
466 log::warn!(target: LOG_TARGET, "Failed to advance sync head: {err:?}");
467 }
468 }
469
470 pub fn create_block_notifier(&mut self) {
472 self.block_notifier = Some(tokio::sync::broadcast::channel::<H256>(NOTIFIER_CAPACITY).0);
473 }
474
475 pub fn set_block_notifier(&mut self, notifier: Option<tokio::sync::broadcast::Sender<H256>>) {
477 self.block_notifier = notifier;
478 }
479
480 pub(crate) fn api(&self) -> &OnlineClient<SrcChainConfig> {
481 &self.api
482 }
483
484 pub(crate) fn receipt_provider(&self) -> &ReceiptProvider {
485 &self.receipt_provider
486 }
487
488 pub(crate) fn block_provider(&self) -> &SubxtBlockInfoProvider {
489 &self.block_provider
490 }
491
492 pub(crate) fn subscription_gap_queue(&self) -> &SubscriptionGapQueue {
493 &self.subscription_gap_queue
494 }
495
496 fn earliest_block_number(&self) -> u32 {
499 self.receipt_provider
500 .first_evm_block()
501 .or_else(|| known_first_evm_block_for_chain(self.chain_id))
502 .unwrap_or(0)
503 }
504
505 async fn subscribe_new_blocks<F, Fut>(
507 &self,
508 subscription_type: SubscriptionType,
509 callback: F,
510 ) -> Result<(), ClientError>
511 where
512 F: Fn(SubstrateBlock) -> Fut + Send + Sync,
513 Fut: std::future::Future<Output = Result<(), ClientError>> + Send,
514 {
515 let mut block_stream = match subscription_type {
516 SubscriptionType::BestBlocks => self.api.blocks().subscribe_best().await,
517 SubscriptionType::FinalizedBlocks => self.api.blocks().subscribe_finalized().await,
518 }
519 .inspect_err(|err| {
520 log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}");
521 })?;
522
523 let mut last_finalized_seen: Option<SubstrateBlockNumber> = None;
524
525 while let Some(block) = block_stream.next().await {
526 let block = match block {
527 Ok(block) => block,
528 Err(err) => {
529 if err.is_disconnected_will_reconnect() {
530 log::warn!(
531 target: LOG_TARGET,
532 "The RPC connection was lost and we may have missed a few blocks \
533 ({subscription_type:?}, last finalized: {last_finalized_seen:?}): {err:?}"
534 );
535 continue;
536 }
537
538 log::error!(target: LOG_TARGET, "Failed to fetch block ({subscription_type:?}): {err:?}");
539 return Err(err.into());
540 },
541 };
542
543 let _guard = self.subscription_lock.lock().await;
545
546 let block_number = block.number();
547
548 if subscription_type == SubscriptionType::FinalizedBlocks {
550 if let Some(last) = last_finalized_seen {
551 self.subscription_gap_queue.detect_and_queue(block_number, last);
552 }
553 last_finalized_seen = Some(block_number);
555 }
556
557 log::trace!(target: LOG_TARGET_SUBSCRIPTION, "โณ Processing {subscription_type:?} block: {block_number}");
558 if let Err(err) = callback(block).await {
559 log::error!(target: LOG_TARGET, "Failed to process block {block_number}: {err:?}");
560 } else {
561 log::trace!(target: LOG_TARGET_SUBSCRIPTION, "โ
Processed {subscription_type:?} block: {block_number}");
562 }
563 }
564
565 log::info!(target: LOG_TARGET, "Block subscription ended");
566 Ok(())
567 }
568
569 async fn process_block(
571 &self,
572 block: &SubstrateBlock,
573 ) -> Result<(Block, Vec<ReceiptInfo>), ClientError> {
574 let block_number = block.number();
575 let hash = block.hash();
576
577 macro_rules! time {
578 ($label:expr, $expr:expr) => {{
579 let t = std::time::Instant::now();
580 let r = $expr;
581 log::trace!(
582 target: LOG_TARGET,
583 "โฑ๏ธ #{block_number} {}: {:?}",
584 $label, t.elapsed(),
585 );
586 r
587 }};
588 }
589
590 let eth_block = time!("eth_block", self.runtime_api(hash).eth_block().await?);
591 let receipts = time!(
592 "receipts_from_block",
593 self.receipt_provider.receipts_from_block(block, eth_block.hash).await?
594 );
595 time!(
596 "insert_block_receipts",
597 self.receipt_provider
598 .insert_block_receipts(block, &receipts, ð_block.hash)
599 .await?
600 );
601
602 let (_, receipt_infos): (Vec<_>, Vec<_>) = receipts.into_iter().unzip();
603 self.fee_history_provider.update_fee_history(ð_block, &receipt_infos).await;
604
605 Ok((eth_block, receipt_infos))
606 }
607
608 pub async fn subscribe_and_cache_new_blocks(
610 &self,
611 subscription_type: SubscriptionType,
612 ) -> Result<(), ClientError> {
613 log::info!(target: LOG_TARGET, "๐ Subscribing to new blocks ({subscription_type:?})");
614 self.subscribe_new_blocks(subscription_type, |block| async {
615 let hash = block.hash();
616
617 match subscription_type {
618 SubscriptionType::BestBlocks => {
619 let (eth_block, _) = self.process_block(&block).await?;
620 self.block_provider.update_latest(Arc::new(block), subscription_type).await;
621
622 if let Some(sender) = &self.block_notifier {
623 if sender.receiver_count() > 0 {
624 let _ = sender.send(hash);
625 }
626 }
627 if self.block_subscription_tx.receiver_count() > 0 {
628 let _ = self.block_subscription_tx.send(eth_block);
629 }
630 },
631 SubscriptionType::FinalizedBlocks => {
632 let block_number = block.number();
633 let (receipt_infos, eth_hash) = match self
634 .receipt_provider
635 .get_processed_eth_block_hash(block_number, hash)
636 .await
637 {
638 Some(eth_hash) => {
639 log::trace!(target: LOG_TARGET_SUBSCRIPTION,
640 "โฉ Finalized block #{block_number} already processed, \
641 skipping extraction");
642 (None, eth_hash)
643 },
644 None => {
645 let (eth_block, infos) = self.process_block(&block).await?;
646 (Some(infos), eth_block.hash)
647 },
648 };
649
650 self.block_provider.update_latest(Arc::new(block), subscription_type).await;
651 self.advance_sync_head(block_number, hash).await;
652
653 if self.log_subscription_tx.receiver_count() > 0 {
654 let logs = match receipt_infos {
655 Some(infos) => infos.into_iter().flat_map(|r| r.logs).collect(),
656 None => {
657 self.receipt_provider
658 .logs_by_block_number(block_number, eth_hash)
659 .await?
660 },
661 };
662 for log in logs {
663 let _ = self.log_subscription_tx.send(log);
664 }
665 }
666 },
667 }
668
669 Ok(())
670 })
671 .await
672 }
673
674 pub async fn block_hash_for_tag(
676 &self,
677 at: BlockNumberOrTagOrHash,
678 ) -> Result<SubstrateBlockHash, ClientError> {
679 match at {
680 BlockNumberOrTagOrHash::BlockHash(hash) => self
681 .resolve_substrate_hash(&hash)
682 .await
683 .ok_or(ClientError::EthereumBlockNotFound),
684 BlockNumberOrTagOrHash::BlockNumber(block_number) => {
685 let n: SubstrateBlockNumber =
686 (block_number).try_into().map_err(|_| ClientError::ConversionFailed)?;
687 let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?;
688 Ok(hash)
689 },
690 BlockNumberOrTagOrHash::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
691 let block = self.latest_finalized_block().await;
692 Ok(block.hash())
693 },
694 BlockNumberOrTagOrHash::BlockTag(BlockTag::Earliest) => {
695 let hash = self
696 .get_block_hash(self.earliest_block_number())
697 .await?
698 .ok_or(ClientError::BlockNotFound)?;
699 Ok(hash)
700 },
701 BlockNumberOrTagOrHash::BlockTag(_) => {
702 let block = self.latest_block().await;
703 Ok(block.hash())
704 },
705 }
706 }
707
708 pub fn storage_api(&self, block_hash: H256) -> StorageApi {
710 StorageApi::new(self.api.storage().at(block_hash))
711 }
712
713 pub fn runtime_api(&self, block_hash: H256) -> RuntimeApi {
715 RuntimeApi::new(self.api.runtime_api().at(block_hash))
716 }
717
718 pub async fn latest_finalized_block(&self) -> Arc<SubstrateBlock> {
720 self.block_provider.latest_finalized_block().await
721 }
722
723 pub async fn latest_block(&self) -> Arc<SubstrateBlock> {
725 self.block_provider.latest_block().await
726 }
727
728 async fn submit_transaction(
730 &self,
731 call: subxt::tx::DefaultPayload<EthTransact>,
732 ) -> Result<StreamOfResults<TransactionStatus<SubstrateBlockHash>>, ClientError> {
733 let ext = self.api.tx().create_unsigned(&call.unvalidated()).map_err(ClientError::from)?;
734
735 let sub = self
736 .rpc_client
737 .subscribe(
738 "author_submitAndWatchExtrinsic",
739 rpc_params![to_hex(ext.encoded())],
740 "author_unwatchExtrinsic",
741 )
742 .await?;
743
744 let sub = sub.map_err(|e| e.into());
745 Ok(StreamOf::new(Box::pin(sub)))
746 }
747
748 pub async fn submit(
750 &self,
751 call: subxt::tx::DefaultPayload<EthTransact>,
752 ) -> Result<TransactionStatus<SubstrateBlockHash>, ClientError> {
753 let mut progress = self.submit_transaction(call).await.inspect_err(|err| {
754 log::debug!(target: LOG_TARGET, "Failed to submit transaction: {err:?}");
755 })?;
756
757 tokio::time::timeout(Duration::from_secs(5), async {
758 if let Some(status) = progress.next().await {
759 match status {
760 Ok(
761 tx @ (TransactionStatus::Future |
762 TransactionStatus::Ready |
763 TransactionStatus::Broadcast(_) |
766 TransactionStatus::InBlock(_) |
767 TransactionStatus::FinalityTimeout(_) |
768 TransactionStatus::Retracted(_) |
769 TransactionStatus::Finalized(_)),
770 ) => {
771 return Ok(tx);
772 },
773 Ok(
774 tx @ (TransactionStatus::Usurped(_) |
775 TransactionStatus::Dropped |
776 TransactionStatus::Invalid),
777 ) => {
778 return Err(ClientError::SubmitError(tx.into()));
779 },
780 Err(err) => {
781 log::debug!(target: LOG_TARGET, "Transaction submission failed: {err:?}");
782 return Err(ClientError::from(err));
783 },
784 }
785 }
786 return Err(ClientError::SubmitError(SubmitError::StreamEnded));
787 })
788 .await
789 .map_err(|_| ClientError::Timeout)?
790 }
791
792 pub async fn receipt(&self, tx_hash: &H256) -> Option<ReceiptInfo> {
794 self.receipt_provider.receipt_by_hash(tx_hash).await
795 }
796
797 pub async fn post_dispatch_weight(&self, tx_hash: &H256) -> Option<Weight> {
799 use crate::subxt_client::system::events::ExtrinsicSuccess;
800 let ReceiptInfo { block_hash, transaction_index, .. } = self.receipt(tx_hash).await?;
801 let block_hash = self.resolve_substrate_hash(&block_hash).await?;
802 let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
803 let ext = block.extrinsics().await.ok()?.iter().nth(transaction_index.as_u32() as _)?;
804 let event = ext.events().await.ok()?.find_first::<ExtrinsicSuccess>().ok()??;
805 Some(event.dispatch_info.weight.0)
806 }
807
808 pub async fn sync_state(
809 &self,
810 ) -> Result<sc_rpc::system::SyncState<SubstrateBlockNumber>, ClientError> {
811 let client = self.rpc_client.clone();
812 let sync_state: sc_rpc::system::SyncState<SubstrateBlockNumber> =
813 client.request("system_syncState", Default::default()).await?;
814 Ok(sync_state)
815 }
816
817 pub async fn syncing(&self) -> Result<SyncingStatus, ClientError> {
819 let health = self.rpc.system_health().await?;
820
821 let status = if health.is_syncing {
822 let sync_state = self.sync_state().await?;
823 SyncingProgress {
824 current_block: Some(sync_state.current_block.into()),
825 highest_block: Some(sync_state.highest_block.into()),
826 starting_block: Some(sync_state.starting_block.into()),
827 }
828 .into()
829 } else {
830 SyncingStatus::Bool(false)
831 };
832
833 Ok(status)
834 }
835
836 pub async fn receipt_by_hash_and_index(
838 &self,
839 block_hash: &H256,
840 transaction_index: usize,
841 ) -> Option<ReceiptInfo> {
842 self.receipt_provider
843 .receipt_by_block_hash_and_index(block_hash, transaction_index)
844 .await
845 }
846
847 pub async fn signed_tx_by_hash(&self, tx_hash: &H256) -> Option<TransactionSigned> {
848 self.receipt_provider.signed_tx_by_hash(tx_hash).await
849 }
850
851 pub async fn receipts_count_per_block(&self, block_hash: &SubstrateBlockHash) -> Option<usize> {
853 self.receipt_provider.receipts_count_per_block(block_hash).await
854 }
855
856 pub async fn receipt_by_ethereum_hash_and_index(
858 &self,
859 ethereum_hash: &H256,
860 transaction_index: usize,
861 ) -> Option<ReceiptInfo> {
862 let substrate_hash =
864 self.resolve_substrate_hash(ethereum_hash).await.unwrap_or_else(|| {
865 log::trace!(target: LOG_TARGET,
866 "receipt_by_ethereum_hash_and_index: no ETH-to-substrate mapping for \
867 {ethereum_hash:?}, falling back to substrate hash lookup");
868 *ethereum_hash
869 });
870 self.receipt_by_hash_and_index(&substrate_hash, transaction_index).await
871 }
872
873 pub async fn system_health(&self) -> Result<SystemHealth, ClientError> {
875 let health = self.rpc.system_health().await?;
876 Ok(health)
877 }
878
879 pub async fn block_number(&self) -> Result<SubstrateBlockNumber, ClientError> {
881 let latest_block = self.block_provider.latest_block().await;
882 Ok(latest_block.number())
883 }
884
885 pub async fn get_block_hash(
887 &self,
888 block_number: SubstrateBlockNumber,
889 ) -> Result<Option<SubstrateBlockHash>, ClientError> {
890 let maybe_block = self.block_provider.block_by_number(block_number).await?;
891 Ok(maybe_block.map(|block| block.hash()))
892 }
893
894 pub async fn block_by_number_or_tag(
896 &self,
897 block: &BlockNumberOrTag,
898 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
899 match block {
900 BlockNumberOrTag::U256(n) => {
901 let n = (*n).try_into().map_err(|_| ClientError::ConversionFailed)?;
902 self.block_by_number(n).await
903 },
904 BlockNumberOrTag::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
905 let block = self.block_provider.latest_finalized_block().await;
906 Ok(Some(block))
907 },
908 BlockNumberOrTag::BlockTag(BlockTag::Earliest) => {
909 self.block_by_number(self.earliest_block_number()).await
910 },
911 BlockNumberOrTag::BlockTag(_) => {
912 let block = self.block_provider.latest_block().await;
913 Ok(Some(block))
914 },
915 }
916 }
917
918 pub async fn block_by_hash(
920 &self,
921 hash: &SubstrateBlockHash,
922 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
923 self.block_provider.block_by_hash(hash).await
924 }
925
926 pub async fn resolve_substrate_hash(&self, ethereum_hash: &H256) -> Option<H256> {
929 self.receipt_provider.get_substrate_hash(ethereum_hash).await
930 }
931
932 pub async fn resolve_ethereum_hash(&self, substrate_hash: &H256) -> Option<H256> {
935 self.receipt_provider.get_ethereum_hash(substrate_hash).await
936 }
937
938 pub async fn block_by_ethereum_hash(
941 &self,
942 ethereum_hash: &H256,
943 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
944 if let Some(substrate_hash) = self.resolve_substrate_hash(ethereum_hash).await {
946 return self.block_by_hash(&substrate_hash).await;
947 }
948
949 log::trace!(target: LOG_TARGET,
951 "block_by_ethereum_hash: no ETH-to-substrate mapping for {ethereum_hash:?}, \
952 falling back to substrate hash lookup");
953 self.block_by_hash(ethereum_hash).await
954 }
955
956 pub async fn block_by_number(
958 &self,
959 block_number: SubstrateBlockNumber,
960 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
961 self.block_provider.block_by_number(block_number).await
962 }
963
964 async fn tracing_block(
965 &self,
966 block_hash: H256,
967 ) -> Result<
968 sp_runtime::generic::Block<
969 sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
970 sp_runtime::OpaqueExtrinsic,
971 >,
972 ClientError,
973 > {
974 let signed_block: Option<
975 sp_runtime::generic::SignedBlock<
976 sp_runtime::generic::Block<
977 sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
978 sp_runtime::OpaqueExtrinsic,
979 >,
980 >,
981 > = self.rpc_client.request("chain_getBlock", rpc_params![block_hash]).await?;
982
983 Ok(signed_block.ok_or(ClientError::BlockNotFound)?.block)
984 }
985
986 pub async fn trace_block_by_number(
988 &self,
989 at: BlockNumberOrTag,
990 config: TracerType,
991 ) -> Result<Vec<TransactionTrace>, ClientError> {
992 if self.receipt_provider.is_before_earliest_block(&at) {
993 return Ok(vec![]);
994 }
995
996 let block_hash = self.block_hash_for_tag(at.into()).await?;
997 let block = self.tracing_block(block_hash).await?;
998 let parent_hash = block.header().parent_hash;
999 if parent_hash == Default::default() {
1001 return Ok(vec![]);
1002 }
1003 let runtime_api = RuntimeApi::new(self.api.runtime_api().at(parent_hash));
1004 let traces = runtime_api.trace_block(block, config.clone()).await?;
1005
1006 let mut hashes = self
1007 .receipt_provider
1008 .block_transaction_hashes(&block_hash)
1009 .await
1010 .ok_or(ClientError::EthExtrinsicNotFound)?;
1011
1012 let traces = traces.into_iter().filter_map(|(index, trace)| {
1013 Some(TransactionTrace { tx_hash: hashes.remove(&(index as usize))?, trace })
1014 });
1015
1016 Ok(traces.collect())
1017 }
1018
1019 pub async fn trace_transaction(
1021 &self,
1022 transaction_hash: H256,
1023 config: TracerType,
1024 ) -> Result<Trace, ClientError> {
1025 let (block_hash, transaction_index) = self
1026 .receipt_provider
1027 .find_transaction(&transaction_hash)
1028 .await
1029 .ok_or(ClientError::EthExtrinsicNotFound)?;
1030
1031 let block = self.tracing_block(block_hash).await?;
1032 let parent_hash = block.header.parent_hash;
1033 let runtime_api = self.runtime_api(parent_hash);
1034
1035 runtime_api.trace_tx(block, transaction_index as u32, config).await
1036 }
1037
1038 pub async fn trace_call(
1040 &self,
1041 transaction: GenericTransaction,
1042 block: BlockNumberOrTagOrHash,
1043 config: TracerType,
1044 state_overrides: Option<StateOverrideSet>,
1045 ) -> Result<Trace, ClientError> {
1046 let block_hash = self.block_hash_for_tag(block).await?;
1047 let runtime_api = self.runtime_api(block_hash);
1048 runtime_api.trace_call(transaction, config, state_overrides).await
1049 }
1050
1051 pub async fn evm_block(
1053 &self,
1054 block: Arc<SubstrateBlock>,
1055 hydrated_transactions: bool,
1056 ) -> Option<Block> {
1057 log::trace!(target: LOG_TARGET, "Get Ethereum block for hash {:?}", block.hash());
1058
1059 if self
1060 .receipt_provider
1061 .is_before_earliest_block(&BlockNumberOrTag::U256(U256::from(block.number())))
1062 {
1063 log::trace!(target: LOG_TARGET,
1064 "Block #{} is before receipt floor, skipping", block.number());
1065 return None;
1066 }
1067
1068 match self.runtime_api(block.hash()).eth_block().await {
1074 Ok(mut eth_block) => {
1075 log::trace!(target: LOG_TARGET, "Ethereum block from runtime API hash {:?}", eth_block.hash);
1076
1077 if hydrated_transactions {
1078 let tx_infos = self
1080 .receipt_provider
1081 .receipts_from_block(&block, eth_block.hash)
1082 .await
1083 .inspect_err(|err| {
1084 log::trace!(target: LOG_TARGET,
1085 "Failed to extract receipts for block #{}: {err:?}",
1086 block.number());
1087 })
1088 .unwrap_or_default()
1089 .into_iter()
1090 .map(|(signed_tx, receipt)| TransactionInfo::new(&receipt, signed_tx))
1091 .collect::<Vec<_>>();
1092
1093 eth_block.transactions = HashesOrTransactionInfos::TransactionInfos(tx_infos);
1094 }
1095
1096 Some(eth_block)
1097 },
1098 Err(err) => {
1099 log::error!(target: LOG_TARGET, "Failed to get Ethereum block for hash {:?}: {err:?}", block.hash());
1100 None
1101 },
1102 }
1103 }
1104
1105 pub fn chain_id(&self) -> u64 {
1107 self.chain_id
1108 }
1109
1110 pub fn max_block_weight(&self) -> Weight {
1112 self.max_block_weight
1113 }
1114
1115 pub fn block_notifier(&self) -> Option<tokio::sync::broadcast::Sender<H256>> {
1117 self.block_notifier.clone()
1118 }
1119
1120 pub async fn logs(&self, filter: Option<Filter>) -> Result<Vec<Log>, ClientError> {
1122 let earliest = U256::from(self.earliest_block_number());
1123 let latest = U256::from(self.latest_block().await.number());
1124 let resolve_block_number = |block: BlockNumberOrTag| match block {
1125 BlockNumberOrTag::U256(v) => Ok(v),
1126 BlockNumberOrTag::BlockTag(BlockTag::Earliest) => Ok(earliest),
1127 BlockNumberOrTag::BlockTag(BlockTag::Latest) => Ok(latest),
1128 BlockNumberOrTag::BlockTag(tag) => anyhow::bail!("Unsupported tag: {tag:?}"),
1129 };
1130
1131 let logs = self
1132 .receipt_provider
1133 .logs(filter, &resolve_block_number)
1134 .await
1135 .map_err(ClientError::LogFilterFailed)?;
1136
1137 Ok(logs)
1138 }
1139
1140 pub async fn fee_history(
1141 &self,
1142 block_count: u32,
1143 latest_block: BlockNumberOrTag,
1144 reward_percentiles: Option<Vec<f64>>,
1145 ) -> Result<FeeHistoryResult, ClientError> {
1146 let Some(latest_block) = self.block_by_number_or_tag(&latest_block).await? else {
1147 return Err(ClientError::BlockNotFound);
1148 };
1149
1150 self.fee_history_provider
1151 .fee_history(block_count, latest_block.number(), reward_percentiles)
1152 .await
1153 }
1154
1155 pub fn is_automine(&self) -> bool {
1157 self.automine
1158 }
1159
1160 pub async fn get_automine(&self) -> bool {
1162 get_automine(&self.rpc_client).await
1163 }
1164
1165 pub fn get_block_subscription_rx(&self) -> tokio::sync::broadcast::Receiver<Block> {
1167 self.block_subscription_tx.subscribe()
1168 }
1169
1170 pub fn get_log_subscription_rx(&self) -> tokio::sync::broadcast::Receiver<Log> {
1172 self.log_subscription_tx.subscribe()
1173 }
1174}
1175
1176fn to_hex(bytes: impl AsRef<[u8]>) -> String {
1177 format!("0x{}", hex::encode(bytes.as_ref()))
1178}