1mod runtime_api;
21mod storage_api;
22
23use runtime_api::RuntimeApi;
24use storage_api::StorageApi;
25
26use crate::{
27 subxt_client::{self, revive::calls::types::EthTransact, SrcChainConfig},
28 BlockInfoProvider, BlockTag, FeeHistoryProvider, ReceiptProvider, SubxtBlockInfoProvider,
29 TracerType, TransactionInfo, LOG_TARGET,
30};
31use jsonrpsee::{
32 core::traits::ToRpcParams,
33 rpc_params,
34 types::{error::CALL_EXECUTION_FAILED_CODE, ErrorObjectOwned},
35};
36use pallet_revive::{
37 evm::{
38 decode_revert_reason, Block, BlockNumberOrTag, BlockNumberOrTagOrHash, FeeHistoryResult,
39 Filter, GenericTransaction, Log, ReceiptInfo, SyncingProgress, SyncingStatus, Trace,
40 TransactionSigned, TransactionTrace, H256, U256,
41 },
42 EthTransactError,
43};
44use sp_runtime::traits::Block as BlockT;
45use sp_weights::Weight;
46use std::{ops::Range, sync::Arc, time::Duration};
47use subxt::{
48 backend::{
49 legacy::{rpc_methods::SystemHealth, LegacyRpcMethods},
50 rpc::{
51 reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
52 RpcClient,
53 },
54 },
55 config::{HashFor, Header},
56 Config, OnlineClient,
57};
58use thiserror::Error;
59
/// A block of the source chain, as handed out by the subxt [`OnlineClient`].
pub type SubstrateBlock = subxt::blocks::Block<SrcChainConfig, OnlineClient<SrcChainConfig>>;

/// The block header type declared by [`SrcChainConfig`].
pub type SubstrateBlockHeader = <SrcChainConfig as Config>::Header;

/// The block number type carried by [`SubstrateBlockHeader`].
pub type SubstrateBlockNumber = <SubstrateBlockHeader as Header>::Number;

/// The block hash type of the source chain ([`HashFor`] applied to [`SrcChainConfig`]).
pub type SubstrateBlockHash = HashFor<SrcChainConfig>;

/// The numeric type used for balances.
pub type Balance = u128;
74
/// Selects which stream of new blocks a subscription follows.
#[derive(Debug, Clone, Copy)]
pub enum SubscriptionType {
    /// Follow the best-block stream (`subscribe_best`).
    BestBlocks,
    /// Follow the finalized-block stream (`subscribe_finalized`).
    FinalizedBlocks,
}
83
/// The error type returned by the [`Client`] and its helpers.
///
/// Variants marked `#[from]` are created automatically by `?` from the wrapped error type.
#[derive(Error, Debug)]
pub enum ClientError {
    /// An error reported by the jsonrpsee client.
    #[error(transparent)]
    Jsonrpsee(#[from] jsonrpsee::core::ClientError),
    /// An error reported by subxt.
    #[error(transparent)]
    SubxtError(#[from] subxt::Error),
    /// An error reported by the subxt RPC layer.
    #[error(transparent)]
    RpcError(#[from] subxt::ext::subxt_rpcs::Error),
    /// An error reported by sqlx.
    #[error(transparent)]
    SqlxError(#[from] sqlx::Error),
    /// A codec (decoding) error.
    #[error(transparent)]
    CodecError(#[from] codec::Error),
    /// An [`EthTransactError`]; it is mapped to a revert or an execution-failed JSON-RPC error
    /// when converted into an [`ErrorObjectOwned`].
    #[error("contract reverted")]
    TransactError(EthTransactError),
    /// A value could not be converted into the expected type, e.g. a block number that does not
    /// fit into [`SubstrateBlockNumber`].
    #[error("conversion failed")]
    ConversionFailed,
    /// The requested block could not be found.
    #[error("hash not found")]
    BlockNotFound,
    /// The requested contract could not be found.
    #[error("Contract not found")]
    ContractNotFound,
    /// No Ethereum extrinsic (or receipt for it) could be found.
    #[error("No Ethereum extrinsic found")]
    EthExtrinsicNotFound,
    /// The `transactionFeePaid` event could not be found.
    #[error("transactionFeePaid event not found")]
    TxFeeNotFound,
    /// A raw payload could not be decoded into a signed transaction.
    #[error("Failed to decode a raw payload into a signed transaction")]
    TxDecodingFailed,
    /// The Ethereum address could not be recovered.
    #[error("failed to recover eth address")]
    RecoverEthAddressFailed,
    /// Filtering logs failed; wraps the underlying [`anyhow::Error`].
    #[error("Failed to filter logs")]
    LogFilterFailed(#[from] anyhow::Error),
}
128
129const REVERT_CODE: i32 = 3;
130impl From<ClientError> for ErrorObjectOwned {
131 fn from(err: ClientError) -> Self {
132 match err {
133 ClientError::SubxtError(subxt::Error::Rpc(subxt::error::RpcError::ClientError(
134 subxt::ext::subxt_rpcs::Error::User(err),
135 ))) |
136 ClientError::RpcError(subxt::ext::subxt_rpcs::Error::User(err)) =>
137 ErrorObjectOwned::owned::<Vec<u8>>(err.code, err.message, None),
138 ClientError::TransactError(EthTransactError::Data(data)) => {
139 let msg = match decode_revert_reason(&data) {
140 Some(reason) => format!("execution reverted: {reason}"),
141 None => "execution reverted".to_string(),
142 };
143
144 let data = format!("0x{}", hex::encode(data));
145 ErrorObjectOwned::owned::<String>(REVERT_CODE, msg, Some(data))
146 },
147 ClientError::TransactError(EthTransactError::Message(msg)) =>
148 ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, msg, None),
149 _ =>
150 ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, err.to_string(), None),
151 }
152 }
153}
154
/// A client that talks to a node of the source chain and serves block, receipt, fee-history and
/// tracing data derived from it.
#[derive(Clone)]
pub struct Client {
    /// The subxt online client for the source chain.
    api: OnlineClient<SrcChainConfig>,
    /// The reconnecting RPC client, used for raw RPC requests.
    rpc_client: ReconnectingRpcClient,
    /// The legacy RPC methods (e.g. `system_health`).
    rpc: LegacyRpcMethods<SrcChainConfig>,
    /// Provider of transaction receipts.
    receipt_provider: ReceiptProvider,
    /// Provider of block information (latest, finalized, by number, by hash).
    block_provider: SubxtBlockInfoProvider,
    /// Provider of fee history data.
    fee_history_provider: FeeHistoryProvider,
    /// The chain id, read once when the client is created.
    chain_id: u64,
    /// The maximum block weight, read once when the client is created.
    max_block_weight: Weight,
}
167
168async fn chain_id(api: &OnlineClient<SrcChainConfig>) -> Result<u64, ClientError> {
170 let query = subxt_client::constants().revive().chain_id();
171 api.constants().at(&query).map_err(|err| err.into())
172}
173
174async fn max_block_weight(api: &OnlineClient<SrcChainConfig>) -> Result<Weight, ClientError> {
176 let query = subxt_client::constants().system().block_weights();
177 let weights = api.constants().at(&query)?;
178 let max_block = weights.per_class.normal.max_extrinsic.unwrap_or(weights.max_block);
179 Ok(max_block.0)
180}
181
182async fn extract_block_timestamp(block: &SubstrateBlock) -> Option<u64> {
184 let extrinsics = block.extrinsics().await.ok()?;
185 let ext = extrinsics
186 .find_first::<crate::subxt_client::timestamp::calls::types::Set>()
187 .ok()??;
188
189 Some(ext.value.now / 1000)
190}
191
192pub async fn connect(
195 node_rpc_url: &str,
196) -> Result<
197 (OnlineClient<SrcChainConfig>, ReconnectingRpcClient, LegacyRpcMethods<SrcChainConfig>),
198 ClientError,
199> {
200 log::info!(target: LOG_TARGET, "๐ Connecting to node at: {node_rpc_url} ...");
201 let rpc_client = ReconnectingRpcClient::builder()
202 .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
203 .build(node_rpc_url.to_string())
204 .await?;
205 log::info!(target: LOG_TARGET, "๐ Connected to node at: {node_rpc_url}");
206
207 let api = OnlineClient::<SrcChainConfig>::from_rpc_client(rpc_client.clone()).await?;
208 let rpc = LegacyRpcMethods::<SrcChainConfig>::new(RpcClient::new(rpc_client.clone()));
209 Ok((api, rpc_client, rpc))
210}
211
212impl Client {
213 pub async fn new(
215 api: OnlineClient<SrcChainConfig>,
216 rpc_client: ReconnectingRpcClient,
217 rpc: LegacyRpcMethods<SrcChainConfig>,
218 block_provider: SubxtBlockInfoProvider,
219 receipt_provider: ReceiptProvider,
220 ) -> Result<Self, ClientError> {
221 let (chain_id, max_block_weight) =
222 tokio::try_join!(chain_id(&api), max_block_weight(&api))?;
223
224 Ok(Self {
225 api,
226 rpc_client,
227 rpc,
228 receipt_provider,
229 block_provider,
230 fee_history_provider: FeeHistoryProvider::default(),
231 chain_id,
232 max_block_weight,
233 })
234 }
235
236 async fn subscribe_past_blocks<F, Fut>(
238 &self,
239 range: Range<SubstrateBlockNumber>,
240 callback: F,
241 ) -> Result<(), ClientError>
242 where
243 F: Fn(Arc<SubstrateBlock>) -> Fut + Send + Sync,
244 Fut: std::future::Future<Output = Result<(), ClientError>> + Send,
245 {
246 let mut block = self
247 .block_provider
248 .block_by_number(range.end)
249 .await?
250 .ok_or(ClientError::BlockNotFound)?;
251
252 loop {
253 let block_number = block.number();
254 log::trace!(target: "eth-rpc::subscription", "Processing past block #{block_number}");
255
256 let parent_hash = block.header().parent_hash;
257 callback(block.clone()).await.inspect_err(|err| {
258 log::error!(target: "eth-rpc::subscription", "Failed to process past block #{block_number}: {err:?}");
259 })?;
260
261 if range.start < block_number {
262 block = self
263 .block_provider
264 .block_by_hash(&parent_hash)
265 .await?
266 .ok_or(ClientError::BlockNotFound)?;
267 } else {
268 return Ok(());
269 }
270 }
271 }
272
273 async fn subscribe_new_blocks<F, Fut>(
275 &self,
276 subscription_type: SubscriptionType,
277 callback: F,
278 ) -> Result<(), ClientError>
279 where
280 F: Fn(SubstrateBlock) -> Fut + Send + Sync,
281 Fut: std::future::Future<Output = Result<(), ClientError>> + Send,
282 {
283 let mut block_stream = match subscription_type {
284 SubscriptionType::BestBlocks => self.api.blocks().subscribe_best().await,
285 SubscriptionType::FinalizedBlocks => self.api.blocks().subscribe_finalized().await,
286 }
287 .inspect_err(|err| {
288 log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}");
289 })?;
290
291 while let Some(block) = block_stream.next().await {
292 let block = match block {
293 Ok(block) => block,
294 Err(err) => {
295 if err.is_disconnected_will_reconnect() {
296 log::warn!(
297 target: LOG_TARGET,
298 "The RPC connection was lost and we may have missed a few blocks ({subscription_type:?}): {err:?}"
299 );
300 continue;
301 }
302
303 log::error!(target: LOG_TARGET, "Failed to fetch block ({subscription_type:?}): {err:?}");
304 return Err(err.into());
305 },
306 };
307
308 let block_number = block.number();
309 log::trace!(target: "eth-rpc::subscription", "โณ Processing {subscription_type:?} block: {block_number}");
310 if let Err(err) = callback(block).await {
311 log::error!(target: LOG_TARGET, "Failed to process block {block_number}: {err:?}");
312 } else {
313 log::trace!(target: "eth-rpc::subscription", "โ
Processed {subscription_type:?} block: {block_number}");
314 }
315 }
316
317 log::info!(target: LOG_TARGET, "Block subscription ended");
318 Ok(())
319 }
320
321 pub async fn subscribe_and_cache_new_blocks(
323 &self,
324 subscription_type: SubscriptionType,
325 ) -> Result<(), ClientError> {
326 log::info!(target: LOG_TARGET, "๐ Subscribing to new blocks ({subscription_type:?})");
327 self.subscribe_new_blocks(subscription_type, |block| async {
328 let (signed_txs, receipts): (Vec<_>, Vec<_>) =
329 self.receipt_provider.insert_block_receipts(&block).await?.into_iter().unzip();
330
331 let evm_block =
332 self.evm_block_from_receipts(&block, &receipts, signed_txs, false).await;
333 self.block_provider.update_latest(block, subscription_type).await;
334
335 self.fee_history_provider.update_fee_history(&evm_block, &receipts).await;
336 Ok(())
337 })
338 .await
339 }
340
341 pub async fn subscribe_and_cache_blocks(
343 &self,
344 index_last_n_blocks: SubstrateBlockNumber,
345 ) -> Result<(), ClientError> {
346 let last = self.latest_block().await.number().saturating_sub(1);
347 let range = last.saturating_sub(index_last_n_blocks)..last;
348 log::info!(target: LOG_TARGET, "๐๏ธ Indexing past blocks in range {range:?}");
349 self.subscribe_past_blocks(range, |block| async move {
350 self.receipt_provider.insert_block_receipts(&block).await?;
351 Ok(())
352 })
353 .await?;
354
355 log::info!(target: LOG_TARGET, "๐๏ธ Finished indexing past blocks");
356 Ok(())
357 }
358
359 pub async fn block_hash_for_tag(
361 &self,
362 at: BlockNumberOrTagOrHash,
363 ) -> Result<SubstrateBlockHash, ClientError> {
364 match at {
365 BlockNumberOrTagOrHash::BlockHash(hash) => Ok(hash),
366 BlockNumberOrTagOrHash::BlockNumber(block_number) => {
367 let n: SubstrateBlockNumber =
368 (block_number).try_into().map_err(|_| ClientError::ConversionFailed)?;
369 let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?;
370 Ok(hash)
371 },
372 BlockNumberOrTagOrHash::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
373 let block = self.latest_finalized_block().await;
374 Ok(block.hash())
375 },
376 BlockNumberOrTagOrHash::BlockTag(_) => {
377 let block = self.latest_block().await;
378 Ok(block.hash())
379 },
380 }
381 }
382
383 pub fn storage_api(&self, block_hash: H256) -> StorageApi {
385 StorageApi::new(self.api.storage().at(block_hash))
386 }
387
388 pub fn runtime_api(&self, block_hash: H256) -> RuntimeApi {
390 RuntimeApi::new(self.api.runtime_api().at(block_hash))
391 }
392
393 pub async fn latest_finalized_block(&self) -> Arc<SubstrateBlock> {
395 self.block_provider.latest_finalized_block().await
396 }
397
398 pub async fn latest_block(&self) -> Arc<SubstrateBlock> {
400 self.block_provider.latest_block().await
401 }
402
403 pub async fn submit(
405 &self,
406 call: subxt::tx::DefaultPayload<EthTransact>,
407 ) -> Result<H256, ClientError> {
408 let ext = self.api.tx().create_unsigned(&call).map_err(ClientError::from)?;
409 let hash = ext.submit().await?;
410 Ok(hash)
411 }
412
413 pub async fn receipt(&self, tx_hash: &H256) -> Option<ReceiptInfo> {
415 self.receipt_provider.receipt_by_hash(tx_hash).await
416 }
417
418 pub async fn sync_state(
419 &self,
420 ) -> Result<sc_rpc::system::SyncState<SubstrateBlockNumber>, ClientError> {
421 let client = RpcClient::new(self.rpc_client.clone());
422 let sync_state: sc_rpc::system::SyncState<SubstrateBlockNumber> =
423 client.request("system_syncState", Default::default()).await?;
424 Ok(sync_state)
425 }
426
427 pub async fn syncing(&self) -> Result<SyncingStatus, ClientError> {
429 let health = self.rpc.system_health().await?;
430
431 let status = if health.is_syncing {
432 let sync_state = self.sync_state().await?;
433 SyncingProgress {
434 current_block: Some(sync_state.current_block.into()),
435 highest_block: Some(sync_state.highest_block.into()),
436 starting_block: Some(sync_state.starting_block.into()),
437 }
438 .into()
439 } else {
440 SyncingStatus::Bool(false)
441 };
442
443 Ok(status)
444 }
445
446 pub async fn receipt_by_hash_and_index(
448 &self,
449 block_hash: &H256,
450 transaction_index: usize,
451 ) -> Option<ReceiptInfo> {
452 self.receipt_provider
453 .receipt_by_block_hash_and_index(block_hash, transaction_index)
454 .await
455 }
456
457 pub async fn signed_tx_by_hash(&self, tx_hash: &H256) -> Option<TransactionSigned> {
458 self.receipt_provider.signed_tx_by_hash(tx_hash).await
459 }
460
461 pub async fn receipts_count_per_block(&self, block_hash: &SubstrateBlockHash) -> Option<usize> {
463 self.receipt_provider.receipts_count_per_block(block_hash).await
464 }
465
466 pub async fn system_health(&self) -> Result<SystemHealth, ClientError> {
468 let health = self.rpc.system_health().await?;
469 Ok(health)
470 }
471
472 pub async fn block_number(&self) -> Result<SubstrateBlockNumber, ClientError> {
474 let latest_block = self.block_provider.latest_block().await;
475 Ok(latest_block.number())
476 }
477
478 pub async fn get_block_hash(
480 &self,
481 block_number: SubstrateBlockNumber,
482 ) -> Result<Option<SubstrateBlockHash>, ClientError> {
483 let maybe_block = self.block_provider.block_by_number(block_number).await?;
484 Ok(maybe_block.map(|block| block.hash()))
485 }
486
487 pub async fn block_by_number_or_tag(
489 &self,
490 block: &BlockNumberOrTag,
491 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
492 match block {
493 BlockNumberOrTag::U256(n) => {
494 let n = (*n).try_into().map_err(|_| ClientError::ConversionFailed)?;
495 self.block_by_number(n).await
496 },
497 BlockNumberOrTag::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
498 let block = self.block_provider.latest_finalized_block().await;
499 Ok(Some(block))
500 },
501 BlockNumberOrTag::BlockTag(_) => {
502 let block = self.block_provider.latest_block().await;
503 Ok(Some(block))
504 },
505 }
506 }
507
508 pub async fn block_by_hash(
510 &self,
511 hash: &SubstrateBlockHash,
512 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
513 self.block_provider.block_by_hash(hash).await
514 }
515
516 pub async fn block_by_number(
518 &self,
519 block_number: SubstrateBlockNumber,
520 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
521 self.block_provider.block_by_number(block_number).await
522 }
523
524 async fn tracing_block(
525 &self,
526 block_hash: H256,
527 ) -> Result<
528 sp_runtime::generic::Block<
529 sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
530 sp_runtime::OpaqueExtrinsic,
531 >,
532 ClientError,
533 > {
534 let params = rpc_params![block_hash].to_rpc_params().unwrap_or_default();
535 let res = self.rpc_client.request("chain_getBlock".to_string(), params).await.unwrap();
536
537 let signed_block: sp_runtime::generic::SignedBlock<
538 sp_runtime::generic::Block<
539 sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
540 sp_runtime::OpaqueExtrinsic,
541 >,
542 > = serde_json::from_str(res.get()).unwrap();
543 Ok(signed_block.block)
544 }
545
546 pub async fn trace_block_by_number(
548 &self,
549 at: BlockNumberOrTag,
550 config: TracerType,
551 ) -> Result<Vec<TransactionTrace>, ClientError> {
552 if self.receipt_provider.is_before_earliest_block(&at) {
553 return Ok(vec![]);
554 }
555
556 let block_hash = self.block_hash_for_tag(at.into()).await?;
557 let block = self.tracing_block(block_hash).await?;
558 let parent_hash = block.header().parent_hash;
559 let runtime_api = RuntimeApi::new(self.api.runtime_api().at(parent_hash));
560 let traces = runtime_api.trace_block(block, config.clone()).await?;
561
562 let mut hashes = self
563 .receipt_provider
564 .block_transaction_hashes(&block_hash)
565 .await
566 .ok_or(ClientError::EthExtrinsicNotFound)?;
567
568 let traces = traces.into_iter().filter_map(|(index, trace)| {
569 Some(TransactionTrace { tx_hash: hashes.remove(&(index as usize))?, trace })
570 });
571
572 Ok(traces.collect())
573 }
574
575 pub async fn trace_transaction(
577 &self,
578 transaction_hash: H256,
579 config: TracerType,
580 ) -> Result<Trace, ClientError> {
581 let ReceiptInfo { block_hash, transaction_index, .. } = self
582 .receipt_provider
583 .receipt_by_hash(&transaction_hash)
584 .await
585 .ok_or(ClientError::EthExtrinsicNotFound)?;
586
587 let block = self.tracing_block(block_hash).await?;
588 let parent_hash = block.header.parent_hash;
589 let runtime_api = self.runtime_api(parent_hash);
590
591 runtime_api.trace_tx(block, transaction_index.as_u32(), config.clone()).await
592 }
593
594 pub async fn trace_call(
596 &self,
597 transaction: GenericTransaction,
598 block: BlockNumberOrTagOrHash,
599 config: TracerType,
600 ) -> Result<Trace, ClientError> {
601 let block_hash = self.block_hash_for_tag(block).await?;
602 let runtime_api = self.runtime_api(block_hash);
603 runtime_api.trace_call(transaction, config.clone()).await
604 }
605
606 pub async fn evm_block(
608 &self,
609 block: Arc<SubstrateBlock>,
610 hydrated_transactions: bool,
611 ) -> Block {
612 let (signed_txs, receipts): (Vec<_>, Vec<_>) = self
613 .receipt_provider
614 .receipts_from_block(&block)
615 .await
616 .unwrap_or_default()
617 .into_iter()
618 .unzip();
619 return self
620 .evm_block_from_receipts(&block, &receipts, signed_txs, hydrated_transactions)
621 .await
622 }
623
624 pub async fn evm_block_from_receipts(
626 &self,
627 block: &SubstrateBlock,
628 receipts: &[ReceiptInfo],
629 signed_txs: Vec<TransactionSigned>,
630 hydrated_transactions: bool,
631 ) -> Block {
632 let runtime_api = RuntimeApi::new(self.api.runtime_api().at(block.hash()));
633 let gas_limit = runtime_api.block_gas_limit().await.unwrap_or_default();
634
635 let header = block.header();
636 let timestamp = extract_block_timestamp(block).await.unwrap_or_default();
637 let block_author = runtime_api.block_author().await.ok().flatten().unwrap_or_default();
638
639 let parent_hash = header.parent_hash.0.into();
641 let state_root = header.state_root.0.into();
642 let extrinsics_root = header.extrinsics_root.0.into();
643
644 let gas_used = receipts.iter().fold(U256::zero(), |acc, receipt| acc + receipt.gas_used);
645 let transactions = if hydrated_transactions {
646 signed_txs
647 .into_iter()
648 .zip(receipts.iter())
649 .map(|(signed_tx, receipt)| TransactionInfo::new(receipt, signed_tx))
650 .collect::<Vec<TransactionInfo>>()
651 .into()
652 } else {
653 receipts
654 .iter()
655 .map(|receipt| receipt.transaction_hash)
656 .collect::<Vec<_>>()
657 .into()
658 };
659
660 Block {
661 hash: block.hash(),
662 parent_hash,
663 state_root,
664 miner: block_author,
665 transactions_root: extrinsics_root,
666 number: header.number.into(),
667 timestamp: timestamp.into(),
668 base_fee_per_gas: runtime_api.gas_price().await.ok().unwrap_or_default(),
669 gas_limit,
670 gas_used,
671 receipts_root: extrinsics_root,
672 transactions,
673 ..Default::default()
674 }
675 }
676
677 pub fn chain_id(&self) -> u64 {
679 self.chain_id
680 }
681
682 pub fn max_block_weight(&self) -> Weight {
684 self.max_block_weight
685 }
686
687 pub async fn logs(&self, filter: Option<Filter>) -> Result<Vec<Log>, ClientError> {
689 let logs =
690 self.receipt_provider.logs(filter).await.map_err(ClientError::LogFilterFailed)?;
691 Ok(logs)
692 }
693
694 pub async fn fee_history(
695 &self,
696 block_count: u32,
697 latest_block: BlockNumberOrTag,
698 reward_percentiles: Option<Vec<f64>>,
699 ) -> Result<FeeHistoryResult, ClientError> {
700 let Some(latest_block) = self.block_by_number_or_tag(&latest_block).await? else {
701 return Err(ClientError::BlockNotFound);
702 };
703
704 self.fee_history_provider
705 .fee_history(block_count, latest_block.number(), reward_percentiles)
706 .await
707 }
708}