use crate::{
build_executor, full_extensions, parse, rpc_err_handler, state_machine_call_with_proof,
LiveState, SharedParams, State, LOG_TARGET,
};
use parity_scale_codec::{Decode, Encode};
use sc_executor::sp_wasm_interface::HostFunctions;
use serde::{de::DeserializeOwned, Serialize};
use sp_core::H256;
use sp_runtime::{
generic::SignedBlock,
traits::{Block as BlockT, Header as HeaderT, NumberFor},
};
use std::{fmt::Debug, str::FromStr};
use substrate_rpc_client::{ws_client, ChainApi, FinalizedHeaders, Subscription, WsClient};
const SUB: &str = "chain_subscribeFinalizedHeads";
const UN_SUB: &str = "chain_unsubscribeFinalizedHeads";
#[derive(Debug, Clone, clap::Parser)]
pub struct FollowChainCmd {
#[arg(short, long, value_parser = parse::url)]
pub uri: String,
#[arg(long)]
pub state_root_check: bool,
#[arg(long, default_value = "all")]
pub try_state: frame_try_runtime::TryStateSelect,
#[arg(long)]
pub keep_connection: bool,
}
async fn start_subscribing<Header: DeserializeOwned + Serialize + Send + Sync + 'static>(
url: &str,
) -> sc_cli::Result<(WsClient, Subscription<Header>)> {
let client = ws_client(url).await.map_err(|e| sc_cli::Error::Application(e.into()))?;
log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", SUB, UN_SUB);
let sub = ChainApi::<(), (), Header, ()>::subscribe_finalized_heads(&client)
.await
.map_err(|e| sc_cli::Error::Application(e.into()))?;
Ok((client, sub))
}
pub(crate) async fn follow_chain<Block, HostFns>(
shared: SharedParams,
command: FollowChainCmd,
) -> sc_cli::Result<()>
where
Block: BlockT<Hash = H256> + DeserializeOwned,
Block::Header: DeserializeOwned,
<Block::Hash as FromStr>::Err: Debug,
NumberFor<Block>: FromStr,
<NumberFor<Block> as FromStr>::Err: Debug,
HostFns: HostFunctions,
{
let (rpc, subscription) = start_subscribing::<Block::Header>(&command.uri).await?;
let mut finalized_headers: FinalizedHeaders<Block, _, _> =
FinalizedHeaders::new(&rpc, subscription);
let mut maybe_state_ext = None;
let executor = build_executor::<HostFns>(&shared);
while let Some(header) = finalized_headers.next().await {
let hash = header.hash();
let number = header.number();
let block =
ChainApi::<(), Block::Hash, Block::Header, SignedBlock<Block>>::block(&rpc, Some(hash))
.await
.or_else(|e| {
if matches!(e, substrate_rpc_client::Error::ParseError(_)) {
log::error!(
target: LOG_TARGET,
"failed to parse the block format of remote against the local \
codebase. The block format has changed, and follow-chain cannot run in \
this case. Try running this command in a branch of your codebase that
has the same block format as the remote chain. For now, we replace the \
block with an empty one."
);
}
Err(rpc_err_handler(e))
})?
.expect("if header exists, block should also exist.")
.block;
log::debug!(
target: LOG_TARGET,
"new block event: {:?} => {:?}, extrinsics: {}",
hash,
number,
block.extrinsics().len()
);
if maybe_state_ext.is_none() {
let state = State::Live(LiveState {
uri: command.uri.clone(),
at: Some(hex::encode(header.parent_hash().encode())),
pallet: vec![],
child_tree: true,
});
let ext = state.into_ext::<Block, HostFns>(&shared, &executor, None, true).await?;
maybe_state_ext = Some(ext);
}
let state_ext =
maybe_state_ext.as_mut().expect("state_ext either existed or was just created");
let result = state_machine_call_with_proof::<Block, HostFns>(
state_ext,
&executor,
"TryRuntime_execute_block",
(block, command.state_root_check, true, command.try_state.clone())
.encode()
.as_ref(),
full_extensions(executor.clone()),
shared
.export_proof
.as_ref()
.map(|path| path.as_path().join(&format!("{}.json", number))),
);
if let Err(why) = result {
log::error!(
target: LOG_TARGET,
"failed to execute block {:?} due to {:?}",
number,
why
);
continue
}
let (mut changes, encoded_result) = result.expect("checked to be Ok; qed");
let consumed_weight = <sp_weights::Weight as Decode>::decode(&mut &*encoded_result)
.map_err(|e| format!("failed to decode weight: {:?}", e))?;
let storage_changes = changes
.drain_storage_changes(
&state_ext.backend,
state_ext.state_version,
)
.unwrap();
state_ext.backend.apply_transaction(
storage_changes.transaction_storage_root,
storage_changes.transaction,
);
log::info!(
target: LOG_TARGET,
"executed block {}, consumed weight {}, new storage root {:?}",
number,
consumed_weight,
state_ext.as_backend().root(),
);
}
log::error!(target: LOG_TARGET, "ws subscription must have terminated.");
Ok(())
}