1mod runtime_api;
21mod storage_api;
22
23use runtime_api::RuntimeApi;
24use storage_api::StorageApi;
25
26use crate::{
27 subxt_client::{self, revive::calls::types::EthTransact, SrcChainConfig},
28 BlockInfoProvider, BlockTag, FeeHistoryProvider, ReceiptProvider, SubxtBlockInfoProvider,
29 TracerType, TransactionInfo, LOG_TARGET,
30};
31use jsonrpsee::types::{error::CALL_EXECUTION_FAILED_CODE, ErrorObjectOwned};
32use pallet_revive::{
33 evm::{
34 decode_revert_reason, Block, BlockNumberOrTag, BlockNumberOrTagOrHash, FeeHistoryResult,
35 Filter, GenericTransaction, Log, ReceiptInfo, SyncingProgress, SyncingStatus, Trace,
36 TransactionSigned, TransactionTrace, H256, U256,
37 },
38 EthTransactError,
39};
40use sp_runtime::traits::Block as BlockT;
41use sp_weights::Weight;
42use std::{ops::Range, sync::Arc, time::Duration};
43use subxt::{
44 backend::{
45 legacy::{rpc_methods::SystemHealth, LegacyRpcMethods},
46 rpc::{
47 reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
48 RpcClient,
49 },
50 },
51 config::{HashFor, Header},
52 ext::subxt_rpcs::rpc_params,
53 Config, OnlineClient,
54};
55use thiserror::Error;
56
57pub type SubstrateBlock = subxt::blocks::Block<SrcChainConfig, OnlineClient<SrcChainConfig>>;
59
60pub type SubstrateBlockHeader = <SrcChainConfig as Config>::Header;
62
63pub type SubstrateBlockNumber = <SubstrateBlockHeader as Header>::Number;
65
66pub type SubstrateBlockHash = HashFor<SrcChainConfig>;
68
69pub type Balance = u128;
71
72#[derive(Debug, Clone, Copy)]
74pub enum SubscriptionType {
75 BestBlocks,
77 FinalizedBlocks,
79}
80
81#[derive(Error, Debug)]
83pub enum ClientError {
84 #[error(transparent)]
86 Jsonrpsee(#[from] jsonrpsee::core::ClientError),
87 #[error(transparent)]
89 SubxtError(#[from] subxt::Error),
90 #[error(transparent)]
91 RpcError(#[from] subxt::ext::subxt_rpcs::Error),
92 #[error(transparent)]
94 SqlxError(#[from] sqlx::Error),
95 #[error(transparent)]
97 CodecError(#[from] codec::Error),
98 #[error("contract reverted")]
100 TransactError(EthTransactError),
101 #[error("conversion failed")]
103 ConversionFailed,
104 #[error("hash not found")]
106 BlockNotFound,
107 #[error("Contract not found")]
109 ContractNotFound,
110 #[error("No Ethereum extrinsic found")]
111 EthExtrinsicNotFound,
112 #[error("transactionFeePaid event not found")]
114 TxFeeNotFound,
115 #[error("Failed to decode a raw payload into a signed transaction")]
117 TxDecodingFailed,
118 #[error("failed to recover eth address")]
120 RecoverEthAddressFailed,
121 #[error("Failed to filter logs")]
123 LogFilterFailed(#[from] anyhow::Error),
124}
125
126const REVERT_CODE: i32 = 3;
127impl From<ClientError> for ErrorObjectOwned {
128 fn from(err: ClientError) -> Self {
129 match err {
130 ClientError::SubxtError(subxt::Error::Rpc(subxt::error::RpcError::ClientError(
131 subxt::ext::subxt_rpcs::Error::User(err),
132 ))) |
133 ClientError::RpcError(subxt::ext::subxt_rpcs::Error::User(err)) =>
134 ErrorObjectOwned::owned::<Vec<u8>>(err.code, err.message, None),
135 ClientError::TransactError(EthTransactError::Data(data)) => {
136 let msg = match decode_revert_reason(&data) {
137 Some(reason) => format!("execution reverted: {reason}"),
138 None => "execution reverted".to_string(),
139 };
140
141 let data = format!("0x{}", hex::encode(data));
142 ErrorObjectOwned::owned::<String>(REVERT_CODE, msg, Some(data))
143 },
144 ClientError::TransactError(EthTransactError::Message(msg)) =>
145 ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, msg, None),
146 _ =>
147 ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, err.to_string(), None),
148 }
149 }
150}
151
152#[derive(Clone)]
154pub struct Client {
155 api: OnlineClient<SrcChainConfig>,
156 rpc_client: RpcClient,
157 rpc: LegacyRpcMethods<SrcChainConfig>,
158 receipt_provider: ReceiptProvider,
159 block_provider: SubxtBlockInfoProvider,
160 fee_history_provider: FeeHistoryProvider,
161 chain_id: u64,
162 max_block_weight: Weight,
163}
164
165async fn chain_id(api: &OnlineClient<SrcChainConfig>) -> Result<u64, ClientError> {
167 let query = subxt_client::constants().revive().chain_id();
168 api.constants().at(&query).map_err(|err| err.into())
169}
170
171async fn max_block_weight(api: &OnlineClient<SrcChainConfig>) -> Result<Weight, ClientError> {
173 let query = subxt_client::constants().system().block_weights();
174 let weights = api.constants().at(&query)?;
175 let max_block = weights.per_class.normal.max_extrinsic.unwrap_or(weights.max_block);
176 Ok(max_block.0)
177}
178
179async fn extract_block_timestamp(block: &SubstrateBlock) -> Option<u64> {
181 let extrinsics = block.extrinsics().await.ok()?;
182 let ext = extrinsics
183 .find_first::<crate::subxt_client::timestamp::calls::types::Set>()
184 .ok()??;
185
186 Some(ext.value.now / 1000)
187}
188
189pub async fn connect(
192 node_rpc_url: &str,
193) -> Result<(OnlineClient<SrcChainConfig>, RpcClient, LegacyRpcMethods<SrcChainConfig>), ClientError>
194{
195 log::info!(target: LOG_TARGET, "๐ Connecting to node at: {node_rpc_url} ...");
196 let rpc_client = ReconnectingRpcClient::builder()
197 .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
198 .build(node_rpc_url.to_string())
199 .await?;
200 let rpc_client = RpcClient::new(rpc_client);
201 log::info!(target: LOG_TARGET, "๐ Connected to node at: {node_rpc_url}");
202
203 let api = OnlineClient::<SrcChainConfig>::from_rpc_client(rpc_client.clone()).await?;
204 let rpc = LegacyRpcMethods::<SrcChainConfig>::new(rpc_client.clone());
205 Ok((api, rpc_client, rpc))
206}
207
208impl Client {
209 pub async fn new(
211 api: OnlineClient<SrcChainConfig>,
212 rpc_client: RpcClient,
213 rpc: LegacyRpcMethods<SrcChainConfig>,
214 block_provider: SubxtBlockInfoProvider,
215 receipt_provider: ReceiptProvider,
216 ) -> Result<Self, ClientError> {
217 let (chain_id, max_block_weight) =
218 tokio::try_join!(chain_id(&api), max_block_weight(&api))?;
219
220 Ok(Self {
221 api,
222 rpc_client,
223 rpc,
224 receipt_provider,
225 block_provider,
226 fee_history_provider: FeeHistoryProvider::default(),
227 chain_id,
228 max_block_weight,
229 })
230 }
231
232 async fn subscribe_past_blocks<F, Fut>(
234 &self,
235 range: Range<SubstrateBlockNumber>,
236 callback: F,
237 ) -> Result<(), ClientError>
238 where
239 F: Fn(Arc<SubstrateBlock>) -> Fut + Send + Sync,
240 Fut: std::future::Future<Output = Result<(), ClientError>> + Send,
241 {
242 let mut block = self
243 .block_provider
244 .block_by_number(range.end)
245 .await?
246 .ok_or(ClientError::BlockNotFound)?;
247
248 loop {
249 let block_number = block.number();
250 log::trace!(target: "eth-rpc::subscription", "Processing past block #{block_number}");
251
252 let parent_hash = block.header().parent_hash;
253 callback(block.clone()).await.inspect_err(|err| {
254 log::error!(target: "eth-rpc::subscription", "Failed to process past block #{block_number}: {err:?}");
255 })?;
256
257 if range.start < block_number {
258 block = self
259 .block_provider
260 .block_by_hash(&parent_hash)
261 .await?
262 .ok_or(ClientError::BlockNotFound)?;
263 } else {
264 return Ok(());
265 }
266 }
267 }
268
269 async fn subscribe_new_blocks<F, Fut>(
271 &self,
272 subscription_type: SubscriptionType,
273 callback: F,
274 ) -> Result<(), ClientError>
275 where
276 F: Fn(SubstrateBlock) -> Fut + Send + Sync,
277 Fut: std::future::Future<Output = Result<(), ClientError>> + Send,
278 {
279 let mut block_stream = match subscription_type {
280 SubscriptionType::BestBlocks => self.api.blocks().subscribe_best().await,
281 SubscriptionType::FinalizedBlocks => self.api.blocks().subscribe_finalized().await,
282 }
283 .inspect_err(|err| {
284 log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}");
285 })?;
286
287 while let Some(block) = block_stream.next().await {
288 let block = match block {
289 Ok(block) => block,
290 Err(err) => {
291 if err.is_disconnected_will_reconnect() {
292 log::warn!(
293 target: LOG_TARGET,
294 "The RPC connection was lost and we may have missed a few blocks ({subscription_type:?}): {err:?}"
295 );
296 continue;
297 }
298
299 log::error!(target: LOG_TARGET, "Failed to fetch block ({subscription_type:?}): {err:?}");
300 return Err(err.into());
301 },
302 };
303
304 let block_number = block.number();
305 log::trace!(target: "eth-rpc::subscription", "โณ Processing {subscription_type:?} block: {block_number}");
306 if let Err(err) = callback(block).await {
307 log::error!(target: LOG_TARGET, "Failed to process block {block_number}: {err:?}");
308 } else {
309 log::trace!(target: "eth-rpc::subscription", "โ
Processed {subscription_type:?} block: {block_number}");
310 }
311 }
312
313 log::info!(target: LOG_TARGET, "Block subscription ended");
314 Ok(())
315 }
316
317 pub async fn subscribe_and_cache_new_blocks(
319 &self,
320 subscription_type: SubscriptionType,
321 ) -> Result<(), ClientError> {
322 log::info!(target: LOG_TARGET, "๐ Subscribing to new blocks ({subscription_type:?})");
323 self.subscribe_new_blocks(subscription_type, |block| async {
324 let (signed_txs, receipts): (Vec<_>, Vec<_>) =
325 self.receipt_provider.insert_block_receipts(&block).await?.into_iter().unzip();
326
327 let evm_block =
328 self.evm_block_from_receipts(&block, &receipts, signed_txs, false).await;
329 self.block_provider.update_latest(block, subscription_type).await;
330
331 self.fee_history_provider.update_fee_history(&evm_block, &receipts).await;
332 Ok(())
333 })
334 .await
335 }
336
337 pub async fn subscribe_and_cache_blocks(
339 &self,
340 index_last_n_blocks: SubstrateBlockNumber,
341 ) -> Result<(), ClientError> {
342 let last = self.latest_block().await.number().saturating_sub(1);
343 let range = last.saturating_sub(index_last_n_blocks)..last;
344 log::info!(target: LOG_TARGET, "๐๏ธ Indexing past blocks in range {range:?}");
345 self.subscribe_past_blocks(range, |block| async move {
346 self.receipt_provider.insert_block_receipts(&block).await?;
347 Ok(())
348 })
349 .await?;
350
351 log::info!(target: LOG_TARGET, "๐๏ธ Finished indexing past blocks");
352 Ok(())
353 }
354
355 pub async fn block_hash_for_tag(
357 &self,
358 at: BlockNumberOrTagOrHash,
359 ) -> Result<SubstrateBlockHash, ClientError> {
360 match at {
361 BlockNumberOrTagOrHash::BlockHash(hash) => Ok(hash),
362 BlockNumberOrTagOrHash::BlockNumber(block_number) => {
363 let n: SubstrateBlockNumber =
364 (block_number).try_into().map_err(|_| ClientError::ConversionFailed)?;
365 let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?;
366 Ok(hash)
367 },
368 BlockNumberOrTagOrHash::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
369 let block = self.latest_finalized_block().await;
370 Ok(block.hash())
371 },
372 BlockNumberOrTagOrHash::BlockTag(_) => {
373 let block = self.latest_block().await;
374 Ok(block.hash())
375 },
376 }
377 }
378
379 pub fn storage_api(&self, block_hash: H256) -> StorageApi {
381 StorageApi::new(self.api.storage().at(block_hash))
382 }
383
384 pub fn runtime_api(&self, block_hash: H256) -> RuntimeApi {
386 RuntimeApi::new(self.api.runtime_api().at(block_hash))
387 }
388
389 pub async fn latest_finalized_block(&self) -> Arc<SubstrateBlock> {
391 self.block_provider.latest_finalized_block().await
392 }
393
394 pub async fn latest_block(&self) -> Arc<SubstrateBlock> {
396 self.block_provider.latest_block().await
397 }
398
399 pub async fn submit(
401 &self,
402 call: subxt::tx::DefaultPayload<EthTransact>,
403 ) -> Result<H256, ClientError> {
404 let ext = self.api.tx().create_unsigned(&call).map_err(ClientError::from)?;
405 let hash = ext.submit().await?;
406 Ok(hash)
407 }
408
409 pub async fn receipt(&self, tx_hash: &H256) -> Option<ReceiptInfo> {
411 self.receipt_provider.receipt_by_hash(tx_hash).await
412 }
413
414 pub async fn sync_state(
415 &self,
416 ) -> Result<sc_rpc::system::SyncState<SubstrateBlockNumber>, ClientError> {
417 let client = self.rpc_client.clone();
418 let sync_state: sc_rpc::system::SyncState<SubstrateBlockNumber> =
419 client.request("system_syncState", Default::default()).await?;
420 Ok(sync_state)
421 }
422
423 pub async fn syncing(&self) -> Result<SyncingStatus, ClientError> {
425 let health = self.rpc.system_health().await?;
426
427 let status = if health.is_syncing {
428 let sync_state = self.sync_state().await?;
429 SyncingProgress {
430 current_block: Some(sync_state.current_block.into()),
431 highest_block: Some(sync_state.highest_block.into()),
432 starting_block: Some(sync_state.starting_block.into()),
433 }
434 .into()
435 } else {
436 SyncingStatus::Bool(false)
437 };
438
439 Ok(status)
440 }
441
442 pub async fn receipt_by_hash_and_index(
444 &self,
445 block_hash: &H256,
446 transaction_index: usize,
447 ) -> Option<ReceiptInfo> {
448 self.receipt_provider
449 .receipt_by_block_hash_and_index(block_hash, transaction_index)
450 .await
451 }
452
453 pub async fn signed_tx_by_hash(&self, tx_hash: &H256) -> Option<TransactionSigned> {
454 self.receipt_provider.signed_tx_by_hash(tx_hash).await
455 }
456
457 pub async fn receipts_count_per_block(&self, block_hash: &SubstrateBlockHash) -> Option<usize> {
459 self.receipt_provider.receipts_count_per_block(block_hash).await
460 }
461
462 pub async fn system_health(&self) -> Result<SystemHealth, ClientError> {
464 let health = self.rpc.system_health().await?;
465 Ok(health)
466 }
467
468 pub async fn block_number(&self) -> Result<SubstrateBlockNumber, ClientError> {
470 let latest_block = self.block_provider.latest_block().await;
471 Ok(latest_block.number())
472 }
473
474 pub async fn get_block_hash(
476 &self,
477 block_number: SubstrateBlockNumber,
478 ) -> Result<Option<SubstrateBlockHash>, ClientError> {
479 let maybe_block = self.block_provider.block_by_number(block_number).await?;
480 Ok(maybe_block.map(|block| block.hash()))
481 }
482
483 pub async fn block_by_number_or_tag(
485 &self,
486 block: &BlockNumberOrTag,
487 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
488 match block {
489 BlockNumberOrTag::U256(n) => {
490 let n = (*n).try_into().map_err(|_| ClientError::ConversionFailed)?;
491 self.block_by_number(n).await
492 },
493 BlockNumberOrTag::BlockTag(BlockTag::Finalized | BlockTag::Safe) => {
494 let block = self.block_provider.latest_finalized_block().await;
495 Ok(Some(block))
496 },
497 BlockNumberOrTag::BlockTag(_) => {
498 let block = self.block_provider.latest_block().await;
499 Ok(Some(block))
500 },
501 }
502 }
503
504 pub async fn block_by_hash(
506 &self,
507 hash: &SubstrateBlockHash,
508 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
509 self.block_provider.block_by_hash(hash).await
510 }
511
512 pub async fn block_by_number(
514 &self,
515 block_number: SubstrateBlockNumber,
516 ) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
517 self.block_provider.block_by_number(block_number).await
518 }
519
520 async fn tracing_block(
521 &self,
522 block_hash: H256,
523 ) -> Result<
524 sp_runtime::generic::Block<
525 sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
526 sp_runtime::OpaqueExtrinsic,
527 >,
528 ClientError,
529 > {
530 let signed_block: sp_runtime::generic::SignedBlock<
531 sp_runtime::generic::Block<
532 sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>,
533 sp_runtime::OpaqueExtrinsic,
534 >,
535 > = self
536 .rpc_client
537 .request("chain_getBlock", rpc_params![block_hash])
538 .await
539 .unwrap();
540
541 Ok(signed_block.block)
542 }
543
544 pub async fn trace_block_by_number(
546 &self,
547 at: BlockNumberOrTag,
548 config: TracerType,
549 ) -> Result<Vec<TransactionTrace>, ClientError> {
550 if self.receipt_provider.is_before_earliest_block(&at) {
551 return Ok(vec![]);
552 }
553
554 let block_hash = self.block_hash_for_tag(at.into()).await?;
555 let block = self.tracing_block(block_hash).await?;
556 let parent_hash = block.header().parent_hash;
557 let runtime_api = RuntimeApi::new(self.api.runtime_api().at(parent_hash));
558 let traces = runtime_api.trace_block(block, config.clone()).await?;
559
560 let mut hashes = self
561 .receipt_provider
562 .block_transaction_hashes(&block_hash)
563 .await
564 .ok_or(ClientError::EthExtrinsicNotFound)?;
565
566 let traces = traces.into_iter().filter_map(|(index, trace)| {
567 Some(TransactionTrace { tx_hash: hashes.remove(&(index as usize))?, trace })
568 });
569
570 Ok(traces.collect())
571 }
572
573 pub async fn trace_transaction(
575 &self,
576 transaction_hash: H256,
577 config: TracerType,
578 ) -> Result<Trace, ClientError> {
579 let ReceiptInfo { block_hash, transaction_index, .. } = self
580 .receipt_provider
581 .receipt_by_hash(&transaction_hash)
582 .await
583 .ok_or(ClientError::EthExtrinsicNotFound)?;
584
585 let block = self.tracing_block(block_hash).await?;
586 let parent_hash = block.header.parent_hash;
587 let runtime_api = self.runtime_api(parent_hash);
588
589 runtime_api.trace_tx(block, transaction_index.as_u32(), config.clone()).await
590 }
591
592 pub async fn trace_call(
594 &self,
595 transaction: GenericTransaction,
596 block: BlockNumberOrTagOrHash,
597 config: TracerType,
598 ) -> Result<Trace, ClientError> {
599 let block_hash = self.block_hash_for_tag(block).await?;
600 let runtime_api = self.runtime_api(block_hash);
601 runtime_api.trace_call(transaction, config.clone()).await
602 }
603
604 pub async fn evm_block(
606 &self,
607 block: Arc<SubstrateBlock>,
608 hydrated_transactions: bool,
609 ) -> Block {
610 let (signed_txs, receipts): (Vec<_>, Vec<_>) = self
611 .receipt_provider
612 .receipts_from_block(&block)
613 .await
614 .unwrap_or_default()
615 .into_iter()
616 .unzip();
617 return self
618 .evm_block_from_receipts(&block, &receipts, signed_txs, hydrated_transactions)
619 .await
620 }
621
622 pub async fn evm_block_from_receipts(
624 &self,
625 block: &SubstrateBlock,
626 receipts: &[ReceiptInfo],
627 signed_txs: Vec<TransactionSigned>,
628 hydrated_transactions: bool,
629 ) -> Block {
630 let runtime_api = RuntimeApi::new(self.api.runtime_api().at(block.hash()));
631 let gas_limit = runtime_api.block_gas_limit().await.unwrap_or_default();
632
633 let header = block.header();
634 let timestamp = extract_block_timestamp(block).await.unwrap_or_default();
635 let block_author = runtime_api.block_author().await.ok().flatten().unwrap_or_default();
636
637 let parent_hash = header.parent_hash.0.into();
639 let state_root = header.state_root.0.into();
640 let extrinsics_root = header.extrinsics_root.0.into();
641
642 let gas_used = receipts.iter().fold(U256::zero(), |acc, receipt| acc + receipt.gas_used);
643 let transactions = if hydrated_transactions {
644 signed_txs
645 .into_iter()
646 .zip(receipts.iter())
647 .map(|(signed_tx, receipt)| TransactionInfo::new(receipt, signed_tx))
648 .collect::<Vec<TransactionInfo>>()
649 .into()
650 } else {
651 receipts
652 .iter()
653 .map(|receipt| receipt.transaction_hash)
654 .collect::<Vec<_>>()
655 .into()
656 };
657
658 Block {
659 hash: block.hash(),
660 parent_hash,
661 state_root,
662 miner: block_author,
663 transactions_root: extrinsics_root,
664 number: header.number.into(),
665 timestamp: timestamp.into(),
666 base_fee_per_gas: runtime_api.gas_price().await.ok().unwrap_or_default(),
667 gas_limit,
668 gas_used,
669 receipts_root: extrinsics_root,
670 transactions,
671 ..Default::default()
672 }
673 }
674
675 pub fn chain_id(&self) -> u64 {
677 self.chain_id
678 }
679
680 pub fn max_block_weight(&self) -> Weight {
682 self.max_block_weight
683 }
684
685 pub async fn logs(&self, filter: Option<Filter>) -> Result<Vec<Log>, ClientError> {
687 let logs =
688 self.receipt_provider.logs(filter).await.map_err(ClientError::LogFilterFailed)?;
689 Ok(logs)
690 }
691
692 pub async fn fee_history(
693 &self,
694 block_count: u32,
695 latest_block: BlockNumberOrTag,
696 reward_percentiles: Option<Vec<f64>>,
697 ) -> Result<FeeHistoryResult, ClientError> {
698 let Some(latest_block) = self.block_by_number_or_tag(&latest_block).await? else {
699 return Err(ClientError::BlockNotFound);
700 };
701
702 self.fee_history_provider
703 .fee_history(block_count, latest_block.number(), reward_percentiles)
704 .await
705 }
706}