1#![cfg_attr(docsrs, feature(doc_cfg))]
19
20use client::ClientError;
21use futures::{Stream, StreamExt, TryStreamExt};
22use jsonrpsee::{
23 PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink,
24 core::{RpcResult, async_trait},
25 types::{ErrorCode, ErrorObjectOwned},
26};
27use pallet_revive::evm::*;
28use pallet_revive_types::runtime_api::TraceV1;
29use sp_core::{H160, H256, U256};
30use sp_crypto_hashing::keccak_256;
31use subxt::backend::legacy::rpc_methods::TransactionStatus;
32use subxt_signer::bip39::core::pin::Pin;
33use thiserror::Error;
34use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
35
36mod block_sync;
37pub(crate) use block_sync::{ChainMetadata, SyncLabel, SyncStateKey};
38pub mod cli;
39pub mod client;
40pub mod example;
41pub mod subxt_client;
42
43#[cfg(test)]
44mod tests;
45
46mod block_info_provider;
47pub use block_info_provider::*;
48
49mod receipt_provider;
50pub use receipt_provider::*;
51
52mod fee_history_provider;
53pub use fee_history_provider::*;
54
55mod receipt_extractor;
56pub use receipt_extractor::*;
57
58mod apis;
59pub use apis::*;
60
/// Target string passed to every `log` macro invocation in this file.
pub const LOG_TARGET: &str = "eth-rpc";
62
63pub struct EthRpcServerImpl {
65 client: client::Client,
67
68 accounts: Vec<Account>,
70
71 allow_unprotected_txs: bool,
73
74 use_pending_for_estimate_gas: bool,
76}
77
78impl EthRpcServerImpl {
79 pub fn new(client: client::Client) -> Self {
81 Self {
82 client,
83 accounts: vec![],
84 allow_unprotected_txs: false,
85 use_pending_for_estimate_gas: false,
86 }
87 }
88
89 pub fn with_accounts(mut self, accounts: Vec<Account>) -> Self {
91 self.accounts = accounts;
92 self
93 }
94
95 pub fn with_allow_unprotected_txs(mut self, allow_unprotected_txs: bool) -> Self {
97 self.allow_unprotected_txs = allow_unprotected_txs;
98 self
99 }
100
101 pub fn with_use_pending_for_estimate_gas(mut self, use_pending_for_estimate_gas: bool) -> Self {
103 self.use_pending_for_estimate_gas = use_pending_for_estimate_gas;
104 self
105 }
106}
107
108#[derive(Error, Debug)]
110pub enum EthRpcError {
111 #[error("Client error: {0}")]
113 ClientError(#[from] ClientError),
114 #[error("Decoding error: {0}")]
116 RlpError(#[from] rlp::DecoderError),
117 #[error("Conversion error")]
119 ConversionError,
120 #[error("Invalid signature")]
122 InvalidSignature,
123 #[error("Account not found for address {0:?}")]
125 AccountNotFound(H160),
126 #[error("Invalid transaction")]
128 InvalidTransaction,
129 #[error("Invalid transaction {0:?}")]
131 TransactionTypeNotSupported(Byte),
132}
133
134impl From<EthRpcError> for ErrorObjectOwned {
136 fn from(value: EthRpcError) -> Self {
137 match value {
138 EthRpcError::ClientError(err) => Self::from(err),
139 _ => Self::owned::<String>(ErrorCode::InvalidRequest.code(), value.to_string(), None),
140 }
141 }
142}
143
144#[async_trait]
145impl EthRpcServer for EthRpcServerImpl {
146 async fn net_version(&self) -> RpcResult<String> {
147 Ok(self.client.chain_id().to_string())
148 }
149
150 async fn net_listening(&self) -> RpcResult<bool> {
151 let syncing = self.client.syncing().await?;
152 let listening = matches!(syncing, SyncingStatus::Bool(false));
153 Ok(listening)
154 }
155
156 async fn syncing(&self) -> RpcResult<SyncingStatus> {
157 Ok(self.client.syncing().await?)
158 }
159
160 async fn block_number(&self) -> RpcResult<U256> {
161 let number = self.client.block_number().await?;
162 Ok(number.into())
163 }
164
165 async fn get_transaction_receipt(
166 &self,
167 transaction_hash: H256,
168 ) -> RpcResult<Option<ReceiptInfo>> {
169 let receipt = self.client.receipt(&transaction_hash).await;
170 Ok(receipt)
171 }
172
173 async fn estimate_gas(
178 &self,
179 transaction: GenericTransaction,
180 block: Option<BlockNumberOrTag>,
181 ) -> RpcResult<U256> {
182 log::trace!(target: LOG_TARGET, "estimate_gas transaction={transaction:?} block={block:?}");
183
184 let block = block.unwrap_or_else(|| {
185 if self.use_pending_for_estimate_gas {
186 BlockTag::Pending.into()
187 } else {
188 Default::default()
189 }
190 });
191 let hash = self.client.block_hash_for_tag(block.into()).await?;
192 let gas_estimate =
193 self.client.runtime_api(hash).estimate_gas(transaction, block.into()).await?;
194
195 log::trace!(
196 target: LOG_TARGET,
197 "estimate_gas result={gas_estimate:?}",
198 );
199 Ok(gas_estimate)
200 }
201
202 async fn call(
203 &self,
204 transaction: GenericTransaction,
205 block: Option<BlockNumberOrTagOrHash>,
206 state_overrides: Option<StateOverrideSet>,
207 ) -> RpcResult<Bytes> {
208 let block = block.unwrap_or_default();
209 let hash = self.client.block_hash_for_tag(block.clone()).await?;
210 let runtime_api = self.client.runtime_api(hash);
211 let dry_run = runtime_api.dry_run(transaction, block, state_overrides).await?;
212 Ok(dry_run.data.into())
213 }
214
215 async fn send_raw_transaction(&self, transaction: Bytes) -> RpcResult<H256> {
216 let hash = H256(keccak_256(&transaction.0));
217 log::trace!(target: LOG_TARGET, "send_raw_transaction transaction: {transaction:?} ethereum_hash: {hash:?}");
218
219 if !self.allow_unprotected_txs {
220 let signed_transaction = TransactionSigned::decode(transaction.0.as_slice())
221 .map_err(|err| {
222 log::trace!(target: LOG_TARGET, "Transaction decoding failed. ethereum_hash: {hash:?}, error: {err:?}");
223 EthRpcError::InvalidTransaction
224 })?;
225
226 let is_chain_id_provided = match signed_transaction {
227 TransactionSigned::Transaction7702Signed(tx) => {
228 tx.transaction_7702_unsigned.chain_id != U256::zero()
229 },
230 TransactionSigned::Transaction4844Signed(tx) => {
231 tx.transaction_4844_unsigned.chain_id != U256::zero()
232 },
233 TransactionSigned::Transaction1559Signed(tx) => {
234 tx.transaction_1559_unsigned.chain_id != U256::zero()
235 },
236 TransactionSigned::Transaction2930Signed(tx) => {
237 tx.transaction_2930_unsigned.chain_id != U256::zero()
238 },
239 TransactionSigned::TransactionLegacySigned(tx) => {
240 tx.transaction_legacy_unsigned.chain_id.is_some()
241 },
242 };
243
244 if !is_chain_id_provided {
245 log::trace!(target: LOG_TARGET, "Invalid Transaction: transaction doesn't include a chain-id. ethereum_hash: {hash:?}");
246 Err(EthRpcError::InvalidTransaction)?;
247 }
248 }
249
250 let call = subxt_client::tx().revive().eth_transact(transaction.0);
251
252 let receiver = self.client.block_notifier().map(|sender| sender.subscribe());
254
255 let tx_status = self.client.submit(call).await.map_err(|err| {
257 log::trace!(target: LOG_TARGET, "send_raw_transaction ethereum_hash: {hash:?} failed: {err:?}");
258 err
259 })?;
260
261 if matches!(tx_status, TransactionStatus::Future) {
262 return Ok(hash);
263 }
264
265 if let Some(mut receiver) = receiver {
267 loop {
268 if let Ok(block_hash) = receiver.recv().await {
269 let Ok(Some(block)) = self.client.block_by_hash(&block_hash).await else {
270 log::debug!(target: LOG_TARGET, "Could not find the block with the received hash: {hash:?}.");
271 continue;
272 };
273 let Some(evm_block) = self.client.evm_block(block, false).await else {
274 log::debug!(target: LOG_TARGET, "Failed to get the EVM block for substrate block with hash: {hash:?}");
275 continue;
276 };
277 if evm_block.transactions.contains_tx(hash) {
278 log::debug!(target: LOG_TARGET, "{hash:} was included in a block");
279 break;
280 }
281 }
282 }
283 }
284
285 log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}");
286 Ok(hash)
287 }
288
289 async fn send_transaction(&self, mut transaction: GenericTransaction) -> RpcResult<H256> {
290 log::debug!(target: LOG_TARGET, "{transaction:#?}");
291
292 let Some(from) = transaction.from else {
293 log::debug!(target: LOG_TARGET, "Transaction must have a sender");
294 return Err(EthRpcError::InvalidTransaction.into());
295 };
296
297 let account = self
298 .accounts
299 .iter()
300 .find(|account| account.address() == from)
301 .ok_or(EthRpcError::AccountNotFound(from))?;
302
303 if transaction.gas.is_none() {
304 transaction.gas = Some(self.estimate_gas(transaction.clone(), None).await?);
305 }
306
307 if transaction.gas_price.is_none() {
308 transaction.gas_price = Some(self.gas_price().await?);
309 }
310
311 if transaction.nonce.is_none() {
312 transaction.nonce =
313 Some(self.get_transaction_count(from, BlockTag::Latest.into()).await?);
314 }
315
316 if transaction.chain_id.is_none() {
317 transaction.chain_id = Some(self.chain_id().await?);
318 }
319
320 let tx = transaction.try_into_unsigned().map_err(|_| EthRpcError::InvalidTransaction)?;
321 let payload = account.sign_transaction(tx).signed_payload();
322 self.send_raw_transaction(Bytes(payload)).await
323 }
324
325 async fn get_block_by_hash(
326 &self,
327 block_hash: H256,
328 hydrated_transactions: bool,
329 ) -> RpcResult<Option<Block>> {
330 let Some(block) = self.client.block_by_ethereum_hash(&block_hash).await? else {
331 return Ok(None);
332 };
333 let block = self.client.evm_block(block, hydrated_transactions).await;
334 Ok(block)
335 }
336
337 async fn get_balance(&self, address: H160, block: BlockNumberOrTagOrHash) -> RpcResult<U256> {
338 let hash = self.client.block_hash_for_tag(block).await?;
339 let runtime_api = self.client.runtime_api(hash);
340 let balance = runtime_api.balance(address).await?;
341 Ok(balance)
342 }
343
344 async fn chain_id(&self) -> RpcResult<U256> {
345 Ok(self.client.chain_id().into())
346 }
347
348 async fn gas_price(&self) -> RpcResult<U256> {
349 let hash = self.client.block_hash_for_tag(BlockTag::Latest.into()).await?;
350 let runtime_api = self.client.runtime_api(hash);
351 Ok(runtime_api.gas_price().await?)
352 }
353
354 async fn max_priority_fee_per_gas(&self) -> RpcResult<U256> {
355 Ok(Default::default())
358 }
359
360 async fn get_code(&self, address: H160, block: BlockNumberOrTagOrHash) -> RpcResult<Bytes> {
361 let hash = self.client.block_hash_for_tag(block).await?;
362 let code = self.client.runtime_api(hash).code(address).await?;
363 Ok(code.into())
364 }
365
366 async fn accounts(&self) -> RpcResult<Vec<H160>> {
367 Ok(self.accounts.iter().map(|account| account.address()).collect())
368 }
369
370 async fn get_block_by_number(
371 &self,
372 block_number: BlockNumberOrTag,
373 hydrated_transactions: bool,
374 ) -> RpcResult<Option<Block>> {
375 let Some(block) = self.client.block_by_number_or_tag(&block_number).await? else {
376 return Ok(None);
377 };
378 let block = self.client.evm_block(block, hydrated_transactions).await;
379 Ok(block)
380 }
381
382 async fn get_block_transaction_count_by_hash(
383 &self,
384 block_hash: Option<H256>,
385 ) -> RpcResult<Option<U256>> {
386 let block_hash = if let Some(block_hash) = block_hash {
387 block_hash
388 } else {
389 self.client.latest_block().await.hash()
390 };
391
392 let Some(substrate_hash) = self.client.resolve_substrate_hash(&block_hash).await else {
393 return Ok(None);
394 };
395
396 Ok(self.client.receipts_count_per_block(&substrate_hash).await.map(U256::from))
397 }
398
399 async fn get_block_transaction_count_by_number(
400 &self,
401 block: Option<BlockNumberOrTag>,
402 ) -> RpcResult<Option<U256>> {
403 let substrate_hash = if let Some(block) = self
404 .client
405 .block_by_number_or_tag(&block.unwrap_or_else(|| BlockTag::Latest.into()))
406 .await?
407 {
408 block.hash()
409 } else {
410 return Ok(None);
411 };
412
413 Ok(self.client.receipts_count_per_block(&substrate_hash).await.map(U256::from))
414 }
415
416 async fn get_logs(&self, filter: Option<Filter>) -> RpcResult<FilterResults> {
417 let logs = self.client.logs(filter).await?;
418 Ok(FilterResults::Logs(logs))
419 }
420
421 async fn get_storage_at(
422 &self,
423 address: H160,
424 storage_slot: U256,
425 block: BlockNumberOrTagOrHash,
426 ) -> RpcResult<Bytes> {
427 let hash = self.client.block_hash_for_tag(block).await?;
428 let runtime_api = self.client.runtime_api(hash);
429 let bytes = match runtime_api.get_storage(address, storage_slot.to_big_endian()).await {
430 Ok(value) => value.unwrap_or([0u8; 32].into()),
431 Err(ClientError::ContractNotFound) => {
433 log::trace!(target: LOG_TARGET, "get_storage_at: ContractNotFound for {address:?}, returning zero");
434 [0u8; 32].into()
435 },
436 Err(err) => return Err(err.into()),
437 };
438 Ok(bytes.into())
439 }
440
441 async fn get_transaction_by_block_hash_and_index(
442 &self,
443 block_hash: H256,
444 transaction_index: U256,
445 ) -> RpcResult<Option<TransactionInfo>> {
446 let Some(substrate_block_hash) = self.client.resolve_substrate_hash(&block_hash).await
447 else {
448 return Ok(None);
449 };
450 self.get_transaction_by_substrate_block_hash_and_index(
451 substrate_block_hash,
452 transaction_index,
453 )
454 .await
455 }
456
457 async fn get_transaction_by_block_number_and_index(
458 &self,
459 block: BlockNumberOrTag,
460 transaction_index: U256,
461 ) -> RpcResult<Option<TransactionInfo>> {
462 let Some(block) = self.client.block_by_number_or_tag(&block).await? else {
463 return Ok(None);
464 };
465 self.get_transaction_by_substrate_block_hash_and_index(block.hash(), transaction_index)
466 .await
467 }
468
469 async fn get_transaction_by_hash(
470 &self,
471 transaction_hash: H256,
472 ) -> RpcResult<Option<TransactionInfo>> {
473 let receipt = self.client.receipt(&transaction_hash).await;
474 let signed_tx = self.client.signed_tx_by_hash(&transaction_hash).await;
475 if let (Some(receipt), Some(signed_tx)) = (receipt, signed_tx) {
476 return Ok(Some(TransactionInfo::new(&receipt, signed_tx)));
477 }
478
479 Ok(None)
480 }
481
482 async fn get_transaction_count(
483 &self,
484 address: H160,
485 block: BlockNumberOrTagOrHash,
486 ) -> RpcResult<U256> {
487 let hash = self.client.block_hash_for_tag(block).await?;
488 let runtime_api = self.client.runtime_api(hash);
489 let nonce = runtime_api.nonce(address).await?;
490 Ok(nonce)
491 }
492
493 async fn web3_client_version(&self) -> RpcResult<String> {
494 let git_revision = env!("GIT_REVISION");
495 let rustc_version = env!("RUSTC_VERSION");
496 let target = env!("TARGET");
497 Ok(format!("eth-rpc/{git_revision}/{target}/{rustc_version}"))
498 }
499
500 async fn fee_history(
501 &self,
502 block_count: U256,
503 newest_block: BlockNumberOrTag,
504 reward_percentiles: Option<Vec<f64>>,
505 ) -> RpcResult<FeeHistoryResult> {
506 let block_count: u32 = block_count.try_into().map_err(|_| EthRpcError::ConversionError)?;
507 let result = self.client.fee_history(block_count, newest_block, reward_percentiles).await?;
508 Ok(result)
509 }
510
511 async fn eth_subscribe(
512 &self,
513 pending: PendingSubscriptionSink,
514 kind: SubscriptionKind,
515 options: Option<SubscriptionOptions>,
516 ) {
517 let Some(subscription_parameters) = SubscriptionParameters::new(kind, options) else {
518 return pending
519 .reject(ErrorObjectOwned::owned(
520 jsonrpsee::types::error::INVALID_PARAMS_CODE,
521 "Invalid subscription parameters",
522 None::<()>,
523 ))
524 .await;
525 };
526 let Ok(sink) = pending.accept().await else {
527 return;
528 };
529
530 let stream: Pin<
531 Box<dyn Stream<Item = Result<SubscriptionItem, BroadcastStreamRecvError>> + Send>,
532 > = match subscription_parameters {
533 SubscriptionParameters::NewBlockHeaders => Box::pin(
534 BroadcastStream::new(self.client.get_block_subscription_rx())
535 .map_ok(|block| SubscriptionItem::BlockHeader(BlockHeader::from(block))),
536 ) as _,
537 SubscriptionParameters::Logs(filter) => Box::pin(
538 BroadcastStream::new(self.client.get_log_subscription_rx())
539 .try_filter(move |log| futures::future::ready(filter.matches(log)))
540 .map_ok(SubscriptionItem::Log),
541 ) as _,
542 };
543 let _ = tokio::spawn(Self::handle_subscription_forwarding(sink, stream));
544 }
545}
546
547impl EthRpcServerImpl {
548 async fn get_transaction_by_substrate_block_hash_and_index(
549 &self,
550 substrate_block_hash: H256,
551 transaction_index: U256,
552 ) -> RpcResult<Option<TransactionInfo>> {
553 let Some(receipt) = self
554 .client
555 .receipt_by_hash_and_index(
556 &substrate_block_hash,
557 transaction_index.try_into().map_err(|_| EthRpcError::ConversionError)?,
558 )
559 .await
560 else {
561 return Ok(None);
562 };
563 let Some(signed_tx) = self.client.signed_tx_by_hash(&receipt.transaction_hash).await else {
564 return Ok(None);
565 };
566
567 Ok(Some(TransactionInfo::new(&receipt, signed_tx)))
568 }
569
570 async fn handle_subscription_forwarding(
571 sink: SubscriptionSink,
572 mut stream: Pin<
573 Box<dyn Stream<Item = Result<SubscriptionItem, BroadcastStreamRecvError>> + Send>,
574 >,
575 ) {
576 loop {
577 tokio::select! {
578 _ = sink.closed() => break,
579 item = stream.next() => {
580 match item {
581 None => break,
583 Some(Ok(sub_item)) => {
585 let msg = SubscriptionMessage::from_json(&sub_item)
586 .expect("SubscriptionItem is serializable; qed");
587 if sink.send(msg).await.is_err() {
588 break;
589 }
590 },
591 Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
593 log::warn!(
594 target: LOG_TARGET,
595 "Subscription lagged, skipped {count} messages"
596 );
597 },
598 }
599 }
600 }
601 }
602 }
603}