referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17//! The [`EthRpcServer`] RPC server implementation
18#![cfg_attr(docsrs, feature(doc_cfg))]
19
20use client::ClientError;
21use futures::{Stream, StreamExt, TryStreamExt};
22use jsonrpsee::{
23	PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink,
24	core::{RpcResult, async_trait},
25	types::{ErrorCode, ErrorObjectOwned},
26};
27use pallet_revive::evm::*;
28use sp_core::{H160, H256, U256, keccak_256};
29use subxt::backend::legacy::rpc_methods::TransactionStatus;
30use subxt_signer::bip39::core::pin::Pin;
31use thiserror::Error;
32use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
33
34mod block_sync;
35pub(crate) use block_sync::{ChainMetadata, SyncLabel, SyncStateKey};
36pub mod cli;
37pub mod client;
38pub mod example;
39pub mod subxt_client;
40
41#[cfg(test)]
42mod tests;
43
44mod block_info_provider;
45pub use block_info_provider::*;
46
47mod receipt_provider;
48pub use receipt_provider::*;
49
50mod fee_history_provider;
51pub use fee_history_provider::*;
52
53mod receipt_extractor;
54pub use receipt_extractor::*;
55
56mod apis;
57pub use apis::*;
58
/// Target string passed to the `log` macros emitted by the RPC server in this file.
pub const LOG_TARGET: &str = "eth-rpc";
60
61/// An EVM RPC server implementation.
62pub struct EthRpcServerImpl {
63	/// The client used to interact with the substrate node.
64	client: client::Client,
65
66	/// The accounts managed by the server.
67	accounts: Vec<Account>,
68
69	/// Controls if unprotected txs are allowed or not.
70	allow_unprotected_txs: bool,
71
72	/// When true, estimate_gas uses Pending block if no block is specified.
73	use_pending_for_estimate_gas: bool,
74}
75
76impl EthRpcServerImpl {
77	/// Creates a new [`EthRpcServerImpl`].
78	pub fn new(client: client::Client) -> Self {
79		Self {
80			client,
81			accounts: vec![],
82			allow_unprotected_txs: false,
83			use_pending_for_estimate_gas: false,
84		}
85	}
86
87	/// Sets the accounts managed by the server.
88	pub fn with_accounts(mut self, accounts: Vec<Account>) -> Self {
89		self.accounts = accounts;
90		self
91	}
92
93	/// Sets whether unprotected transactions are allowed or not.
94	pub fn with_allow_unprotected_txs(mut self, allow_unprotected_txs: bool) -> Self {
95		self.allow_unprotected_txs = allow_unprotected_txs;
96		self
97	}
98
99	/// Sets whether estimate_gas uses Pending block when no block is specified.
100	pub fn with_use_pending_for_estimate_gas(mut self, use_pending_for_estimate_gas: bool) -> Self {
101		self.use_pending_for_estimate_gas = use_pending_for_estimate_gas;
102		self
103	}
104}
105
106/// The error type for the EVM RPC server.
107#[derive(Error, Debug)]
108pub enum EthRpcError {
109	/// A [`ClientError`] wrapper error.
110	#[error("Client error: {0}")]
111	ClientError(#[from] ClientError),
112	/// A [`rlp::DecoderError`] wrapper error.
113	#[error("Decoding error: {0}")]
114	RlpError(#[from] rlp::DecoderError),
115	/// A Decimals conversion error.
116	#[error("Conversion error")]
117	ConversionError,
118	/// An invalid signature error.
119	#[error("Invalid signature")]
120	InvalidSignature,
121	/// The account was not found at the given address
122	#[error("Account not found for address {0:?}")]
123	AccountNotFound(H160),
124	/// Received an invalid transaction
125	#[error("Invalid transaction")]
126	InvalidTransaction,
127	/// Received an invalid transaction
128	#[error("Invalid transaction {0:?}")]
129	TransactionTypeNotSupported(Byte),
130}
131
132// TODO use https://eips.ethereum.org/EIPS/eip-1474#error-codes
133impl From<EthRpcError> for ErrorObjectOwned {
134	fn from(value: EthRpcError) -> Self {
135		match value {
136			EthRpcError::ClientError(err) => Self::from(err),
137			_ => Self::owned::<String>(ErrorCode::InvalidRequest.code(), value.to_string(), None),
138		}
139	}
140}
141
142#[async_trait]
143impl EthRpcServer for EthRpcServerImpl {
144	async fn net_version(&self) -> RpcResult<String> {
145		Ok(self.client.chain_id().to_string())
146	}
147
148	async fn net_listening(&self) -> RpcResult<bool> {
149		let syncing = self.client.syncing().await?;
150		let listening = matches!(syncing, SyncingStatus::Bool(false));
151		Ok(listening)
152	}
153
154	async fn syncing(&self) -> RpcResult<SyncingStatus> {
155		Ok(self.client.syncing().await?)
156	}
157
158	async fn block_number(&self) -> RpcResult<U256> {
159		let number = self.client.block_number().await?;
160		Ok(number.into())
161	}
162
163	async fn get_transaction_receipt(
164		&self,
165		transaction_hash: H256,
166	) -> RpcResult<Option<ReceiptInfo>> {
167		let receipt = self.client.receipt(&transaction_hash).await;
168		Ok(receipt)
169	}
170
171	/// Performs gas estimations to find the lowest gas limit required to run the transaction.
172	///
173	/// This method implements the same gas estimation logic found in Geth which performs binary
174	/// search with some simple heuristics to find the smallest gas limit for the transaction.
175	async fn estimate_gas(
176		&self,
177		transaction: GenericTransaction,
178		block: Option<BlockNumberOrTag>,
179	) -> RpcResult<U256> {
180		log::trace!(target: LOG_TARGET, "estimate_gas transaction={transaction:?} block={block:?}");
181
182		let block = block.unwrap_or_else(|| {
183			if self.use_pending_for_estimate_gas {
184				BlockTag::Pending.into()
185			} else {
186				Default::default()
187			}
188		});
189		let hash = self.client.block_hash_for_tag(block.into()).await?;
190		let gas_estimate =
191			self.client.runtime_api(hash).estimate_gas(transaction, block.into()).await?;
192
193		log::trace!(
194			target: LOG_TARGET,
195			"estimate_gas result={gas_estimate:?}",
196		);
197		Ok(gas_estimate)
198	}
199
200	async fn call(
201		&self,
202		transaction: GenericTransaction,
203		block: Option<BlockNumberOrTagOrHash>,
204		state_overrides: Option<StateOverrideSet>,
205	) -> RpcResult<Bytes> {
206		let block = block.unwrap_or_default();
207		let hash = self.client.block_hash_for_tag(block.clone()).await?;
208		let runtime_api = self.client.runtime_api(hash);
209		let dry_run = runtime_api.dry_run(transaction, block, state_overrides).await?;
210		Ok(dry_run.data.into())
211	}
212
213	async fn send_raw_transaction(&self, transaction: Bytes) -> RpcResult<H256> {
214		let hash = H256(keccak_256(&transaction.0));
215		log::trace!(target: LOG_TARGET, "send_raw_transaction transaction: {transaction:?} ethereum_hash: {hash:?}");
216
217		if !self.allow_unprotected_txs {
218			let signed_transaction = TransactionSigned::decode(transaction.0.as_slice())
219				.map_err(|err| {
220					log::trace!(target: LOG_TARGET, "Transaction decoding failed. ethereum_hash: {hash:?}, error: {err:?}");
221					EthRpcError::InvalidTransaction
222				})?;
223
224			let is_chain_id_provided = match signed_transaction {
225				TransactionSigned::Transaction7702Signed(tx) => {
226					tx.transaction_7702_unsigned.chain_id != U256::zero()
227				},
228				TransactionSigned::Transaction4844Signed(tx) => {
229					tx.transaction_4844_unsigned.chain_id != U256::zero()
230				},
231				TransactionSigned::Transaction1559Signed(tx) => {
232					tx.transaction_1559_unsigned.chain_id != U256::zero()
233				},
234				TransactionSigned::Transaction2930Signed(tx) => {
235					tx.transaction_2930_unsigned.chain_id != U256::zero()
236				},
237				TransactionSigned::TransactionLegacySigned(tx) => {
238					tx.transaction_legacy_unsigned.chain_id.is_some()
239				},
240			};
241
242			if !is_chain_id_provided {
243				log::trace!(target: LOG_TARGET, "Invalid Transaction: transaction doesn't include a chain-id. ethereum_hash: {hash:?}");
244				Err(EthRpcError::InvalidTransaction)?;
245			}
246		}
247
248		let call = subxt_client::tx().revive().eth_transact(transaction.0);
249
250		// Subscribe to new block only when automine is enabled.
251		let receiver = self.client.block_notifier().map(|sender| sender.subscribe());
252
253		// Submit the transaction
254		let tx_status = self.client.submit(call).await.map_err(|err| {
255			log::trace!(target: LOG_TARGET, "send_raw_transaction ethereum_hash: {hash:?} failed: {err:?}");
256			err
257		})?;
258
259		if matches!(tx_status, TransactionStatus::Future) {
260			return Ok(hash);
261		}
262
263		// Wait for the transaction to be included in a block if automine is enabled
264		if let Some(mut receiver) = receiver {
265			loop {
266				if let Ok(block_hash) = receiver.recv().await {
267					let Ok(Some(block)) = self.client.block_by_hash(&block_hash).await else {
268						log::debug!(target: LOG_TARGET, "Could not find the block with the received hash: {hash:?}.");
269						continue;
270					};
271					let Some(evm_block) = self.client.evm_block(block, false).await else {
272						log::debug!(target: LOG_TARGET, "Failed to get the EVM block for substrate block with hash: {hash:?}");
273						continue;
274					};
275					if evm_block.transactions.contains_tx(hash) {
276						log::debug!(target: LOG_TARGET, "{hash:} was included in a block");
277						break;
278					}
279				}
280			}
281		}
282
283		log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}");
284		Ok(hash)
285	}
286
287	async fn send_transaction(&self, mut transaction: GenericTransaction) -> RpcResult<H256> {
288		log::debug!(target: LOG_TARGET, "{transaction:#?}");
289
290		let Some(from) = transaction.from else {
291			log::debug!(target: LOG_TARGET, "Transaction must have a sender");
292			return Err(EthRpcError::InvalidTransaction.into());
293		};
294
295		let account = self
296			.accounts
297			.iter()
298			.find(|account| account.address() == from)
299			.ok_or(EthRpcError::AccountNotFound(from))?;
300
301		if transaction.gas.is_none() {
302			transaction.gas = Some(self.estimate_gas(transaction.clone(), None).await?);
303		}
304
305		if transaction.gas_price.is_none() {
306			transaction.gas_price = Some(self.gas_price().await?);
307		}
308
309		if transaction.nonce.is_none() {
310			transaction.nonce =
311				Some(self.get_transaction_count(from, BlockTag::Latest.into()).await?);
312		}
313
314		if transaction.chain_id.is_none() {
315			transaction.chain_id = Some(self.chain_id().await?);
316		}
317
318		let tx = transaction.try_into_unsigned().map_err(|_| EthRpcError::InvalidTransaction)?;
319		let payload = account.sign_transaction(tx).signed_payload();
320		self.send_raw_transaction(Bytes(payload)).await
321	}
322
323	async fn get_block_by_hash(
324		&self,
325		block_hash: H256,
326		hydrated_transactions: bool,
327	) -> RpcResult<Option<Block>> {
328		let Some(block) = self.client.block_by_ethereum_hash(&block_hash).await? else {
329			return Ok(None);
330		};
331		let block = self.client.evm_block(block, hydrated_transactions).await;
332		Ok(block)
333	}
334
335	async fn get_balance(&self, address: H160, block: BlockNumberOrTagOrHash) -> RpcResult<U256> {
336		let hash = self.client.block_hash_for_tag(block).await?;
337		let runtime_api = self.client.runtime_api(hash);
338		let balance = runtime_api.balance(address).await?;
339		Ok(balance)
340	}
341
342	async fn chain_id(&self) -> RpcResult<U256> {
343		Ok(self.client.chain_id().into())
344	}
345
346	async fn gas_price(&self) -> RpcResult<U256> {
347		let hash = self.client.block_hash_for_tag(BlockTag::Latest.into()).await?;
348		let runtime_api = self.client.runtime_api(hash);
349		Ok(runtime_api.gas_price().await?)
350	}
351
352	async fn max_priority_fee_per_gas(&self) -> RpcResult<U256> {
353		// We do not support tips. Hence the recommended priority fee is
354		// always zero. The effective gas price will always be the base price.
355		Ok(Default::default())
356	}
357
358	async fn get_code(&self, address: H160, block: BlockNumberOrTagOrHash) -> RpcResult<Bytes> {
359		let hash = self.client.block_hash_for_tag(block).await?;
360		let code = self.client.runtime_api(hash).code(address).await?;
361		Ok(code.into())
362	}
363
364	async fn accounts(&self) -> RpcResult<Vec<H160>> {
365		Ok(self.accounts.iter().map(|account| account.address()).collect())
366	}
367
368	async fn get_block_by_number(
369		&self,
370		block_number: BlockNumberOrTag,
371		hydrated_transactions: bool,
372	) -> RpcResult<Option<Block>> {
373		let Some(block) = self.client.block_by_number_or_tag(&block_number).await? else {
374			return Ok(None);
375		};
376		let block = self.client.evm_block(block, hydrated_transactions).await;
377		Ok(block)
378	}
379
380	async fn get_block_transaction_count_by_hash(
381		&self,
382		block_hash: Option<H256>,
383	) -> RpcResult<Option<U256>> {
384		let block_hash = if let Some(block_hash) = block_hash {
385			block_hash
386		} else {
387			self.client.latest_block().await.hash()
388		};
389
390		let Some(substrate_hash) = self.client.resolve_substrate_hash(&block_hash).await else {
391			return Ok(None);
392		};
393
394		Ok(self.client.receipts_count_per_block(&substrate_hash).await.map(U256::from))
395	}
396
397	async fn get_block_transaction_count_by_number(
398		&self,
399		block: Option<BlockNumberOrTag>,
400	) -> RpcResult<Option<U256>> {
401		let substrate_hash = if let Some(block) = self
402			.client
403			.block_by_number_or_tag(&block.unwrap_or_else(|| BlockTag::Latest.into()))
404			.await?
405		{
406			block.hash()
407		} else {
408			return Ok(None);
409		};
410
411		Ok(self.client.receipts_count_per_block(&substrate_hash).await.map(U256::from))
412	}
413
414	async fn get_logs(&self, filter: Option<Filter>) -> RpcResult<FilterResults> {
415		let logs = self.client.logs(filter).await?;
416		Ok(FilterResults::Logs(logs))
417	}
418
419	async fn get_storage_at(
420		&self,
421		address: H160,
422		storage_slot: U256,
423		block: BlockNumberOrTagOrHash,
424	) -> RpcResult<Bytes> {
425		let hash = self.client.block_hash_for_tag(block).await?;
426		let runtime_api = self.client.runtime_api(hash);
427		let bytes = match runtime_api.get_storage(address, storage_slot.to_big_endian()).await {
428			Ok(value) => value.unwrap_or([0u8; 32].into()),
429			// Per Ethereum spec, return zero for non-contract addresses.
430			Err(ClientError::ContractNotFound) => {
431				log::trace!(target: LOG_TARGET, "get_storage_at: ContractNotFound for {address:?}, returning zero");
432				[0u8; 32].into()
433			},
434			Err(err) => return Err(err.into()),
435		};
436		Ok(bytes.into())
437	}
438
439	async fn get_transaction_by_block_hash_and_index(
440		&self,
441		block_hash: H256,
442		transaction_index: U256,
443	) -> RpcResult<Option<TransactionInfo>> {
444		let Some(substrate_block_hash) = self.client.resolve_substrate_hash(&block_hash).await
445		else {
446			return Ok(None);
447		};
448		self.get_transaction_by_substrate_block_hash_and_index(
449			substrate_block_hash,
450			transaction_index,
451		)
452		.await
453	}
454
455	async fn get_transaction_by_block_number_and_index(
456		&self,
457		block: BlockNumberOrTag,
458		transaction_index: U256,
459	) -> RpcResult<Option<TransactionInfo>> {
460		let Some(block) = self.client.block_by_number_or_tag(&block).await? else {
461			return Ok(None);
462		};
463		self.get_transaction_by_substrate_block_hash_and_index(block.hash(), transaction_index)
464			.await
465	}
466
467	async fn get_transaction_by_hash(
468		&self,
469		transaction_hash: H256,
470	) -> RpcResult<Option<TransactionInfo>> {
471		let receipt = self.client.receipt(&transaction_hash).await;
472		let signed_tx = self.client.signed_tx_by_hash(&transaction_hash).await;
473		if let (Some(receipt), Some(signed_tx)) = (receipt, signed_tx) {
474			return Ok(Some(TransactionInfo::new(&receipt, signed_tx)));
475		}
476
477		Ok(None)
478	}
479
480	async fn get_transaction_count(
481		&self,
482		address: H160,
483		block: BlockNumberOrTagOrHash,
484	) -> RpcResult<U256> {
485		let hash = self.client.block_hash_for_tag(block).await?;
486		let runtime_api = self.client.runtime_api(hash);
487		let nonce = runtime_api.nonce(address).await?;
488		Ok(nonce)
489	}
490
491	async fn web3_client_version(&self) -> RpcResult<String> {
492		let git_revision = env!("GIT_REVISION");
493		let rustc_version = env!("RUSTC_VERSION");
494		let target = env!("TARGET");
495		Ok(format!("eth-rpc/{git_revision}/{target}/{rustc_version}"))
496	}
497
498	async fn fee_history(
499		&self,
500		block_count: U256,
501		newest_block: BlockNumberOrTag,
502		reward_percentiles: Option<Vec<f64>>,
503	) -> RpcResult<FeeHistoryResult> {
504		let block_count: u32 = block_count.try_into().map_err(|_| EthRpcError::ConversionError)?;
505		let result = self.client.fee_history(block_count, newest_block, reward_percentiles).await?;
506		Ok(result)
507	}
508
509	async fn eth_subscribe(
510		&self,
511		pending: PendingSubscriptionSink,
512		kind: SubscriptionKind,
513		options: Option<SubscriptionOptions>,
514	) {
515		let Some(subscription_parameters) = SubscriptionParameters::new(kind, options) else {
516			return pending
517				.reject(ErrorObjectOwned::owned(
518					jsonrpsee::types::error::INVALID_PARAMS_CODE,
519					"Invalid subscription parameters",
520					None::<()>,
521				))
522				.await;
523		};
524		let Ok(sink) = pending.accept().await else {
525			return;
526		};
527
528		let stream: Pin<
529			Box<dyn Stream<Item = Result<SubscriptionItem, BroadcastStreamRecvError>> + Send>,
530		> = match subscription_parameters {
531			SubscriptionParameters::NewBlockHeaders => Box::pin(
532				BroadcastStream::new(self.client.get_block_subscription_rx())
533					.map_ok(|block| SubscriptionItem::BlockHeader(BlockHeader::from(block))),
534			) as _,
535			SubscriptionParameters::Logs(filter) => Box::pin(
536				BroadcastStream::new(self.client.get_log_subscription_rx())
537					.try_filter(move |log| futures::future::ready(filter.matches(log)))
538					.map_ok(SubscriptionItem::Log),
539			) as _,
540		};
541		let _ = tokio::spawn(Self::handle_subscription_forwarding(sink, stream));
542	}
543}
544
545impl EthRpcServerImpl {
546	async fn get_transaction_by_substrate_block_hash_and_index(
547		&self,
548		substrate_block_hash: H256,
549		transaction_index: U256,
550	) -> RpcResult<Option<TransactionInfo>> {
551		let Some(receipt) = self
552			.client
553			.receipt_by_hash_and_index(
554				&substrate_block_hash,
555				transaction_index.try_into().map_err(|_| EthRpcError::ConversionError)?,
556			)
557			.await
558		else {
559			return Ok(None);
560		};
561		let Some(signed_tx) = self.client.signed_tx_by_hash(&receipt.transaction_hash).await else {
562			return Ok(None);
563		};
564
565		Ok(Some(TransactionInfo::new(&receipt, signed_tx)))
566	}
567
568	async fn handle_subscription_forwarding(
569		sink: SubscriptionSink,
570		mut stream: Pin<
571			Box<dyn Stream<Item = Result<SubscriptionItem, BroadcastStreamRecvError>> + Send>,
572		>,
573	) {
574		loop {
575			tokio::select! {
576				_ = sink.closed() => break,
577				item = stream.next() => {
578					match item {
579						// Stream ended.
580						None => break,
581						// Send the item to the subscriber.
582						Some(Ok(sub_item)) => {
583							let msg = SubscriptionMessage::from_json(&sub_item)
584								.expect("SubscriptionItem is serializable; qed");
585							if sink.send(msg).await.is_err() {
586								break;
587							}
588						},
589						// Broadcast receiver lagged behind — missed messages.
590						Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
591							log::warn!(
592								target: LOG_TARGET,
593								"Subscription lagged, skipped {count} messages"
594							);
595						},
596					}
597				}
598			}
599		}
600	}
601}