use crate::{
runtime::GAS_PRICE,
subxt_client::{
revive::{calls::types::EthTransact, events::ContractEmitted},
runtime_types::pallet_revive::storage::ContractInfo,
},
LOG_TARGET,
};
use futures::{stream, StreamExt};
use jsonrpsee::types::{error::CALL_EXECUTION_FAILED_CODE, ErrorObjectOwned};
use pallet_revive::{
create1,
evm::{
Block, BlockNumberOrTag, BlockNumberOrTagOrHash, Bytes256, GenericTransaction, Log,
ReceiptInfo, SyncingProgress, SyncingStatus, TransactionSigned, H160, H256, U256,
},
EthTransactError, EthTransactInfo,
};
use sp_core::keccak_256;
use sp_weights::Weight;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
time::Duration,
};
use subxt::{
backend::{
legacy::{rpc_methods::SystemHealth, LegacyRpcMethods},
rpc::{
reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
RpcClient,
},
},
config::Header,
error::RpcError,
storage::Storage,
Config, OnlineClient,
};
use subxt_client::transaction_payment::events::TransactionFeePaid;
use thiserror::Error;
use tokio::sync::{watch::Sender, RwLock};
use crate::subxt_client::{self, system::events::ExtrinsicSuccess, SrcChainConfig};
pub type SubstrateBlock = subxt::blocks::Block<SrcChainConfig, OnlineClient<SrcChainConfig>>;
pub type SubstrateBlockNumber = <<SrcChainConfig as Config>::Header as Header>::Number;
pub type SubstrateBlockHash = <SrcChainConfig as Config>::Hash;
pub type Shared<T> = Arc<RwLock<T>>;
pub type Balance = u128;
#[derive(Default)]
struct BlockCache<const N: usize> {
buffer: VecDeque<Arc<SubstrateBlock>>,
blocks_by_number: HashMap<SubstrateBlockNumber, Arc<SubstrateBlock>>,
blocks_by_hash: HashMap<H256, Arc<SubstrateBlock>>,
receipts_by_hash: HashMap<H256, ReceiptInfo>,
signed_tx_by_hash: HashMap<H256, TransactionSigned>,
tx_hashes_by_block_and_index: HashMap<H256, HashMap<U256, H256>>,
}
fn unwrap_call_err(err: &subxt::error::RpcError) -> Option<ErrorObjectOwned> {
use subxt::backend::rpc::reconnecting_rpc_client;
match err {
subxt::error::RpcError::ClientError(err) => {
match err.downcast_ref::<reconnecting_rpc_client::Error>() {
Some(reconnecting_rpc_client::Error::RpcError(
jsonrpsee::core::client::Error::Call(err),
)) => Some(err.clone().into_owned()),
_ => None,
}
},
_ => None,
}
}
fn extract_revert_message(exec_data: &[u8]) -> Option<String> {
let error_selector = exec_data.get(0..4)?;
match error_selector {
[0x4E, 0x48, 0x7B, 0x71] => {
let panic_code: u32 = U256::from_big_endian(exec_data.get(4..36)?).try_into().ok()?;
let msg = match panic_code {
0x00 => "generic panic",
0x01 => "assert(false)",
0x11 => "arithmetic underflow or overflow",
0x12 => "division or modulo by zero",
0x21 => "enum overflow",
0x22 => "invalid encoded storage byte array accessed",
0x31 => "out-of-bounds array access; popping on an empty array",
0x32 => "out-of-bounds access of an array or bytesN",
0x41 => "out of memory",
0x51 => "uninitialized function",
code => return Some(format!("execution reverted: unknown panic code: {code:#x}")),
};
Some(format!("execution reverted: {msg}"))
},
[0x08, 0xC3, 0x79, 0xA0] => {
let decoded = ethabi::decode(&[ethabi::ParamType::String], &exec_data[4..]).ok()?;
if let Some(ethabi::Token::String(msg)) = decoded.first() {
return Some(format!("execution reverted: {msg}"))
}
Some("execution reverted".to_string())
},
_ => {
log::debug!(target: LOG_TARGET, "Unknown revert function selector: {error_selector:?}");
Some("execution reverted".to_string())
},
}
}
#[derive(Error, Debug)]
pub enum ClientError {
#[error(transparent)]
Jsonrpsee(#[from] jsonrpsee::core::ClientError),
#[error(transparent)]
SubxtError(#[from] subxt::Error),
#[error(transparent)]
RpcError(#[from] RpcError),
#[error(transparent)]
CodecError(#[from] codec::Error),
#[error("contract reverted")]
Reverted(EthTransactError),
#[error("conversion failed")]
ConversionFailed,
#[error("hash not found")]
BlockNotFound,
#[error("transactionFeePaid event not found")]
TxFeeNotFound,
#[error("cache is empty")]
CacheEmpty,
}
const REVERT_CODE: i32 = 3;
impl From<ClientError> for ErrorObjectOwned {
fn from(err: ClientError) -> Self {
match err {
ClientError::SubxtError(subxt::Error::Rpc(err)) | ClientError::RpcError(err) => {
if let Some(err) = unwrap_call_err(&err) {
return err;
}
ErrorObjectOwned::owned::<Vec<u8>>(
CALL_EXECUTION_FAILED_CODE,
err.to_string(),
None,
)
},
ClientError::Reverted(EthTransactError::Data(data)) => {
let msg = extract_revert_message(&data).unwrap_or_default();
let data = format!("0x{}", hex::encode(data));
ErrorObjectOwned::owned::<String>(REVERT_CODE, msg, Some(data))
},
ClientError::Reverted(EthTransactError::Message(msg)) =>
ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, msg, None),
_ =>
ErrorObjectOwned::owned::<String>(CALL_EXECUTION_FAILED_CODE, err.to_string(), None),
}
}
}
pub const CACHE_SIZE: usize = 256;
impl<const N: usize> BlockCache<N> {
fn latest_block(&self) -> Option<&Arc<SubstrateBlock>> {
self.buffer.back()
}
fn insert(&mut self, block: SubstrateBlock) {
if self.buffer.len() >= N {
if let Some(block) = self.buffer.pop_front() {
log::trace!(target: LOG_TARGET, "Pruning block: {}", block.number());
let hash = block.hash();
self.blocks_by_hash.remove(&hash);
self.blocks_by_number.remove(&block.number());
if let Some(entries) = self.tx_hashes_by_block_and_index.remove(&hash) {
for hash in entries.values() {
self.receipts_by_hash.remove(hash);
}
}
}
}
let block = Arc::new(block);
self.buffer.push_back(block.clone());
self.blocks_by_number.insert(block.number(), block.clone());
self.blocks_by_hash.insert(block.hash(), block);
}
}
#[derive(Clone)]
pub struct Client {
inner: Arc<ClientInner>,
pub updates: tokio::sync::watch::Receiver<()>,
}
struct ClientInner {
api: OnlineClient<SrcChainConfig>,
rpc_client: ReconnectingRpcClient,
rpc: LegacyRpcMethods<SrcChainConfig>,
cache: Shared<BlockCache<CACHE_SIZE>>,
chain_id: u64,
max_block_weight: Weight,
}
impl ClientInner {
async fn from_url(url: &str) -> Result<Self, ClientError> {
let rpc_client = ReconnectingRpcClient::builder()
.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
.build(url.to_string())
.await?;
let api = OnlineClient::<SrcChainConfig>::from_rpc_client(rpc_client.clone()).await?;
let cache = Arc::new(RwLock::new(BlockCache::<CACHE_SIZE>::default()));
let rpc = LegacyRpcMethods::<SrcChainConfig>::new(RpcClient::new(rpc_client.clone()));
let (chain_id, max_block_weight) =
tokio::try_join!(chain_id(&api), max_block_weight(&api))?;
Ok(Self { api, rpc_client, rpc, cache, chain_id, max_block_weight })
}
async fn receipt_infos(
&self,
block: &SubstrateBlock,
) -> Result<HashMap<H256, (TransactionSigned, ReceiptInfo)>, ClientError> {
let extrinsics = block.extrinsics().await?;
let extrinsics = extrinsics.iter().flat_map(|ext| {
let call = ext.as_extrinsic::<EthTransact>().ok()??;
let transaction_hash = H256(keccak_256(&call.payload));
let signed_tx = TransactionSigned::decode(&call.payload).ok()?;
let from = signed_tx.recover_eth_address().ok()?;
let tx_info = GenericTransaction::from_signed(signed_tx.clone(), Some(from));
let contract_address = if tx_info.to.is_none() {
Some(create1(&from, tx_info.nonce.unwrap_or_default().try_into().ok()?))
} else {
None
};
Some((from, signed_tx, tx_info, transaction_hash, contract_address, ext))
});
stream::iter(extrinsics)
.map(|(from, signed_tx, tx_info, transaction_hash, contract_address, ext)| async move {
let events = ext.events().await?;
let tx_fees =
events.find_first::<TransactionFeePaid>()?.ok_or(ClientError::TxFeeNotFound)?;
let gas_price = tx_info.gas_price.unwrap_or_default();
let gas_used = (tx_fees.tip.saturating_add(tx_fees.actual_fee))
.checked_div(gas_price.as_u128())
.unwrap_or_default();
let success = events.has::<ExtrinsicSuccess>()?;
let transaction_index = ext.index();
let block_hash = block.hash();
let block_number = block.number().into();
let logs = events.iter()
.filter_map(|event_details| {
let event_details = event_details.ok()?;
let event = event_details.as_event::<ContractEmitted>().ok()??;
Some(Log {
address: event.contract,
topics: event.topics,
data: Some(event.data.into()),
block_number: Some(block_number),
transaction_hash,
transaction_index: Some(transaction_index.into()),
block_hash: Some(block_hash),
log_index: Some(event_details.index().into()),
..Default::default()
})
}).collect();
log::debug!(target: LOG_TARGET, "Adding receipt for tx hash: {transaction_hash:?} - block: {block_number:?}");
let receipt = ReceiptInfo::new(
block_hash,
block_number,
contract_address,
from,
logs,
tx_info.to,
gas_price,
gas_used.into(),
success,
transaction_hash,
transaction_index.into(),
tx_info.r#type.unwrap_or_default()
);
Ok::<_, ClientError>((receipt.transaction_hash, (signed_tx, receipt)))
})
.buffer_unordered(10)
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<HashMap<_, _>, _>>()
}
}
async fn chain_id(api: &OnlineClient<SrcChainConfig>) -> Result<u64, ClientError> {
let query = subxt_client::constants().revive().chain_id();
api.constants().at(&query).map_err(|err| err.into())
}
async fn max_block_weight(api: &OnlineClient<SrcChainConfig>) -> Result<Weight, ClientError> {
let query = subxt_client::constants().system().block_weights();
let weights = api.constants().at(&query)?;
let max_block = weights.per_class.normal.max_extrinsic.unwrap_or(weights.max_block);
Ok(max_block.0)
}
async fn extract_block_timestamp(block: &SubstrateBlock) -> Option<u64> {
let extrinsics = block.extrinsics().await.ok()?;
let ext = extrinsics
.find_first::<crate::subxt_client::timestamp::calls::types::Set>()
.ok()??;
Some(ext.value.now / 1000)
}
impl Client {
pub async fn from_url(
url: &str,
spawn_handle: &sc_service::SpawnEssentialTaskHandle,
) -> Result<Self, ClientError> {
log::info!(target: LOG_TARGET, "Connecting to node at: {url} ...");
let inner: Arc<ClientInner> = Arc::new(ClientInner::from_url(url).await?);
log::info!(target: LOG_TARGET, "Connected to node at: {url}");
let (tx, mut updates) = tokio::sync::watch::channel(());
spawn_handle.spawn("subscribe-blocks", None, Self::subscribe_blocks(inner.clone(), tx));
updates.changed().await.expect("tx is not dropped");
Ok(Self { inner, updates })
}
async fn storage_api(
&self,
at: &BlockNumberOrTagOrHash,
) -> Result<Storage<SrcChainConfig, OnlineClient<SrcChainConfig>>, ClientError> {
match at {
BlockNumberOrTagOrHash::U256(block_number) => {
let n: SubstrateBlockNumber =
(*block_number).try_into().map_err(|_| ClientError::ConversionFailed)?;
let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?;
Ok(self.inner.api.storage().at(hash))
},
BlockNumberOrTagOrHash::H256(hash) => Ok(self.inner.api.storage().at(*hash)),
BlockNumberOrTagOrHash::BlockTag(_) => {
if let Some(block) = self.latest_block().await {
return Ok(self.inner.api.storage().at(block.hash()));
}
let storage = self.inner.api.storage().at_latest().await?;
Ok(storage)
},
}
}
async fn runtime_api(
&self,
at: &BlockNumberOrTagOrHash,
) -> Result<
subxt::runtime_api::RuntimeApi<SrcChainConfig, OnlineClient<SrcChainConfig>>,
ClientError,
> {
match at {
BlockNumberOrTagOrHash::U256(block_number) => {
let n: SubstrateBlockNumber =
(*block_number).try_into().map_err(|_| ClientError::ConversionFailed)?;
let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?;
Ok(self.inner.api.runtime_api().at(hash))
},
BlockNumberOrTagOrHash::H256(hash) => Ok(self.inner.api.runtime_api().at(*hash)),
BlockNumberOrTagOrHash::BlockTag(_) => {
if let Some(block) = self.latest_block().await {
return Ok(self.inner.api.runtime_api().at(block.hash()));
}
let api = self.inner.api.runtime_api().at_latest().await?;
Ok(api)
},
}
}
async fn subscribe_blocks(inner: Arc<ClientInner>, tx: Sender<()>) {
log::info!(target: LOG_TARGET, "Subscribing to new blocks");
let mut block_stream = match inner.as_ref().api.blocks().subscribe_best().await {
Ok(s) => s,
Err(err) => {
log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}");
return;
},
};
while let Some(block) = block_stream.next().await {
let block = match block {
Ok(block) => block,
Err(err) => {
if err.is_disconnected_will_reconnect() {
log::warn!(
target: LOG_TARGET,
"The RPC connection was lost and we may have missed a few blocks"
);
continue;
}
log::error!(target: LOG_TARGET, "Failed to fetch block: {err:?}");
return;
},
};
log::trace!(target: LOG_TARGET, "Pushing block: {}", block.number());
let mut cache = inner.cache.write().await;
let receipts = inner
.receipt_infos(&block)
.await
.inspect_err(|err| {
log::error!(target: LOG_TARGET, "Failed to get receipts: {err:?}");
})
.unwrap_or_default();
if !receipts.is_empty() {
let values = receipts
.iter()
.map(|(hash, (_, receipt))| (receipt.transaction_index, *hash))
.collect::<HashMap<_, _>>();
cache.tx_hashes_by_block_and_index.insert(block.hash(), values);
cache
.receipts_by_hash
.extend(receipts.iter().map(|(hash, (_, receipt))| (*hash, receipt.clone())));
cache.signed_tx_by_hash.extend(
receipts.iter().map(|(hash, (signed_tx, _))| (*hash, signed_tx.clone())),
)
}
cache.insert(block);
tx.send_replace(());
}
log::info!(target: LOG_TARGET, "Block subscription ended");
}
}
impl Client {
pub async fn latest_block(&self) -> Option<Arc<SubstrateBlock>> {
let cache = self.inner.cache.read().await;
let block = cache.latest_block()?;
Some(block.clone())
}
pub async fn submit(
&self,
call: subxt::tx::DefaultPayload<EthTransact>,
) -> Result<H256, ClientError> {
let ext = self.inner.api.tx().create_unsigned(&call).map_err(ClientError::from)?;
let hash = ext.submit().await?;
Ok(hash)
}
pub async fn receipt(&self, tx_hash: &H256) -> Option<ReceiptInfo> {
let cache = self.inner.cache.read().await;
cache.receipts_by_hash.get(tx_hash).cloned()
}
pub async fn syncing(&self) -> Result<SyncingStatus, ClientError> {
let health = self.inner.rpc.system_health().await?;
let status = if health.is_syncing {
let client = RpcClient::new(self.inner.rpc_client.clone());
let sync_state: sc_rpc::system::SyncState<SubstrateBlockNumber> =
client.request("system_syncState", Default::default()).await?;
SyncingProgress {
current_block: Some(sync_state.current_block.into()),
highest_block: Some(sync_state.highest_block.into()),
starting_block: Some(sync_state.starting_block.into()),
}
.into()
} else {
SyncingStatus::Bool(false)
};
Ok(status)
}
pub async fn receipt_by_hash_and_index(
&self,
block_hash: &H256,
transaction_index: &U256,
) -> Option<ReceiptInfo> {
let cache = self.inner.cache.read().await;
let receipt_hash =
cache.tx_hashes_by_block_and_index.get(block_hash)?.get(transaction_index)?;
let receipt = cache.receipts_by_hash.get(receipt_hash)?;
Some(receipt.clone())
}
pub async fn signed_tx_by_hash(&self, tx_hash: &H256) -> Option<TransactionSigned> {
let cache = self.inner.cache.read().await;
cache.signed_tx_by_hash.get(tx_hash).cloned()
}
pub async fn receipts_count_per_block(&self, block_hash: &SubstrateBlockHash) -> Option<usize> {
let cache = self.inner.cache.read().await;
cache.tx_hashes_by_block_and_index.get(block_hash).map(|v| v.len())
}
pub async fn system_health(&self) -> Result<SystemHealth, ClientError> {
let health = self.inner.rpc.system_health().await?;
Ok(health)
}
pub async fn balance(
&self,
address: H160,
at: &BlockNumberOrTagOrHash,
) -> Result<U256, ClientError> {
let address = address.0.into();
let runtime_api = self.runtime_api(at).await?;
let payload = subxt_client::apis().revive_api().balance(address);
let balance = runtime_api.call(payload).await?;
Ok(*balance)
}
pub async fn get_contract_storage(
&self,
contract_address: H160,
key: U256,
block: BlockNumberOrTagOrHash,
) -> Result<Vec<u8>, ClientError> {
let runtime_api = self.runtime_api(&block).await?;
let contract_address = contract_address.0.into();
let payload = subxt_client::apis()
.revive_api()
.get_storage(contract_address, key.to_big_endian());
let result = runtime_api.call(payload).await?.unwrap_or_default().unwrap_or_default();
Ok(result)
}
pub async fn get_contract_code(
&self,
contract_address: &H160,
block: BlockNumberOrTagOrHash,
) -> Result<Vec<u8>, ClientError> {
let storage_api = self.storage_api(&block).await?;
let contract_address: subxt::utils::H160 = contract_address.0.into();
let query = subxt_client::storage().revive().contract_info_of(contract_address);
let Some(ContractInfo { code_hash, .. }) = storage_api.fetch(&query).await? else {
return Ok(Vec::new());
};
let query = subxt_client::storage().revive().pristine_code(code_hash);
let result = storage_api.fetch(&query).await?.map(|v| v.0).unwrap_or_default();
Ok(result)
}
pub async fn dry_run(
&self,
tx: GenericTransaction,
block: BlockNumberOrTagOrHash,
) -> Result<EthTransactInfo<Balance>, ClientError> {
let runtime_api = self.runtime_api(&block).await?;
let payload = subxt_client::apis().revive_api().eth_transact(tx.into());
let result = runtime_api.call(payload).await?;
match result {
Err(err) => {
log::debug!(target: LOG_TARGET, "Dry run failed {err:?}");
Err(ClientError::Reverted(err.0))
},
Ok(result) => Ok(result.0),
}
}
pub async fn nonce(
&self,
address: H160,
at: BlockNumberOrTagOrHash,
) -> Result<U256, ClientError> {
let address = address.0.into();
let runtime_api = self.runtime_api(&at).await?;
let payload = subxt_client::apis().revive_api().nonce(address);
let nonce = runtime_api.call(payload).await?;
Ok(nonce.into())
}
pub async fn block_number(&self) -> Result<SubstrateBlockNumber, ClientError> {
let cache = self.inner.cache.read().await;
let latest_block = cache.buffer.back().ok_or(ClientError::CacheEmpty)?;
Ok(latest_block.number())
}
pub async fn get_block_hash(
&self,
block_number: SubstrateBlockNumber,
) -> Result<Option<SubstrateBlockHash>, ClientError> {
let cache = self.inner.cache.read().await;
if let Some(block) = cache.blocks_by_number.get(&block_number) {
return Ok(Some(block.hash()));
}
let hash = self.inner.rpc.chain_get_block_hash(Some(block_number.into())).await?;
Ok(hash)
}
pub async fn block_by_number_or_tag(
&self,
block: &BlockNumberOrTag,
) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
match block {
BlockNumberOrTag::U256(n) => {
let n = (*n).try_into().map_err(|_| ClientError::ConversionFailed)?;
self.block_by_number(n).await
},
BlockNumberOrTag::BlockTag(_) => {
let cache = self.inner.cache.read().await;
Ok(cache.buffer.back().cloned())
},
}
}
pub async fn block_by_hash(
&self,
hash: &SubstrateBlockHash,
) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
let cache = self.inner.cache.read().await;
if let Some(block) = cache.blocks_by_hash.get(hash) {
return Ok(Some(block.clone()));
}
match self.inner.api.blocks().at(*hash).await {
Ok(block) => Ok(Some(Arc::new(block))),
Err(subxt::Error::Block(subxt::error::BlockError::NotFound(_))) => Ok(None),
Err(err) => Err(err.into()),
}
}
pub async fn block_by_number(
&self,
block_number: SubstrateBlockNumber,
) -> Result<Option<Arc<SubstrateBlock>>, ClientError> {
let cache = self.inner.cache.read().await;
if let Some(block) = cache.blocks_by_number.get(&block_number) {
return Ok(Some(block.clone()));
}
let Some(hash) = self.get_block_hash(block_number).await? else {
return Ok(None);
};
self.block_by_hash(&hash).await
}
pub async fn evm_block(&self, block: Arc<SubstrateBlock>) -> Result<Block, ClientError> {
let runtime_api = self.inner.api.runtime_api().at(block.hash());
let max_fee = Self::weight_to_fee(&runtime_api, self.max_block_weight()).await?;
let gas_limit = U256::from(max_fee / GAS_PRICE as u128);
let header = block.header();
let timestamp = extract_block_timestamp(&block).await.unwrap_or_default();
let parent_hash = header.parent_hash.0.into();
let state_root = header.state_root.0.into();
let extrinsics_root = header.extrinsics_root.0.into();
Ok(Block {
hash: block.hash(),
parent_hash,
state_root,
transactions_root: extrinsics_root,
number: header.number.into(),
timestamp: timestamp.into(),
difficulty: Some(0u32.into()),
gas_limit,
logs_bloom: Bytes256([0u8; 256]),
receipts_root: extrinsics_root,
..Default::default()
})
}
async fn weight_to_fee(
runtime_api: &subxt::runtime_api::RuntimeApi<SrcChainConfig, OnlineClient<SrcChainConfig>>,
weight: Weight,
) -> Result<Balance, ClientError> {
let payload = subxt_client::apis()
.transaction_payment_api()
.query_weight_to_fee(weight.into());
let fee = runtime_api.call(payload).await?;
Ok(fee)
}
pub fn chain_id(&self) -> u64 {
self.inner.chain_id
}
pub fn max_block_weight(&self) -> Weight {
self.inner.max_block_weight
}
}