1#![cfg_attr(docsrs, feature(doc_cfg))]
19
20use client::ClientError;
21use futures::{Stream, StreamExt, TryStreamExt};
22use jsonrpsee::{
23 PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink,
24 core::{RpcResult, async_trait},
25 types::{ErrorCode, ErrorObjectOwned},
26};
27use pallet_revive::evm::*;
28use sp_core::{H160, H256, U256, keccak_256};
29use subxt::backend::legacy::rpc_methods::TransactionStatus;
30use subxt_signer::bip39::core::pin::Pin;
31use thiserror::Error;
32use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
33
34mod block_sync;
35pub(crate) use block_sync::{ChainMetadata, SyncLabel, SyncStateKey};
36pub mod cli;
37pub mod client;
38pub mod example;
39pub mod subxt_client;
40
41#[cfg(test)]
42mod tests;
43
44mod block_info_provider;
45pub use block_info_provider::*;
46
47mod receipt_provider;
48pub use receipt_provider::*;
49
50mod fee_history_provider;
51pub use fee_history_provider::*;
52
53mod receipt_extractor;
54pub use receipt_extractor::*;
55
56mod apis;
57pub use apis::*;
58
/// Target string passed as `target:` to every `log` macro call in this file.
pub const LOG_TARGET: &str = "eth-rpc";
60
61pub struct EthRpcServerImpl {
63 client: client::Client,
65
66 accounts: Vec<Account>,
68
69 allow_unprotected_txs: bool,
71
72 use_pending_for_estimate_gas: bool,
74}
75
76impl EthRpcServerImpl {
77 pub fn new(client: client::Client) -> Self {
79 Self {
80 client,
81 accounts: vec![],
82 allow_unprotected_txs: false,
83 use_pending_for_estimate_gas: false,
84 }
85 }
86
87 pub fn with_accounts(mut self, accounts: Vec<Account>) -> Self {
89 self.accounts = accounts;
90 self
91 }
92
93 pub fn with_allow_unprotected_txs(mut self, allow_unprotected_txs: bool) -> Self {
95 self.allow_unprotected_txs = allow_unprotected_txs;
96 self
97 }
98
99 pub fn with_use_pending_for_estimate_gas(mut self, use_pending_for_estimate_gas: bool) -> Self {
101 self.use_pending_for_estimate_gas = use_pending_for_estimate_gas;
102 self
103 }
104}
105
106#[derive(Error, Debug)]
108pub enum EthRpcError {
109 #[error("Client error: {0}")]
111 ClientError(#[from] ClientError),
112 #[error("Decoding error: {0}")]
114 RlpError(#[from] rlp::DecoderError),
115 #[error("Conversion error")]
117 ConversionError,
118 #[error("Invalid signature")]
120 InvalidSignature,
121 #[error("Account not found for address {0:?}")]
123 AccountNotFound(H160),
124 #[error("Invalid transaction")]
126 InvalidTransaction,
127 #[error("Invalid transaction {0:?}")]
129 TransactionTypeNotSupported(Byte),
130}
131
132impl From<EthRpcError> for ErrorObjectOwned {
134 fn from(value: EthRpcError) -> Self {
135 match value {
136 EthRpcError::ClientError(err) => Self::from(err),
137 _ => Self::owned::<String>(ErrorCode::InvalidRequest.code(), value.to_string(), None),
138 }
139 }
140}
141
142#[async_trait]
143impl EthRpcServer for EthRpcServerImpl {
144 async fn net_version(&self) -> RpcResult<String> {
145 Ok(self.client.chain_id().to_string())
146 }
147
148 async fn net_listening(&self) -> RpcResult<bool> {
149 let syncing = self.client.syncing().await?;
150 let listening = matches!(syncing, SyncingStatus::Bool(false));
151 Ok(listening)
152 }
153
154 async fn syncing(&self) -> RpcResult<SyncingStatus> {
155 Ok(self.client.syncing().await?)
156 }
157
158 async fn block_number(&self) -> RpcResult<U256> {
159 let number = self.client.block_number().await?;
160 Ok(number.into())
161 }
162
163 async fn get_transaction_receipt(
164 &self,
165 transaction_hash: H256,
166 ) -> RpcResult<Option<ReceiptInfo>> {
167 let receipt = self.client.receipt(&transaction_hash).await;
168 Ok(receipt)
169 }
170
171 async fn estimate_gas(
176 &self,
177 transaction: GenericTransaction,
178 block: Option<BlockNumberOrTag>,
179 ) -> RpcResult<U256> {
180 log::trace!(target: LOG_TARGET, "estimate_gas transaction={transaction:?} block={block:?}");
181
182 let block = block.unwrap_or_else(|| {
183 if self.use_pending_for_estimate_gas {
184 BlockTag::Pending.into()
185 } else {
186 Default::default()
187 }
188 });
189 let hash = self.client.block_hash_for_tag(block.into()).await?;
190 let gas_estimate =
191 self.client.runtime_api(hash).estimate_gas(transaction, block.into()).await?;
192
193 log::trace!(
194 target: LOG_TARGET,
195 "estimate_gas result={gas_estimate:?}",
196 );
197 Ok(gas_estimate)
198 }
199
200 async fn call(
201 &self,
202 transaction: GenericTransaction,
203 block: Option<BlockNumberOrTagOrHash>,
204 state_overrides: Option<StateOverrideSet>,
205 ) -> RpcResult<Bytes> {
206 let block = block.unwrap_or_default();
207 let hash = self.client.block_hash_for_tag(block.clone()).await?;
208 let runtime_api = self.client.runtime_api(hash);
209 let dry_run = runtime_api.dry_run(transaction, block, state_overrides).await?;
210 Ok(dry_run.data.into())
211 }
212
213 async fn send_raw_transaction(&self, transaction: Bytes) -> RpcResult<H256> {
214 let hash = H256(keccak_256(&transaction.0));
215 log::trace!(target: LOG_TARGET, "send_raw_transaction transaction: {transaction:?} ethereum_hash: {hash:?}");
216
217 if !self.allow_unprotected_txs {
218 let signed_transaction = TransactionSigned::decode(transaction.0.as_slice())
219 .map_err(|err| {
220 log::trace!(target: LOG_TARGET, "Transaction decoding failed. ethereum_hash: {hash:?}, error: {err:?}");
221 EthRpcError::InvalidTransaction
222 })?;
223
224 let is_chain_id_provided = match signed_transaction {
225 TransactionSigned::Transaction7702Signed(tx) => {
226 tx.transaction_7702_unsigned.chain_id != U256::zero()
227 },
228 TransactionSigned::Transaction4844Signed(tx) => {
229 tx.transaction_4844_unsigned.chain_id != U256::zero()
230 },
231 TransactionSigned::Transaction1559Signed(tx) => {
232 tx.transaction_1559_unsigned.chain_id != U256::zero()
233 },
234 TransactionSigned::Transaction2930Signed(tx) => {
235 tx.transaction_2930_unsigned.chain_id != U256::zero()
236 },
237 TransactionSigned::TransactionLegacySigned(tx) => {
238 tx.transaction_legacy_unsigned.chain_id.is_some()
239 },
240 };
241
242 if !is_chain_id_provided {
243 log::trace!(target: LOG_TARGET, "Invalid Transaction: transaction doesn't include a chain-id. ethereum_hash: {hash:?}");
244 Err(EthRpcError::InvalidTransaction)?;
245 }
246 }
247
248 let call = subxt_client::tx().revive().eth_transact(transaction.0);
249
250 let receiver = self.client.block_notifier().map(|sender| sender.subscribe());
252
253 let tx_status = self.client.submit(call).await.map_err(|err| {
255 log::trace!(target: LOG_TARGET, "send_raw_transaction ethereum_hash: {hash:?} failed: {err:?}");
256 err
257 })?;
258
259 if matches!(tx_status, TransactionStatus::Future) {
260 return Ok(hash);
261 }
262
263 if let Some(mut receiver) = receiver {
265 loop {
266 if let Ok(block_hash) = receiver.recv().await {
267 let Ok(Some(block)) = self.client.block_by_hash(&block_hash).await else {
268 log::debug!(target: LOG_TARGET, "Could not find the block with the received hash: {hash:?}.");
269 continue;
270 };
271 let Some(evm_block) = self.client.evm_block(block, false).await else {
272 log::debug!(target: LOG_TARGET, "Failed to get the EVM block for substrate block with hash: {hash:?}");
273 continue;
274 };
275 if evm_block.transactions.contains_tx(hash) {
276 log::debug!(target: LOG_TARGET, "{hash:} was included in a block");
277 break;
278 }
279 }
280 }
281 }
282
283 log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}");
284 Ok(hash)
285 }
286
287 async fn send_transaction(&self, mut transaction: GenericTransaction) -> RpcResult<H256> {
288 log::debug!(target: LOG_TARGET, "{transaction:#?}");
289
290 let Some(from) = transaction.from else {
291 log::debug!(target: LOG_TARGET, "Transaction must have a sender");
292 return Err(EthRpcError::InvalidTransaction.into());
293 };
294
295 let account = self
296 .accounts
297 .iter()
298 .find(|account| account.address() == from)
299 .ok_or(EthRpcError::AccountNotFound(from))?;
300
301 if transaction.gas.is_none() {
302 transaction.gas = Some(self.estimate_gas(transaction.clone(), None).await?);
303 }
304
305 if transaction.gas_price.is_none() {
306 transaction.gas_price = Some(self.gas_price().await?);
307 }
308
309 if transaction.nonce.is_none() {
310 transaction.nonce =
311 Some(self.get_transaction_count(from, BlockTag::Latest.into()).await?);
312 }
313
314 if transaction.chain_id.is_none() {
315 transaction.chain_id = Some(self.chain_id().await?);
316 }
317
318 let tx = transaction.try_into_unsigned().map_err(|_| EthRpcError::InvalidTransaction)?;
319 let payload = account.sign_transaction(tx).signed_payload();
320 self.send_raw_transaction(Bytes(payload)).await
321 }
322
323 async fn get_block_by_hash(
324 &self,
325 block_hash: H256,
326 hydrated_transactions: bool,
327 ) -> RpcResult<Option<Block>> {
328 let Some(block) = self.client.block_by_ethereum_hash(&block_hash).await? else {
329 return Ok(None);
330 };
331 let block = self.client.evm_block(block, hydrated_transactions).await;
332 Ok(block)
333 }
334
335 async fn get_balance(&self, address: H160, block: BlockNumberOrTagOrHash) -> RpcResult<U256> {
336 let hash = self.client.block_hash_for_tag(block).await?;
337 let runtime_api = self.client.runtime_api(hash);
338 let balance = runtime_api.balance(address).await?;
339 Ok(balance)
340 }
341
342 async fn chain_id(&self) -> RpcResult<U256> {
343 Ok(self.client.chain_id().into())
344 }
345
346 async fn gas_price(&self) -> RpcResult<U256> {
347 let hash = self.client.block_hash_for_tag(BlockTag::Latest.into()).await?;
348 let runtime_api = self.client.runtime_api(hash);
349 Ok(runtime_api.gas_price().await?)
350 }
351
352 async fn max_priority_fee_per_gas(&self) -> RpcResult<U256> {
353 Ok(Default::default())
356 }
357
358 async fn get_code(&self, address: H160, block: BlockNumberOrTagOrHash) -> RpcResult<Bytes> {
359 let hash = self.client.block_hash_for_tag(block).await?;
360 let code = self.client.runtime_api(hash).code(address).await?;
361 Ok(code.into())
362 }
363
364 async fn accounts(&self) -> RpcResult<Vec<H160>> {
365 Ok(self.accounts.iter().map(|account| account.address()).collect())
366 }
367
368 async fn get_block_by_number(
369 &self,
370 block_number: BlockNumberOrTag,
371 hydrated_transactions: bool,
372 ) -> RpcResult<Option<Block>> {
373 let Some(block) = self.client.block_by_number_or_tag(&block_number).await? else {
374 return Ok(None);
375 };
376 let block = self.client.evm_block(block, hydrated_transactions).await;
377 Ok(block)
378 }
379
380 async fn get_block_transaction_count_by_hash(
381 &self,
382 block_hash: Option<H256>,
383 ) -> RpcResult<Option<U256>> {
384 let block_hash = if let Some(block_hash) = block_hash {
385 block_hash
386 } else {
387 self.client.latest_block().await.hash()
388 };
389
390 let Some(substrate_hash) = self.client.resolve_substrate_hash(&block_hash).await else {
391 return Ok(None);
392 };
393
394 Ok(self.client.receipts_count_per_block(&substrate_hash).await.map(U256::from))
395 }
396
397 async fn get_block_transaction_count_by_number(
398 &self,
399 block: Option<BlockNumberOrTag>,
400 ) -> RpcResult<Option<U256>> {
401 let substrate_hash = if let Some(block) = self
402 .client
403 .block_by_number_or_tag(&block.unwrap_or_else(|| BlockTag::Latest.into()))
404 .await?
405 {
406 block.hash()
407 } else {
408 return Ok(None);
409 };
410
411 Ok(self.client.receipts_count_per_block(&substrate_hash).await.map(U256::from))
412 }
413
414 async fn get_logs(&self, filter: Option<Filter>) -> RpcResult<FilterResults> {
415 let logs = self.client.logs(filter).await?;
416 Ok(FilterResults::Logs(logs))
417 }
418
419 async fn get_storage_at(
420 &self,
421 address: H160,
422 storage_slot: U256,
423 block: BlockNumberOrTagOrHash,
424 ) -> RpcResult<Bytes> {
425 let hash = self.client.block_hash_for_tag(block).await?;
426 let runtime_api = self.client.runtime_api(hash);
427 let bytes = match runtime_api.get_storage(address, storage_slot.to_big_endian()).await {
428 Ok(value) => value.unwrap_or([0u8; 32].into()),
429 Err(ClientError::ContractNotFound) => {
431 log::trace!(target: LOG_TARGET, "get_storage_at: ContractNotFound for {address:?}, returning zero");
432 [0u8; 32].into()
433 },
434 Err(err) => return Err(err.into()),
435 };
436 Ok(bytes.into())
437 }
438
439 async fn get_transaction_by_block_hash_and_index(
440 &self,
441 block_hash: H256,
442 transaction_index: U256,
443 ) -> RpcResult<Option<TransactionInfo>> {
444 let Some(substrate_block_hash) = self.client.resolve_substrate_hash(&block_hash).await
445 else {
446 return Ok(None);
447 };
448 self.get_transaction_by_substrate_block_hash_and_index(
449 substrate_block_hash,
450 transaction_index,
451 )
452 .await
453 }
454
455 async fn get_transaction_by_block_number_and_index(
456 &self,
457 block: BlockNumberOrTag,
458 transaction_index: U256,
459 ) -> RpcResult<Option<TransactionInfo>> {
460 let Some(block) = self.client.block_by_number_or_tag(&block).await? else {
461 return Ok(None);
462 };
463 self.get_transaction_by_substrate_block_hash_and_index(block.hash(), transaction_index)
464 .await
465 }
466
467 async fn get_transaction_by_hash(
468 &self,
469 transaction_hash: H256,
470 ) -> RpcResult<Option<TransactionInfo>> {
471 let receipt = self.client.receipt(&transaction_hash).await;
472 let signed_tx = self.client.signed_tx_by_hash(&transaction_hash).await;
473 if let (Some(receipt), Some(signed_tx)) = (receipt, signed_tx) {
474 return Ok(Some(TransactionInfo::new(&receipt, signed_tx)));
475 }
476
477 Ok(None)
478 }
479
480 async fn get_transaction_count(
481 &self,
482 address: H160,
483 block: BlockNumberOrTagOrHash,
484 ) -> RpcResult<U256> {
485 let hash = self.client.block_hash_for_tag(block).await?;
486 let runtime_api = self.client.runtime_api(hash);
487 let nonce = runtime_api.nonce(address).await?;
488 Ok(nonce)
489 }
490
491 async fn web3_client_version(&self) -> RpcResult<String> {
492 let git_revision = env!("GIT_REVISION");
493 let rustc_version = env!("RUSTC_VERSION");
494 let target = env!("TARGET");
495 Ok(format!("eth-rpc/{git_revision}/{target}/{rustc_version}"))
496 }
497
498 async fn fee_history(
499 &self,
500 block_count: U256,
501 newest_block: BlockNumberOrTag,
502 reward_percentiles: Option<Vec<f64>>,
503 ) -> RpcResult<FeeHistoryResult> {
504 let block_count: u32 = block_count.try_into().map_err(|_| EthRpcError::ConversionError)?;
505 let result = self.client.fee_history(block_count, newest_block, reward_percentiles).await?;
506 Ok(result)
507 }
508
509 async fn eth_subscribe(
510 &self,
511 pending: PendingSubscriptionSink,
512 kind: SubscriptionKind,
513 options: Option<SubscriptionOptions>,
514 ) {
515 let Some(subscription_parameters) = SubscriptionParameters::new(kind, options) else {
516 return pending
517 .reject(ErrorObjectOwned::owned(
518 jsonrpsee::types::error::INVALID_PARAMS_CODE,
519 "Invalid subscription parameters",
520 None::<()>,
521 ))
522 .await;
523 };
524 let Ok(sink) = pending.accept().await else {
525 return;
526 };
527
528 let stream: Pin<
529 Box<dyn Stream<Item = Result<SubscriptionItem, BroadcastStreamRecvError>> + Send>,
530 > = match subscription_parameters {
531 SubscriptionParameters::NewBlockHeaders => Box::pin(
532 BroadcastStream::new(self.client.get_block_subscription_rx())
533 .map_ok(|block| SubscriptionItem::BlockHeader(BlockHeader::from(block))),
534 ) as _,
535 SubscriptionParameters::Logs(filter) => Box::pin(
536 BroadcastStream::new(self.client.get_log_subscription_rx())
537 .try_filter(move |log| futures::future::ready(filter.matches(log)))
538 .map_ok(SubscriptionItem::Log),
539 ) as _,
540 };
541 let _ = tokio::spawn(Self::handle_subscription_forwarding(sink, stream));
542 }
543}
544
545impl EthRpcServerImpl {
546 async fn get_transaction_by_substrate_block_hash_and_index(
547 &self,
548 substrate_block_hash: H256,
549 transaction_index: U256,
550 ) -> RpcResult<Option<TransactionInfo>> {
551 let Some(receipt) = self
552 .client
553 .receipt_by_hash_and_index(
554 &substrate_block_hash,
555 transaction_index.try_into().map_err(|_| EthRpcError::ConversionError)?,
556 )
557 .await
558 else {
559 return Ok(None);
560 };
561 let Some(signed_tx) = self.client.signed_tx_by_hash(&receipt.transaction_hash).await else {
562 return Ok(None);
563 };
564
565 Ok(Some(TransactionInfo::new(&receipt, signed_tx)))
566 }
567
568 async fn handle_subscription_forwarding(
569 sink: SubscriptionSink,
570 mut stream: Pin<
571 Box<dyn Stream<Item = Result<SubscriptionItem, BroadcastStreamRecvError>> + Send>,
572 >,
573 ) {
574 loop {
575 tokio::select! {
576 _ = sink.closed() => break,
577 item = stream.next() => {
578 match item {
579 None => break,
581 Some(Ok(sub_item)) => {
583 let msg = SubscriptionMessage::from_json(&sub_item)
584 .expect("SubscriptionItem is serializable; qed");
585 if sink.send(msg).await.is_err() {
586 break;
587 }
588 },
589 Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
591 log::warn!(
592 target: LOG_TARGET,
593 "Subscription lagged, skipped {count} messages"
594 );
595 },
596 }
597 }
598 }
599 }
600 }
601}