referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17//! The [`EthRpcServer`] RPC server implementation
18#![cfg_attr(docsrs, feature(doc_cfg))]
19
20use client::ClientError;
21use futures::{Stream, StreamExt, TryStreamExt};
22use jsonrpsee::{
23	PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink,
24	core::{RpcResult, async_trait},
25	types::{ErrorCode, ErrorObjectOwned},
26};
27use pallet_revive::evm::*;
28use pallet_revive_types::runtime_api::TraceV1;
29use sp_core::{H160, H256, U256};
30use sp_crypto_hashing::keccak_256;
31use subxt::backend::legacy::rpc_methods::TransactionStatus;
32use subxt_signer::bip39::core::pin::Pin;
33use thiserror::Error;
34use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
35
36mod block_sync;
37pub(crate) use block_sync::{ChainMetadata, SyncLabel, SyncStateKey};
38pub mod cli;
39pub mod client;
40pub mod example;
41pub mod subxt_client;
42
43#[cfg(test)]
44mod tests;
45
46mod block_info_provider;
47pub use block_info_provider::*;
48
49mod receipt_provider;
50pub use receipt_provider::*;
51
52mod fee_history_provider;
53pub use fee_history_provider::*;
54
55mod receipt_extractor;
56pub use receipt_extractor::*;
57
58mod apis;
59pub use apis::*;
60
61pub const LOG_TARGET: &str = "eth-rpc";
62
63/// An EVM RPC server implementation.
64pub struct EthRpcServerImpl {
65	/// The client used to interact with the substrate node.
66	client: client::Client,
67
68	/// The accounts managed by the server.
69	accounts: Vec<Account>,
70
71	/// Controls if unprotected txs are allowed or not.
72	allow_unprotected_txs: bool,
73
74	/// When true, estimate_gas uses Pending block if no block is specified.
75	use_pending_for_estimate_gas: bool,
76}
77
78impl EthRpcServerImpl {
79	/// Creates a new [`EthRpcServerImpl`].
80	pub fn new(client: client::Client) -> Self {
81		Self {
82			client,
83			accounts: vec![],
84			allow_unprotected_txs: false,
85			use_pending_for_estimate_gas: false,
86		}
87	}
88
89	/// Sets the accounts managed by the server.
90	pub fn with_accounts(mut self, accounts: Vec<Account>) -> Self {
91		self.accounts = accounts;
92		self
93	}
94
95	/// Sets whether unprotected transactions are allowed or not.
96	pub fn with_allow_unprotected_txs(mut self, allow_unprotected_txs: bool) -> Self {
97		self.allow_unprotected_txs = allow_unprotected_txs;
98		self
99	}
100
101	/// Sets whether estimate_gas uses Pending block when no block is specified.
102	pub fn with_use_pending_for_estimate_gas(mut self, use_pending_for_estimate_gas: bool) -> Self {
103		self.use_pending_for_estimate_gas = use_pending_for_estimate_gas;
104		self
105	}
106}
107
108/// The error type for the EVM RPC server.
109#[derive(Error, Debug)]
110pub enum EthRpcError {
111	/// A [`ClientError`] wrapper error.
112	#[error("Client error: {0}")]
113	ClientError(#[from] ClientError),
114	/// A [`rlp::DecoderError`] wrapper error.
115	#[error("Decoding error: {0}")]
116	RlpError(#[from] rlp::DecoderError),
117	/// A Decimals conversion error.
118	#[error("Conversion error")]
119	ConversionError,
120	/// An invalid signature error.
121	#[error("Invalid signature")]
122	InvalidSignature,
123	/// The account was not found at the given address
124	#[error("Account not found for address {0:?}")]
125	AccountNotFound(H160),
126	/// Received an invalid transaction
127	#[error("Invalid transaction")]
128	InvalidTransaction,
129	/// Received an invalid transaction
130	#[error("Invalid transaction {0:?}")]
131	TransactionTypeNotSupported(Byte),
132}
133
134// TODO use https://eips.ethereum.org/EIPS/eip-1474#error-codes
135impl From<EthRpcError> for ErrorObjectOwned {
136	fn from(value: EthRpcError) -> Self {
137		match value {
138			EthRpcError::ClientError(err) => Self::from(err),
139			_ => Self::owned::<String>(ErrorCode::InvalidRequest.code(), value.to_string(), None),
140		}
141	}
142}
143
144#[async_trait]
145impl EthRpcServer for EthRpcServerImpl {
146	async fn net_version(&self) -> RpcResult<String> {
147		Ok(self.client.chain_id().to_string())
148	}
149
150	async fn net_listening(&self) -> RpcResult<bool> {
151		let syncing = self.client.syncing().await?;
152		let listening = matches!(syncing, SyncingStatus::Bool(false));
153		Ok(listening)
154	}
155
156	async fn syncing(&self) -> RpcResult<SyncingStatus> {
157		Ok(self.client.syncing().await?)
158	}
159
160	async fn block_number(&self) -> RpcResult<U256> {
161		let number = self.client.block_number().await?;
162		Ok(number.into())
163	}
164
165	async fn get_transaction_receipt(
166		&self,
167		transaction_hash: H256,
168	) -> RpcResult<Option<ReceiptInfo>> {
169		let receipt = self.client.receipt(&transaction_hash).await;
170		Ok(receipt)
171	}
172
173	/// Performs gas estimations to find the lowest gas limit required to run the transaction.
174	///
175	/// This method implements the same gas estimation logic found in Geth which performs binary
176	/// search with some simple heuristics to find the smallest gas limit for the transaction.
177	async fn estimate_gas(
178		&self,
179		transaction: GenericTransaction,
180		block: Option<BlockNumberOrTag>,
181	) -> RpcResult<U256> {
182		log::trace!(target: LOG_TARGET, "estimate_gas transaction={transaction:?} block={block:?}");
183
184		let block = block.unwrap_or_else(|| {
185			if self.use_pending_for_estimate_gas {
186				BlockTag::Pending.into()
187			} else {
188				Default::default()
189			}
190		});
191		let hash = self.client.block_hash_for_tag(block.into()).await?;
192		let gas_estimate =
193			self.client.runtime_api(hash).estimate_gas(transaction, block.into()).await?;
194
195		log::trace!(
196			target: LOG_TARGET,
197			"estimate_gas result={gas_estimate:?}",
198		);
199		Ok(gas_estimate)
200	}
201
202	async fn call(
203		&self,
204		transaction: GenericTransaction,
205		block: Option<BlockNumberOrTagOrHash>,
206		state_overrides: Option<StateOverrideSet>,
207	) -> RpcResult<Bytes> {
208		let block = block.unwrap_or_default();
209		let hash = self.client.block_hash_for_tag(block.clone()).await?;
210		let runtime_api = self.client.runtime_api(hash);
211		let dry_run = runtime_api.dry_run(transaction, block, state_overrides).await?;
212		Ok(dry_run.data.into())
213	}
214
215	async fn send_raw_transaction(&self, transaction: Bytes) -> RpcResult<H256> {
216		let hash = H256(keccak_256(&transaction.0));
217		log::trace!(target: LOG_TARGET, "send_raw_transaction transaction: {transaction:?} ethereum_hash: {hash:?}");
218
219		if !self.allow_unprotected_txs {
220			let signed_transaction = TransactionSigned::decode(transaction.0.as_slice())
221				.map_err(|err| {
222					log::trace!(target: LOG_TARGET, "Transaction decoding failed. ethereum_hash: {hash:?}, error: {err:?}");
223					EthRpcError::InvalidTransaction
224				})?;
225
226			let is_chain_id_provided = match signed_transaction {
227				TransactionSigned::Transaction7702Signed(tx) => {
228					tx.transaction_7702_unsigned.chain_id != U256::zero()
229				},
230				TransactionSigned::Transaction4844Signed(tx) => {
231					tx.transaction_4844_unsigned.chain_id != U256::zero()
232				},
233				TransactionSigned::Transaction1559Signed(tx) => {
234					tx.transaction_1559_unsigned.chain_id != U256::zero()
235				},
236				TransactionSigned::Transaction2930Signed(tx) => {
237					tx.transaction_2930_unsigned.chain_id != U256::zero()
238				},
239				TransactionSigned::TransactionLegacySigned(tx) => {
240					tx.transaction_legacy_unsigned.chain_id.is_some()
241				},
242			};
243
244			if !is_chain_id_provided {
245				log::trace!(target: LOG_TARGET, "Invalid Transaction: transaction doesn't include a chain-id. ethereum_hash: {hash:?}");
246				Err(EthRpcError::InvalidTransaction)?;
247			}
248		}
249
250		let call = subxt_client::tx().revive().eth_transact(transaction.0);
251
252		// Subscribe to new block only when automine is enabled.
253		let receiver = self.client.block_notifier().map(|sender| sender.subscribe());
254
255		// Submit the transaction
256		let tx_status = self.client.submit(call).await.map_err(|err| {
257			log::trace!(target: LOG_TARGET, "send_raw_transaction ethereum_hash: {hash:?} failed: {err:?}");
258			err
259		})?;
260
261		if matches!(tx_status, TransactionStatus::Future) {
262			return Ok(hash);
263		}
264
265		// Wait for the transaction to be included in a block if automine is enabled
266		if let Some(mut receiver) = receiver {
267			loop {
268				if let Ok(block_hash) = receiver.recv().await {
269					let Ok(Some(block)) = self.client.block_by_hash(&block_hash).await else {
270						log::debug!(target: LOG_TARGET, "Could not find the block with the received hash: {hash:?}.");
271						continue;
272					};
273					let Some(evm_block) = self.client.evm_block(block, false).await else {
274						log::debug!(target: LOG_TARGET, "Failed to get the EVM block for substrate block with hash: {hash:?}");
275						continue;
276					};
277					if evm_block.transactions.contains_tx(hash) {
278						log::debug!(target: LOG_TARGET, "{hash:} was included in a block");
279						break;
280					}
281				}
282			}
283		}
284
285		log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}");
286		Ok(hash)
287	}
288
289	async fn send_transaction(&self, mut transaction: GenericTransaction) -> RpcResult<H256> {
290		log::debug!(target: LOG_TARGET, "{transaction:#?}");
291
292		let Some(from) = transaction.from else {
293			log::debug!(target: LOG_TARGET, "Transaction must have a sender");
294			return Err(EthRpcError::InvalidTransaction.into());
295		};
296
297		let account = self
298			.accounts
299			.iter()
300			.find(|account| account.address() == from)
301			.ok_or(EthRpcError::AccountNotFound(from))?;
302
303		if transaction.gas.is_none() {
304			transaction.gas = Some(self.estimate_gas(transaction.clone(), None).await?);
305		}
306
307		if transaction.gas_price.is_none() {
308			transaction.gas_price = Some(self.gas_price().await?);
309		}
310
311		if transaction.nonce.is_none() {
312			transaction.nonce =
313				Some(self.get_transaction_count(from, BlockTag::Latest.into()).await?);
314		}
315
316		if transaction.chain_id.is_none() {
317			transaction.chain_id = Some(self.chain_id().await?);
318		}
319
320		let tx = transaction.try_into_unsigned().map_err(|_| EthRpcError::InvalidTransaction)?;
321		let payload = account.sign_transaction(tx).signed_payload();
322		self.send_raw_transaction(Bytes(payload)).await
323	}
324
325	async fn get_block_by_hash(
326		&self,
327		block_hash: H256,
328		hydrated_transactions: bool,
329	) -> RpcResult<Option<Block>> {
330		let Some(block) = self.client.block_by_ethereum_hash(&block_hash).await? else {
331			return Ok(None);
332		};
333		let block = self.client.evm_block(block, hydrated_transactions).await;
334		Ok(block)
335	}
336
337	async fn get_balance(&self, address: H160, block: BlockNumberOrTagOrHash) -> RpcResult<U256> {
338		let hash = self.client.block_hash_for_tag(block).await?;
339		let runtime_api = self.client.runtime_api(hash);
340		let balance = runtime_api.balance(address).await?;
341		Ok(balance)
342	}
343
344	async fn chain_id(&self) -> RpcResult<U256> {
345		Ok(self.client.chain_id().into())
346	}
347
348	async fn gas_price(&self) -> RpcResult<U256> {
349		let hash = self.client.block_hash_for_tag(BlockTag::Latest.into()).await?;
350		let runtime_api = self.client.runtime_api(hash);
351		Ok(runtime_api.gas_price().await?)
352	}
353
354	async fn max_priority_fee_per_gas(&self) -> RpcResult<U256> {
355		// We do not support tips. Hence the recommended priority fee is
356		// always zero. The effective gas price will always be the base price.
357		Ok(Default::default())
358	}
359
360	async fn get_code(&self, address: H160, block: BlockNumberOrTagOrHash) -> RpcResult<Bytes> {
361		let hash = self.client.block_hash_for_tag(block).await?;
362		let code = self.client.runtime_api(hash).code(address).await?;
363		Ok(code.into())
364	}
365
366	async fn accounts(&self) -> RpcResult<Vec<H160>> {
367		Ok(self.accounts.iter().map(|account| account.address()).collect())
368	}
369
370	async fn get_block_by_number(
371		&self,
372		block_number: BlockNumberOrTag,
373		hydrated_transactions: bool,
374	) -> RpcResult<Option<Block>> {
375		let Some(block) = self.client.block_by_number_or_tag(&block_number).await? else {
376			return Ok(None);
377		};
378		let block = self.client.evm_block(block, hydrated_transactions).await;
379		Ok(block)
380	}
381
382	async fn get_block_transaction_count_by_hash(
383		&self,
384		block_hash: Option<H256>,
385	) -> RpcResult<Option<U256>> {
386		let block_hash = if let Some(block_hash) = block_hash {
387			block_hash
388		} else {
389			self.client.latest_block().await.hash()
390		};
391
392		let Some(substrate_hash) = self.client.resolve_substrate_hash(&block_hash).await else {
393			return Ok(None);
394		};
395
396		Ok(self.client.receipts_count_per_block(&substrate_hash).await.map(U256::from))
397	}
398
399	async fn get_block_transaction_count_by_number(
400		&self,
401		block: Option<BlockNumberOrTag>,
402	) -> RpcResult<Option<U256>> {
403		let substrate_hash = if let Some(block) = self
404			.client
405			.block_by_number_or_tag(&block.unwrap_or_else(|| BlockTag::Latest.into()))
406			.await?
407		{
408			block.hash()
409		} else {
410			return Ok(None);
411		};
412
413		Ok(self.client.receipts_count_per_block(&substrate_hash).await.map(U256::from))
414	}
415
416	async fn get_logs(&self, filter: Option<Filter>) -> RpcResult<FilterResults> {
417		let logs = self.client.logs(filter).await?;
418		Ok(FilterResults::Logs(logs))
419	}
420
421	async fn get_storage_at(
422		&self,
423		address: H160,
424		storage_slot: U256,
425		block: BlockNumberOrTagOrHash,
426	) -> RpcResult<Bytes> {
427		let hash = self.client.block_hash_for_tag(block).await?;
428		let runtime_api = self.client.runtime_api(hash);
429		let bytes = match runtime_api.get_storage(address, storage_slot.to_big_endian()).await {
430			Ok(value) => value.unwrap_or([0u8; 32].into()),
431			// Per Ethereum spec, return zero for non-contract addresses.
432			Err(ClientError::ContractNotFound) => {
433				log::trace!(target: LOG_TARGET, "get_storage_at: ContractNotFound for {address:?}, returning zero");
434				[0u8; 32].into()
435			},
436			Err(err) => return Err(err.into()),
437		};
438		Ok(bytes.into())
439	}
440
441	async fn get_transaction_by_block_hash_and_index(
442		&self,
443		block_hash: H256,
444		transaction_index: U256,
445	) -> RpcResult<Option<TransactionInfo>> {
446		let Some(substrate_block_hash) = self.client.resolve_substrate_hash(&block_hash).await
447		else {
448			return Ok(None);
449		};
450		self.get_transaction_by_substrate_block_hash_and_index(
451			substrate_block_hash,
452			transaction_index,
453		)
454		.await
455	}
456
457	async fn get_transaction_by_block_number_and_index(
458		&self,
459		block: BlockNumberOrTag,
460		transaction_index: U256,
461	) -> RpcResult<Option<TransactionInfo>> {
462		let Some(block) = self.client.block_by_number_or_tag(&block).await? else {
463			return Ok(None);
464		};
465		self.get_transaction_by_substrate_block_hash_and_index(block.hash(), transaction_index)
466			.await
467	}
468
469	async fn get_transaction_by_hash(
470		&self,
471		transaction_hash: H256,
472	) -> RpcResult<Option<TransactionInfo>> {
473		let receipt = self.client.receipt(&transaction_hash).await;
474		let signed_tx = self.client.signed_tx_by_hash(&transaction_hash).await;
475		if let (Some(receipt), Some(signed_tx)) = (receipt, signed_tx) {
476			return Ok(Some(TransactionInfo::new(&receipt, signed_tx)));
477		}
478
479		Ok(None)
480	}
481
482	async fn get_transaction_count(
483		&self,
484		address: H160,
485		block: BlockNumberOrTagOrHash,
486	) -> RpcResult<U256> {
487		let hash = self.client.block_hash_for_tag(block).await?;
488		let runtime_api = self.client.runtime_api(hash);
489		let nonce = runtime_api.nonce(address).await?;
490		Ok(nonce)
491	}
492
493	async fn web3_client_version(&self) -> RpcResult<String> {
494		let git_revision = env!("GIT_REVISION");
495		let rustc_version = env!("RUSTC_VERSION");
496		let target = env!("TARGET");
497		Ok(format!("eth-rpc/{git_revision}/{target}/{rustc_version}"))
498	}
499
500	async fn fee_history(
501		&self,
502		block_count: U256,
503		newest_block: BlockNumberOrTag,
504		reward_percentiles: Option<Vec<f64>>,
505	) -> RpcResult<FeeHistoryResult> {
506		let block_count: u32 = block_count.try_into().map_err(|_| EthRpcError::ConversionError)?;
507		let result = self.client.fee_history(block_count, newest_block, reward_percentiles).await?;
508		Ok(result)
509	}
510
511	async fn eth_subscribe(
512		&self,
513		pending: PendingSubscriptionSink,
514		kind: SubscriptionKind,
515		options: Option<SubscriptionOptions>,
516	) {
517		let Some(subscription_parameters) = SubscriptionParameters::new(kind, options) else {
518			return pending
519				.reject(ErrorObjectOwned::owned(
520					jsonrpsee::types::error::INVALID_PARAMS_CODE,
521					"Invalid subscription parameters",
522					None::<()>,
523				))
524				.await;
525		};
526		let Ok(sink) = pending.accept().await else {
527			return;
528		};
529
530		let stream: Pin<
531			Box<dyn Stream<Item = Result<SubscriptionItem, BroadcastStreamRecvError>> + Send>,
532		> = match subscription_parameters {
533			SubscriptionParameters::NewBlockHeaders => Box::pin(
534				BroadcastStream::new(self.client.get_block_subscription_rx())
535					.map_ok(|block| SubscriptionItem::BlockHeader(BlockHeader::from(block))),
536			) as _,
537			SubscriptionParameters::Logs(filter) => Box::pin(
538				BroadcastStream::new(self.client.get_log_subscription_rx())
539					.try_filter(move |log| futures::future::ready(filter.matches(log)))
540					.map_ok(SubscriptionItem::Log),
541			) as _,
542		};
543		let _ = tokio::spawn(Self::handle_subscription_forwarding(sink, stream));
544	}
545}
546
547impl EthRpcServerImpl {
548	async fn get_transaction_by_substrate_block_hash_and_index(
549		&self,
550		substrate_block_hash: H256,
551		transaction_index: U256,
552	) -> RpcResult<Option<TransactionInfo>> {
553		let Some(receipt) = self
554			.client
555			.receipt_by_hash_and_index(
556				&substrate_block_hash,
557				transaction_index.try_into().map_err(|_| EthRpcError::ConversionError)?,
558			)
559			.await
560		else {
561			return Ok(None);
562		};
563		let Some(signed_tx) = self.client.signed_tx_by_hash(&receipt.transaction_hash).await else {
564			return Ok(None);
565		};
566
567		Ok(Some(TransactionInfo::new(&receipt, signed_tx)))
568	}
569
570	async fn handle_subscription_forwarding(
571		sink: SubscriptionSink,
572		mut stream: Pin<
573			Box<dyn Stream<Item = Result<SubscriptionItem, BroadcastStreamRecvError>> + Send>,
574		>,
575	) {
576		loop {
577			tokio::select! {
578				_ = sink.closed() => break,
579				item = stream.next() => {
580					match item {
581						// Stream ended.
582						None => break,
583						// Send the item to the subscriber.
584						Some(Ok(sub_item)) => {
585							let msg = SubscriptionMessage::from_json(&sub_item)
586								.expect("SubscriptionItem is serializable; qed");
587							if sink.send(msg).await.is_err() {
588								break;
589							}
590						},
591						// Broadcast receiver lagged behind — missed messages.
592						Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
593							log::warn!(
594								target: LOG_TARGET,
595								"Subscription lagged, skipped {count} messages"
596							);
597						},
598					}
599				}
600			}
601		}
602	}
603}