use super::{
block_rules::{BlockRules, LookupResult as BlockLookupResult},
CodeProvider,
};
use crate::client::notification_pinning::NotificationPinningWorker;
use log::{debug, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use prometheus_endpoint::Registry;
use rand::Rng;
use sc_chain_spec::{resolve_state_version_from_wasm, BuildGenesisBlock};
use sc_client_api::{
backend::{
self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer,
ImportNotificationAction, ImportSummary, LockImportRun, NewBlockState, StorageProvider,
},
client::{
BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
FinalityNotification, FinalityNotifications, ForkBlocks, ImportNotifications,
PreCommitActions, ProvideUncles,
},
execution_extensions::ExecutionExtensions,
notifications::{StorageEventStream, StorageNotifications},
CallExecutor, ExecutorProvider, KeysIter, OnFinalityAction, OnImportAction, PairsIter,
ProofProvider, UnpinWorkerMessage, UsageProvider,
};
use sc_consensus::{
BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction,
};
use sc_executor::RuntimeVersion;
use sc_telemetry::{telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sp_api::{
ApiExt, ApiRef, CallApiAt, CallApiAtParams, ConstructRuntimeApi, Core as CoreApi,
ProvideRuntimeApi,
};
use sp_blockchain::{
self as blockchain, Backend as ChainBackend, CachedHeaderMetadata, Error,
HeaderBackend as ChainHeaderBackend, HeaderMetadata, Info as BlockchainInfo,
};
use sp_consensus::{BlockOrigin, BlockStatus, Error as ConsensusError};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_core::{
storage::{ChildInfo, ChildType, PrefixedStorageKey, StorageChild, StorageData, StorageKey},
traits::{CallContext, SpawnNamed},
};
use sp_runtime::{
generic::{BlockId, SignedBlock},
traits::{
Block as BlockT, BlockIdTo, HashingFor, Header as HeaderT, NumberFor, One,
SaturatedConversion, Zero,
},
Justification, Justifications, StateVersion,
};
use sp_state_machine::{
prove_child_read, prove_range_read_with_child_with_size, prove_read,
read_range_proof_check_with_child_on_proving_backend, Backend as StateBackend,
ChildStorageCollection, KeyValueStates, KeyValueStorageLevel, StorageCollection,
MAX_NESTED_TRIE_DEPTH,
};
use sp_trie::{proof_size_extension::ProofSizeExt, CompactProof, MerkleValue, StorageProof};
use std::{
collections::{HashMap, HashSet},
marker::PhantomData,
path::PathBuf,
sync::Arc,
};
use super::call_executor::LocalCallExecutor;
use sp_core::traits::CodeExecutor;
/// Mutex-guarded list of unbounded senders; every registered sender receives a clone of each
/// notification of type `T` that the client dispatches.
type NotificationSinks<T> = Mutex<Vec<TracingUnboundedSender<T>>>;
/// Client that ties a storage backend (`B`) to a call executor (`E`) and keeps the bookkeeping
/// needed to import blocks, finalize them and notify subscribers.
pub struct Client<B, E, Block, RA>
where
Block: BlockT,
{
/// Storage backend; also handed to the notification pinning worker and the code provider.
backend: Arc<B>,
/// Executor used for runtime calls, runtime version lookups and execution proofs.
executor: E,
/// Storage change notifications, triggered from `notify_imported`.
storage_notifications: StorageNotifications<Block>,
/// Sinks that receive import notifications for the `RecentBlock` and `Both` actions.
import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
/// Sinks that receive import notifications for the `EveryBlock` and `Both` actions.
every_import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
/// Sinks that receive finality notifications.
finality_notification_sinks: NotificationSinks<FinalityNotification<Block>>,
/// Callbacks run for each import notification before the operation is committed; whatever
/// they return is inserted as auxiliary data.
import_actions: Mutex<Vec<OnImportAction<Block>>>,
/// Callbacks run for each finality notification before the operation is committed; whatever
/// they return is inserted as auxiliary data.
finality_actions: Mutex<Vec<OnFinalityAction<Block>>>,
/// Hash of the block currently being imported; set by `apply_block`, cleared at the end of
/// `lock_import_and_run`, and reported as `BlockStatus::Queued` by `block_status`.
importing_block: RwLock<Option<Block::Hash>>,
/// Rules built from the fork blocks and bad blocks passed to `Client::new`.
block_rules: BlockRules<Block>,
/// Client configuration.
config: ClientConfig<Block>,
/// Optional telemetry handle used by the `telemetry!` calls.
telemetry: Option<TelemetryHandle>,
/// Channel to the notification pinning worker spawned in `Client::new`.
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
/// Source of runtime code, used by `code_at`.
code_provider: CodeProvider<Block, B, E>,
/// Marker for the otherwise unused `RA` type parameter.
_phantom: PhantomData<RA>,
}
/// A header as it looks before and after the post-digests have been appended.
enum PrePostHeader<H> {
    /// There are no post-digests, so the pre- and post-header are one and the same.
    Same(H),
    /// The `(pre, post)` header pair.
    Different(H, H),
}

impl<H> PrePostHeader<H> {
    /// Borrows the post-header.
    fn post(&self) -> &H {
        match self {
            Self::Same(header) | Self::Different(_, header) => header,
        }
    }

    /// Consumes the pair and returns the post-header by value.
    fn into_post(self) -> H {
        match self {
            Self::Same(header) | Self::Different(_, header) => header,
        }
    }
}
/// Outcome of `prepare_block_storage_changes`.
enum PrepareStorageChangesResult<Block: BlockT> {
/// The block must not be imported; the wrapped `ImportResult` says why.
Discard(ImportResult),
/// The import can go ahead, optionally with storage changes to apply.
Import(Option<sc_consensus::StorageChanges<Block>>),
}
/// Configuration of a [`Client`].
#[derive(Debug, Clone)]
pub struct ClientConfig<Block: BlockT> {
/// NOTE(review): presumably enables the offchain worker database; not read in this part of the
/// file, confirm against the backend.
pub offchain_worker_enabled: bool,
/// When `true`, offchain storage changes produced by a block are written during import.
pub offchain_indexing_api: bool,
/// NOTE(review): presumably a directory holding wasm runtime overrides; confirm against the
/// call executor.
pub wasm_runtime_overrides: Option<PathBuf>,
/// NOTE(review): presumably skips writing genesis state on first start; not read in this part
/// of the file.
pub no_genesis: bool,
/// Wasm blobs keyed by block number. NOTE(review): presumably runtime substitutes applied from
/// that block on; confirm against the call executor.
pub wasm_runtime_substitutes: HashMap<NumberFor<Block>, Vec<u8>>,
/// When `true`, block execution during import records a storage proof and registers the
/// proof-size extension.
pub enable_import_proof_recording: bool,
}
impl<Block: BlockT> Default for ClientConfig<Block> {
    /// Returns a configuration with every flag off, no wasm override path and an empty
    /// substitute map.
    fn default() -> Self {
        let no_substitutes = HashMap::new();
        Self {
            wasm_runtime_substitutes: no_substitutes,
            wasm_runtime_overrides: None,
            enable_import_proof_recording: false,
            no_genesis: false,
            offchain_indexing_api: false,
            offchain_worker_enabled: false,
        }
    }
}
/// Creates a [`Client`] over `backend` whose calls go through a [`LocalCallExecutor`] wrapping
/// `executor`.
///
/// Fork blocks and bad blocks are both passed as their `Default` value.
///
/// # Errors
///
/// Propagates any error from building the call executor or from [`Client::new`].
pub fn new_with_backend<B, E, Block, G, RA>(
    backend: Arc<B>,
    executor: E,
    genesis_block_builder: G,
    spawn_handle: Box<dyn SpawnNamed>,
    prometheus_registry: Option<Registry>,
    telemetry: Option<TelemetryHandle>,
    config: ClientConfig<Block>,
) -> sp_blockchain::Result<Client<B, LocalCallExecutor<Block, B, E>, Block, RA>>
where
    E: CodeExecutor + sc_executor::RuntimeVersionOf,
    G: BuildGenesisBlock<
        Block,
        BlockImportOperation = <B as backend::Backend<Block>>::BlockImportOperation,
    >,
    Block: BlockT,
    B: backend::LocalBackend<Block> + 'static,
{
    // The extensions keep their own copy of the executor; the original is moved into the
    // call executor right after.
    let execution_extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
    let local_executor = LocalCallExecutor::new(
        Arc::clone(&backend),
        executor,
        config.clone(),
        execution_extensions,
    )?;

    let fork_blocks = Default::default();
    let bad_blocks = Default::default();

    Client::new(
        backend,
        local_executor,
        spawn_handle,
        genesis_block_builder,
        fork_blocks,
        bad_blocks,
        prometheus_registry,
        telemetry,
        config,
    )
}
/// Exposes the client's block type through the `BlockOf` trait.
impl<B, E, Block, RA> BlockOf for Client<B, E, Block, RA>
where
B: backend::Backend<Block>,
E: CallExecutor<Block>,
Block: BlockT,
{
type Type = Block;
}
impl<B, E, Block, RA> LockImportRun<Block, B> for Client<B, E, Block, RA>
where
B: backend::Backend<Block>,
E: CallExecutor<Block>,
Block: BlockT,
{
/// Runs `f` on a fresh import operation while holding the backend's import lock, then commits
/// the operation and dispatches the finality and import notifications that `f` queued on it.
///
/// `importing_block` is reset to `None` afterwards whether or not the run succeeded.
///
/// # Errors
///
/// Returns the error of `f`, or any backend error raised while beginning or committing the
/// operation, inserting auxiliary data, or notifying.
fn lock_import_and_run<R, Err, F>(&self, f: F) -> Result<R, Err>
where
F: FnOnce(&mut ClientImportOperation<Block, B>) -> Result<R, Err>,
Err: From<sp_blockchain::Error>,
{
let inner = || {
// Held until the end of this closure, i.e. across commit and notification.
let _import_lock = self.backend.get_import_lock().write();
let mut op = ClientImportOperation {
op: self.backend.begin_operation()?,
notify_imported: None,
notify_finalized: None,
};
let r = f(&mut op)?;
let ClientImportOperation { mut op, notify_imported, notify_finalized } = op;
// Turn the summaries left behind by `f` into the notifications that will be sent out.
let finality_notification = notify_finalized.map(|summary| {
FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone())
});
let (import_notification, storage_changes, import_notification_action) =
match notify_imported {
Some(mut summary) => {
let import_notification_action = summary.import_notification_action;
// Storage changes are taken out of the summary and passed to `notify_imported`
// separately.
let storage_changes = summary.storage_changes.take();
(
Some(BlockImportNotification::from_summary(
summary,
self.unpin_worker_sender.clone(),
)),
storage_changes,
import_notification_action,
)
},
None => (None, None, ImportNotificationAction::None),
};
// Registered actions run before the commit so that the auxiliary data they return is
// part of the same operation.
if let Some(ref notification) = finality_notification {
for action in self.finality_actions.lock().iter_mut() {
op.insert_aux(action(notification))?;
}
}
if let Some(ref notification) = import_notification {
for action in self.import_actions.lock().iter_mut() {
op.insert_aux(action(notification))?;
}
}
self.backend.commit_operation(op)?;
// Pin the notified blocks and announce each pin to the pinning worker. A failed pin is
// only logged; the notification is still sent below.
if let Some(ref notification) = finality_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
debug!(
"Unable to pin block for finality notification. hash: {}, Error: {}",
notification.hash, err
);
} else {
let _ = self
.unpin_worker_sender
.unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash))
.map_err(|e| {
log::error!(
"Unable to send AnnouncePin worker message for finality: {e}"
)
});
}
}
if let Some(ref notification) = import_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
debug!(
"Unable to pin block for import notification. hash: {}, Error: {}",
notification.hash, err
);
} else {
let _ = self
.unpin_worker_sender
.unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash))
.map_err(|e| {
log::error!("Unable to send AnnouncePin worker message for import: {e}")
});
};
}
// Finality is announced before the import.
self.notify_finalized(finality_notification)?;
self.notify_imported(import_notification, import_notification_action, storage_changes)?;
Ok(r)
};
let result = inner();
// Always clear the "currently importing" marker, including on failure.
*self.importing_block.write() = None;
result
}
}
impl<B, E, Block, RA> LockImportRun<Block, B> for &Client<B, E, Block, RA>
where
    Block: BlockT,
    B: backend::Backend<Block>,
    E: CallExecutor<Block>,
{
    /// Delegates to the [`LockImportRun`] implementation of the referenced [`Client`].
    fn lock_import_and_run<R, Err, F>(&self, f: F) -> Result<R, Err>
    where
        F: FnOnce(&mut ClientImportOperation<Block, B>) -> Result<R, Err>,
        Err: From<sp_blockchain::Error>,
    {
        // `*self` is the `&Client`; naming the impl explicitly rules out recursing into this one.
        <Client<B, E, Block, RA> as LockImportRun<Block, B>>::lock_import_and_run(*self, f)
    }
}
impl<B, E, Block, RA> Client<B, E, Block, RA>
where
B: backend::Backend<Block>,
E: CallExecutor<Block>,
Block: BlockT,
Block::Header: Clone,
{
/// Creates a new client.
///
/// If the backend reports no finalized state, the genesis block is built and committed first.
/// A notification pinning worker is then spawned on `spawn_handle`, fed through an unbounded
/// channel owned by the client.
///
/// # Errors
///
/// Returns an error when the genesis block cannot be built or committed, or when the code
/// provider cannot be created.
pub fn new<G>(
    backend: Arc<B>,
    executor: E,
    spawn_handle: Box<dyn SpawnNamed>,
    genesis_block_builder: G,
    fork_blocks: ForkBlocks<Block>,
    bad_blocks: BadBlocks<Block>,
    prometheus_registry: Option<Registry>,
    telemetry: Option<TelemetryHandle>,
    config: ClientConfig<Block>,
) -> sp_blockchain::Result<Self>
where
    G: BuildGenesisBlock<
        Block,
        BlockImportOperation = <B as backend::Backend<Block>>::BlockImportOperation,
    >,
    E: Clone,
    B: 'static,
{
    let info = backend.blockchain().info();
    if info.finalized_state.is_none() {
        let (genesis_block, mut op) = genesis_block_builder.build_genesis_block()?;
        info!(
            "🔨 Initializing Genesis block/state (state: {}, header-hash: {})",
            genesis_block.header().state_root(),
            genesis_block.header().hash()
        );
        // Genesis is only marked final when the chain has no best block yet (best hash still
        // at its default value); otherwise it is stored as a normal block.
        let block_state = if info.best_hash == Default::default() {
            NewBlockState::Final
        } else {
            NewBlockState::Normal
        };
        let (header, body) = genesis_block.deconstruct();
        op.set_block_data(header, Some(body), None, None, block_state)?;
        backend.commit_operation(op)?;
    }

    // Channel over which pin announcements reach the worker spawned below.
    let (unpin_worker_sender, rx) = tracing_unbounded::<UnpinWorkerMessage<Block>>(
        "notification-pinning-worker-channel",
        10_000,
    );
    let unpin_worker = NotificationPinningWorker::new(rx, backend.clone());
    spawn_handle.spawn("notification-pinning-worker", None, Box::pin(unpin_worker.run()));

    let code_provider = CodeProvider::new(&config, executor.clone(), backend.clone())?;

    Ok(Client {
        backend,
        executor,
        storage_notifications: StorageNotifications::new(prometheus_registry),
        import_notification_sinks: Default::default(),
        every_import_notification_sinks: Default::default(),
        finality_notification_sinks: Default::default(),
        import_actions: Default::default(),
        finality_actions: Default::default(),
        importing_block: Default::default(),
        block_rules: BlockRules::new(fork_blocks, bad_blocks),
        config,
        telemetry,
        unpin_worker_sender,
        code_provider,
        _phantom: Default::default(),
    })
}
/// Returns a reference to the sinks registered for block import notifications.
pub fn import_notification_sinks(&self) -> &NotificationSinks<BlockImportNotification<Block>> {
&self.import_notification_sinks
}
/// Returns a reference to the sinks registered for finality notifications.
pub fn finality_notification_sinks(&self) -> &NotificationSinks<FinalityNotification<Block>> {
&self.finality_notification_sinks
}
/// Returns the backend state at the block with the given `hash`.
pub fn state_at(&self, hash: Block::Hash) -> sp_blockchain::Result<B::State> {
self.backend.state_at(hash)
}
/// Returns the runtime code at the block with the given `hash`, as reported by the code
/// provider with overrides ignored.
pub fn code_at(&self, hash: Block::Hash) -> sp_blockchain::Result<Vec<u8>> {
self.code_provider.code_at_ignoring_overrides(hash)
}
/// Returns the runtime version at the block with the given `hash`, as reported by the call
/// executor.
pub fn runtime_version_at(&self, hash: Block::Hash) -> sp_blockchain::Result<RuntimeVersion> {
CallExecutor::runtime_version(&self.executor, hash)
}
fn apply_block(
&self,
operation: &mut ClientImportOperation<Block, B>,
import_block: BlockImportParams<Block>,
storage_changes: Option<sc_consensus::StorageChanges<Block>>,
) -> sp_blockchain::Result<ImportResult>
where
Self: ProvideRuntimeApi<Block>,
<Self as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> + ApiExt<Block>,
{
let BlockImportParams {
origin,
header,
justifications,
post_digests,
body,
indexed_body,
finalized,
auxiliary,
fork_choice,
intermediates,
import_existing,
create_gap,
..
} = import_block;
if !intermediates.is_empty() {
return Err(Error::IncompletePipeline)
}
let fork_choice = fork_choice.ok_or(Error::IncompletePipeline)?;
let import_headers = if post_digests.is_empty() {
PrePostHeader::Same(header)
} else {
let mut post_header = header.clone();
for item in post_digests {
post_header.digest_mut().push(item);
}
PrePostHeader::Different(header, post_header)
};
let hash = import_headers.post().hash();
let height = (*import_headers.post().number()).saturated_into::<u64>();
*self.importing_block.write() = Some(hash);
operation.op.set_create_gap(create_gap);
let result = self.execute_and_import_block(
operation,
origin,
hash,
import_headers,
justifications,
body,
indexed_body,
storage_changes,
finalized,
auxiliary,
fork_choice,
import_existing,
);
if let Ok(ImportResult::Imported(ref aux)) = result {
if aux.is_new_best {
if origin != BlockOrigin::NetworkInitialSync || rand::thread_rng().gen_bool(0.1) {
telemetry!(
self.telemetry;
SUBSTRATE_INFO;
"block.import";
"height" => height,
"best" => ?hash,
"origin" => ?origin
);
}
}
}
result
}
/// Writes a block into `operation`: applies or resets its storage, finalizes the parent when
/// needed, decides whether the block becomes the new best, stores the block data and
/// auxiliary entries, and queues the import/finality summaries to be announced after commit.
///
/// Returns `ImportResult::AlreadyInChain` when the block is already known and
/// `import_existing` is `false`; otherwise `ImportResult::imported(is_new_best)`.
///
/// # Errors
///
/// Returns `Error::NotInFinalizedChain` for an unknown block at or below the finalized number
/// that is not the start of the block gap, `Error::InvalidStateRoot` when an imported state does
/// not match the header's state root, `Error::Backend` for an invalid child storage key, and
/// any error propagated from the backend.
fn execute_and_import_block(
&self,
operation: &mut ClientImportOperation<Block, B>,
origin: BlockOrigin,
hash: Block::Hash,
import_headers: PrePostHeader<Block::Header>,
justifications: Option<Justifications>,
body: Option<Vec<Block::Extrinsic>>,
indexed_body: Option<Vec<Vec<u8>>>,
storage_changes: Option<sc_consensus::StorageChanges<Block>>,
finalized: bool,
aux: Vec<(Vec<u8>, Option<Vec<u8>>)>,
fork_choice: ForkChoiceStrategy,
import_existing: bool,
) -> sp_blockchain::Result<ImportResult>
where
Self: ProvideRuntimeApi<Block>,
<Self as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> + ApiExt<Block>,
{
let parent_hash = *import_headers.post().parent_hash();
let status = self.backend.blockchain().status(hash)?;
let parent_exists =
self.backend.blockchain().status(parent_hash)? == blockchain::BlockStatus::InChain;
// Only a known block that must not be re-imported short-circuits; every other combination
// proceeds with the import.
match (import_existing, status) {
(false, blockchain::BlockStatus::InChain) => return Ok(ImportResult::AlreadyInChain),
(false, blockchain::BlockStatus::Unknown) => {},
(true, blockchain::BlockStatus::InChain) => {},
(true, blockchain::BlockStatus::Unknown) => {},
}
let info = self.backend.blockchain().info();
// A "gap block" is the block whose number equals the start of the recorded block gap.
let gap_block =
info.block_gap.map_or(false, |gap| *import_headers.post().number() == gap.start);
// An unknown block at or below the finalized number is refused, unless it is the gap block.
if status == blockchain::BlockStatus::Unknown &&
*import_headers.post().number() <= info.finalized_number &&
!gap_block
{
return Err(sp_blockchain::Error::NotInFinalizedChain)
}
// Notifications are only produced for broadcast and locally authored blocks.
let make_notifications = match origin {
BlockOrigin::NetworkBroadcast | BlockOrigin::Own | BlockOrigin::ConsensusBroadcast =>
true,
BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false,
};
let storage_changes = match storage_changes {
Some(storage_changes) => {
let storage_changes = match storage_changes {
// Changes computed by executing the block: apply them on top of the parent state.
sc_consensus::StorageChanges::Changes(storage_changes) => {
self.backend.begin_state_operation(&mut operation.op, parent_hash)?;
let (main_sc, child_sc, offchain_sc, tx, _, tx_index) =
storage_changes.into_inner();
if self.config.offchain_indexing_api {
operation.op.update_offchain_storage(offchain_sc)?;
}
operation.op.update_db_storage(tx)?;
operation.op.update_storage(main_sc.clone(), child_sc.clone())?;
operation.op.update_transaction_index(tx_index)?;
// Kept so that storage notifications can be triggered after the commit.
Some((main_sc, child_sc))
},
// A complete imported state: rebuild the storage and reset the backend to it.
sc_consensus::StorageChanges::Import(changes) => {
let mut storage = sp_storage::Storage::default();
for state in changes.state.0.into_iter() {
// No parent keys and no state root: these are top-level key/values.
if state.parent_storage_keys.is_empty() && state.state_root.is_empty() {
for (key, value) in state.key_values.into_iter() {
storage.top.insert(key, value);
}
} else {
// Otherwise the key/values are copied into every listed child trie.
for parent_storage in state.parent_storage_keys {
let storage_key = PrefixedStorageKey::new_ref(&parent_storage);
let storage_key =
match ChildType::from_prefixed_key(storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
storage_key,
None =>
return Err(Error::Backend(
"Invalid child storage key.".to_string(),
)),
};
let entry = storage
.children_default
.entry(storage_key.to_vec())
.or_insert_with(|| StorageChild {
data: Default::default(),
child_info: ChildInfo::new_default(storage_key),
});
for (key, value) in state.key_values.iter() {
entry.data.insert(key.clone(), value.clone());
}
}
}
}
let state_version = resolve_state_version_from_wasm::<_, HashingFor<Block>>(
&storage,
&self.executor,
)?;
let state_root = operation.op.reset_storage(storage, state_version)?;
// The rebuilt state must hash to the root announced in the header.
if state_root != *import_headers.post().state_root() {
warn!("Error importing state: State root mismatch.");
return Err(Error::InvalidStateRoot)
}
None
},
};
storage_changes
},
None => None,
};
// A finalized block implies its parent is final too: finalize the parent first unless it
// already is the last finalized block.
if finalized && parent_exists && info.finalized_hash != parent_hash {
self.apply_finality_with_block_hash(
operation,
parent_hash,
None,
&info,
make_notifications,
)?;
}
// The gap block never becomes best; otherwise a finalized block always does, and the fork
// choice strategy decides for the rest.
let is_new_best = !gap_block &&
(finalized ||
match fork_choice {
ForkChoiceStrategy::LongestChain =>
import_headers.post().number() > &info.best_number,
ForkChoiceStrategy::Custom(v) => v,
});
let leaf_state = if finalized {
NewBlockState::Final
} else if is_new_best {
NewBlockState::Best
} else {
NewBlockState::Normal
};
// A tree route is only computed when the new best block does not build directly on the
// current best block.
let tree_route = if is_new_best && info.best_hash != parent_hash && parent_exists {
let route_from_best =
sp_blockchain::tree_route(self.backend.blockchain(), info.best_hash, parent_hash)?;
Some(route_from_best)
} else {
None
};
trace!(
"Imported {}, (#{}), best={}, origin={:?}",
hash,
import_headers.post().number(),
is_new_best,
origin,
);
operation.op.set_block_data(
import_headers.post().clone(),
body,
indexed_body,
justifications,
leaf_state,
)?;
operation.op.insert_aux(aux)?;
// "Every block" subscribers are served whenever at least one is registered; "recent block"
// subscribers only for notifiable origins or when a tree route was computed.
let should_notify_every_block = !self.every_import_notification_sinks.lock().is_empty();
let should_notify_recent_block = make_notifications || tree_route.is_some();
if should_notify_every_block || should_notify_recent_block {
let header = import_headers.into_post();
if finalized && should_notify_recent_block {
// Extend the summary left by the parent finalization above, or start a new one.
let mut summary = match operation.notify_finalized.take() {
Some(mut summary) => {
summary.header = header.clone();
summary.finalized.push(hash);
summary
},
None => FinalizeSummary {
header: header.clone(),
finalized: vec![hash],
stale_heads: Vec::new(),
},
};
if parent_exists {
// Every leaf other than the parent whose route from the parent retracts nothing is
// recorded as a stale head.
for head in self
.backend
.blockchain()
.leaves()?
.into_iter()
.filter(|h| *h != parent_hash)
{
let route_from_parent = sp_blockchain::tree_route(
self.backend.blockchain(),
parent_hash,
head,
)?;
if route_from_parent.retracted().is_empty() {
summary.stale_heads.push(head);
}
}
}
operation.notify_finalized = Some(summary);
}
let import_notification_action = if should_notify_every_block {
if should_notify_recent_block {
ImportNotificationAction::Both
} else {
ImportNotificationAction::EveryBlock
}
} else {
ImportNotificationAction::RecentBlock
};
operation.notify_imported = Some(ImportSummary {
hash,
origin,
header,
is_new_best,
storage_changes,
tree_route,
import_notification_action,
})
}
Ok(ImportResult::imported(is_new_best))
}
/// Decides how the state of `import_block` is obtained and, if needed, executes the block to
/// compute its storage changes.
///
/// The block's `state_action` is taken out of `import_block` and replaced by
/// `StateAction::Skip`. Returns `Discard` when the import must be refused because of the
/// parent's status, otherwise `Import` with the storage changes to apply (if any).
///
/// # Errors
///
/// Returns `Error::InvalidStateRoot` when the executed block's storage root differs from the
/// header's state root, and any error from the runtime API or the backend.
fn prepare_block_storage_changes(
&self,
import_block: &mut BlockImportParams<Block>,
) -> sp_blockchain::Result<PrepareStorageChangesResult<Block>>
where
Self: ProvideRuntimeApi<Block>,
<Self as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> + ApiExt<Block>,
{
let parent_hash = import_block.header.parent_hash();
let state_action = std::mem::replace(&mut import_block.state_action, StateAction::Skip);
// Arm order matters: earlier arms take precedence over the wildcard arms below them.
let (enact_state, storage_changes) = match (self.block_status(*parent_hash)?, state_action)
{
(BlockStatus::KnownBad, _) =>
return Ok(PrepareStorageChangesResult::Discard(ImportResult::KnownBad)),
// Computed changes cannot be applied when the parent state has been pruned.
(
BlockStatus::InChainPruned,
StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(_)),
) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::MissingState)),
// Any other supplied changes are applied, even when the parent is unknown.
(_, StateAction::ApplyChanges(changes)) => (true, Some(changes)),
(BlockStatus::Unknown, _) =>
return Ok(PrepareStorageChangesResult::Discard(ImportResult::UnknownParent)),
(_, StateAction::Skip) => (false, None),
(BlockStatus::InChainPruned, StateAction::Execute) =>
return Ok(PrepareStorageChangesResult::Discard(ImportResult::MissingState)),
// Execution is optional here, so a pruned parent simply means no state is enacted.
(BlockStatus::InChainPruned, StateAction::ExecuteIfPossible) => (false, None),
(_, StateAction::Execute) => (true, None),
(_, StateAction::ExecuteIfPossible) => (true, None),
};
let storage_changes = match (enact_state, storage_changes, &import_block.body) {
// Changes were supplied by the caller: use them as they are.
(true, changes @ Some(_), _) => changes,
// No changes supplied but a body is available: execute the block on the parent state.
(true, None, Some(ref body)) => {
let mut runtime_api = self.runtime_api();
runtime_api.set_call_context(CallContext::Onchain);
if self.config.enable_import_proof_recording {
runtime_api.record_proof();
let recorder = runtime_api
.proof_recorder()
.expect("Proof recording is enabled in the line above; qed.");
runtime_api.register_extension(ProofSizeExt::new(recorder));
}
runtime_api.execute_block(
*parent_hash,
Block::new(import_block.header.clone(), body.clone()),
)?;
let state = self.backend.state_at(*parent_hash)?;
let gen_storage_changes = runtime_api
.into_storage_changes(&state, *parent_hash)
.map_err(sp_blockchain::Error::Storage)?;
// The root produced by execution must match the one claimed by the header.
if import_block.header.state_root() != &gen_storage_changes.transaction_storage_root
{
return Err(Error::InvalidStateRoot)
}
Some(sc_consensus::StorageChanges::Changes(gen_storage_changes))
},
// Nothing to execute without a body.
(true, None, None) => None,
(false, _, _) => None,
};
Ok(PrepareStorageChangesResult::Import(storage_changes))
}
fn apply_finality_with_block_hash(
&self,
operation: &mut ClientImportOperation<Block, B>,
hash: Block::Hash,
justification: Option<Justification>,
info: &BlockchainInfo<Block>,
notify: bool,
) -> sp_blockchain::Result<()> {
if hash == info.finalized_hash {
warn!(
"Possible safety violation: attempted to re-finalize last finalized block {:?} ",
hash,
);
return Ok(())
}
let route_from_finalized =
sp_blockchain::tree_route(self.backend.blockchain(), info.finalized_hash, hash)?;
if let Some(retracted) = route_from_finalized.retracted().get(0) {
warn!(
"Safety violation: attempted to revert finalized block {:?} which is not in the \
same chain as last finalized {:?}",
retracted, info.finalized_hash
);
return Err(sp_blockchain::Error::NotInFinalizedChain)
}
let block_number = self
.backend
.blockchain()
.number(hash)?
.ok_or(Error::MissingHeader(format!("{hash:?}")))?;
if self.backend.blockchain().leaves()?.len() > 1 || info.best_number < block_number {
let route_from_best =
sp_blockchain::tree_route(self.backend.blockchain(), info.best_hash, hash)?;
if route_from_best.common_block().hash != hash {
operation.op.mark_head(hash)?;
}
}
let enacted = route_from_finalized.enacted();
assert!(enacted.len() > 0);
for finalize_new in &enacted[..enacted.len() - 1] {
operation.op.mark_finalized(finalize_new.hash, None)?;
}
assert_eq!(enacted.last().map(|e| e.hash), Some(hash));
operation.op.mark_finalized(hash, justification)?;
if notify {
let finalized =
route_from_finalized.enacted().iter().map(|elem| elem.hash).collect::<Vec<_>>();
let block_number = route_from_finalized
.last()
.expect(
"The block to finalize is always the latest \
block in the route to the finalized block; qed",
)
.number;
let stale_heads = self
.backend
.blockchain()
.displaced_leaves_after_finalizing(hash, block_number)?
.hashes()
.collect();
let header = self
.backend
.blockchain()
.header(hash)?
.expect("Block to finalize expected to be onchain; qed");
operation.notify_finalized = Some(FinalizeSummary { header, finalized, stale_heads });
}
Ok(())
}
/// Sends `notification` to every finality subscriber and drops the sinks that fail to accept
/// it.
///
/// Without a notification, the call only removes sinks that are already closed.
fn notify_finalized(
    &self,
    notification: Option<FinalityNotification<Block>>,
) -> sp_blockchain::Result<()> {
    let mut sinks = self.finality_notification_sinks.lock();

    if let Some(notification) = notification {
        telemetry!(
            self.telemetry;
            SUBSTRATE_INFO;
            "notify.finalized";
            "height" => format!("{}", notification.header.number()),
            "best" => ?notification.hash,
        );
        sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
    } else {
        sinks.retain(|sink| !sink.is_closed());
    }

    Ok(())
}
/// Dispatches an import notification to the subscriber groups selected by
/// `import_notification_action`, and prunes dead sinks from both groups.
///
/// * `Both`: storage notifications are triggered and both sink groups receive the
///   notification.
/// * `RecentBlock`: storage notifications are triggered and only `import_notification_sinks`
///   receive it.
/// * `EveryBlock`: only `every_import_notification_sinks` receive it; storage notifications
///   are not triggered.
/// * `None`, or no notification at all: nothing is sent.
///
/// A group that is not being notified still has its closed sinks removed; a group that is
/// being notified loses every sink whose send fails.
fn notify_imported(
    &self,
    notification: Option<BlockImportNotification<Block>>,
    import_notification_action: ImportNotificationAction,
    storage_changes: Option<(StorageCollection, ChildStorageCollection)>,
) -> sp_blockchain::Result<()> {
    let notification = match notification {
        Some(notify_import) => notify_import,
        None => {
            self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
            self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
            return Ok(())
        },
    };

    // Consumes `storage_changes`, so it can run at most once; each arm below calls it at most
    // once.
    let trigger_storage_changes_notification = || {
        if let Some(storage_changes) = storage_changes {
            self.storage_notifications.trigger(
                &notification.hash,
                storage_changes.0.into_iter(),
                storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
            );
        }
    };

    match import_notification_action {
        ImportNotificationAction::Both => {
            trigger_storage_changes_notification();
            self.import_notification_sinks
                .lock()
                .retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
            self.every_import_notification_sinks
                .lock()
                .retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
        },
        ImportNotificationAction::RecentBlock => {
            trigger_storage_changes_notification();
            self.import_notification_sinks
                .lock()
                .retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
            self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
        },
        ImportNotificationAction::EveryBlock => {
            self.every_import_notification_sinks
                .lock()
                .retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
            self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
        },
        ImportNotificationAction::None => {
            self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
            self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
        },
    }

    Ok(())
}
/// Asks the backend to revert up to `n` blocks (passing `false` as the backend's second
/// `revert` argument) and returns the number it reports as actually reverted.
pub fn revert(&self, n: NumberFor<Block>) -> sp_blockchain::Result<NumberFor<Block>> {
    self.backend.revert(n, false).map(|(reverted_count, _)| reverted_count)
}
/// Asks the backend to revert up to `n` blocks, passing `true` as the backend's second
/// `revert` argument, and returns the number it reports as actually reverted.
///
/// When `blacklist` is set, every reverted block is additionally marked as bad in the block
/// rules.
pub fn unsafe_revert(
    &mut self,
    n: NumberFor<Block>,
    blacklist: bool,
) -> sp_blockchain::Result<NumberFor<Block>> {
    let (reverted_count, reverted_blocks) = self.backend.revert(n, true)?;

    if blacklist {
        reverted_blocks.into_iter().for_each(|bad| self.block_rules.mark_bad(bad));
    }

    Ok(reverted_count)
}
/// Returns the blockchain info reported by the backend.
pub fn chain_info(&self) -> BlockchainInfo<Block> {
self.backend.blockchain().info()
}
/// Reports the status of the block with the given `hash`.
///
/// * `Queued`: the block is the one currently being imported.
/// * `InChainWithState`: the block is known and the backend has its state.
/// * `InChainPruned`: the block is known but the backend has no state for it.
/// * `Unknown`: the backend has no number for this hash.
pub fn block_status(&self, hash: Block::Hash) -> sp_blockchain::Result<BlockStatus> {
    let is_being_imported = self
        .importing_block
        .read()
        .as_ref()
        .map_or(false, |importing| &hash == importing);
    if is_being_imported {
        return Ok(BlockStatus::Queued)
    }

    let status = match self.backend.blockchain().number(hash)? {
        Some(number) if self.backend.have_state_at(hash, number) =>
            BlockStatus::InChainWithState,
        Some(_) => BlockStatus::InChainPruned,
        None => BlockStatus::Unknown,
    };
    Ok(status)
}
/// Returns the header of the block with the given `hash`, or `None` if the backend has none.
pub fn header(
&self,
hash: Block::Hash,
) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
self.backend.blockchain().header(hash)
}
/// Returns the extrinsics of the block with the given `hash`, or `None` if the backend has
/// none.
pub fn body(
&self,
hash: Block::Hash,
) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
self.backend.blockchain().body(hash)
}
/// Collects the uncles of `target_hash`, walking back at most `max_generation` ancestors.
///
/// At each step the children of the current ancestor, other than the block the walk arrived
/// from, are added to the result. The walk stops early on reaching genesis. The genesis block
/// itself has no uncles.
///
/// # Errors
///
/// Returns `Error::UnknownBlock` when a header on the ancestor path is missing, and any error
/// from the backend.
pub fn uncles(
    &self,
    target_hash: Block::Hash,
    max_generation: NumberFor<Block>,
) -> sp_blockchain::Result<Vec<Block::Hash>> {
    let load_header = |hash: Block::Hash| -> sp_blockchain::Result<Block::Header> {
        self.backend
            .blockchain()
            .header(hash)?
            .ok_or_else(|| Error::UnknownBlock(format!("{:?}", hash)))
    };

    let genesis_hash = self.backend.blockchain().info().genesis_hash;
    if genesis_hash == target_hash {
        return Ok(Vec::new())
    }

    let mut current_hash = target_hash;
    let mut current = load_header(current_hash)?;
    let mut ancestor_hash = *current.parent_hash();
    let mut ancestor = load_header(ancestor_hash)?;
    let mut uncles = Vec::new();

    let mut generation: NumberFor<Block> = Zero::zero();
    while generation < max_generation {
        // Siblings of the block we came from are its uncles' generation.
        let children = self.backend.blockchain().children(ancestor_hash)?;
        uncles.extend(children.into_iter().filter(|h| h != &current_hash));
        current_hash = ancestor_hash;

        if genesis_hash == current_hash {
            break
        }

        current = ancestor;
        ancestor_hash = *current.parent_hash();
        ancestor = load_header(ancestor_hash)?;
        generation += One::one();
    }

    trace!("Collected {} uncles", uncles.len());
    Ok(uncles)
}
}
impl<B, E, Block, RA> UsageProvider<Block> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block>,
    Block: BlockT,
{
    /// Combines the current chain info with the backend's usage info.
    fn usage_info(&self) -> ClientInfo<Block> {
        let chain = self.chain_info();
        let usage = self.backend.usage_info();
        ClientInfo { chain, usage }
    }
}
impl<B, E, Block, RA> ProofProvider<Block> for Client<B, E, Block, RA>
where
B: backend::Backend<Block>,
E: CallExecutor<Block>,
Block: BlockT,
{
/// Builds a storage proof for reading `keys` from the state at block `hash`.
fn read_proof(
    &self,
    hash: Block::Hash,
    keys: &mut dyn Iterator<Item = &[u8]>,
) -> sp_blockchain::Result<StorageProof> {
    let state = self.state_at(hash)?;
    prove_read(state, keys).map_err(Into::into)
}
/// Builds a storage proof for reading `keys` from the child trie `child_info` in the state at
/// block `hash`.
fn read_child_proof(
    &self,
    hash: Block::Hash,
    child_info: &ChildInfo,
    keys: &mut dyn Iterator<Item = &[u8]>,
) -> sp_blockchain::Result<StorageProof> {
    let state = self.state_at(hash)?;
    prove_child_read(state, child_info, keys).map_err(Into::into)
}
/// Delegates to the executor's `prove_execution` for `method` with `call_data` at block
/// `hash`, returning the pair of bytes and storage proof it produces.
fn execution_proof(
&self,
hash: Block::Hash,
method: &str,
call_data: &[u8],
) -> sp_blockchain::Result<(Vec<u8>, StorageProof)> {
self.executor.prove_execution(hash, method, call_data)
}
fn read_proof_collection(
&self,
hash: Block::Hash,
start_key: &[Vec<u8>],
size_limit: usize,
) -> sp_blockchain::Result<(CompactProof, u32)> {
let state = self.state_at(hash)?;
let root = state.storage_root(std::iter::empty(), StateVersion::V0).0;
let (proof, count) = prove_range_read_with_child_with_size::<_, HashingFor<Block>>(
state, size_limit, start_key,
)?;
let proof = proof
.into_compact_proof::<HashingFor<Block>>(root)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?;
Ok((proof, count))
}
fn storage_collection(
&self,
hash: Block::Hash,
start_key: &[Vec<u8>],
size_limit: usize,
) -> sp_blockchain::Result<Vec<(KeyValueStorageLevel, bool)>> {
if start_key.len() > MAX_NESTED_TRIE_DEPTH {
return Err(Error::Backend("Invalid start key.".to_string()))
}
let state = self.state_at(hash)?;
let child_info = |storage_key: &Vec<u8>| -> sp_blockchain::Result<ChildInfo> {
let storage_key = PrefixedStorageKey::new_ref(storage_key);
match ChildType::from_prefixed_key(storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
Ok(ChildInfo::new_default(storage_key)),
None => Err(Error::Backend("Invalid child storage key.".to_string())),
}
};
let mut current_child = if start_key.len() == 2 {
let start_key = start_key.get(0).expect("checked len");
if let Some(child_root) = state
.storage(start_key)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
{
Some((child_info(start_key)?, child_root))
} else {
return Err(Error::Backend("Invalid root start key.".to_string()))
}
} else {
None
};
let mut current_key = start_key.last().map(Clone::clone).unwrap_or_default();
let mut total_size = 0;
let mut result = vec![(
KeyValueStorageLevel {
state_root: Vec::new(),
key_values: Vec::new(),
parent_storage_keys: Vec::new(),
},
false,
)];
let mut child_roots = HashSet::new();
loop {
let mut entries = Vec::new();
let mut complete = true;
let mut switch_child_key = None;
while let Some(next_key) = if let Some(child) = current_child.as_ref() {
state
.next_child_storage_key(&child.0, ¤t_key)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
} else {
state
.next_storage_key(¤t_key)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
} {
let value = if let Some(child) = current_child.as_ref() {
state
.child_storage(&child.0, next_key.as_ref())
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
.unwrap_or_default()
} else {
state
.storage(next_key.as_ref())
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
.unwrap_or_default()
};
let size = value.len() + next_key.len();
if total_size + size > size_limit && !entries.is_empty() {
complete = false;
break
}
total_size += size;
if current_child.is_none() &&
sp_core::storage::well_known_keys::is_child_storage_key(next_key.as_slice()) &&
!child_roots.contains(value.as_slice())
{
child_roots.insert(value.clone());
switch_child_key = Some((next_key.clone(), value.clone()));
entries.push((next_key.clone(), value));
break
}
entries.push((next_key.clone(), value));
current_key = next_key;
}
if let Some((child, child_root)) = switch_child_key.take() {
result[0].0.key_values.extend(entries.into_iter());
current_child = Some((child_info(&child)?, child_root));
current_key = Vec::new();
} else if let Some((child, child_root)) = current_child.take() {
current_key = child.into_prefixed_storage_key().into_inner();
result.push((
KeyValueStorageLevel {
state_root: child_root,
key_values: entries,
parent_storage_keys: Vec::new(),
},
complete,
));
if !complete {
break
}
} else {
result[0].0.key_values.extend(entries.into_iter());
result[0].1 = complete;
break
}
}
Ok(result)
}
/// Check a compact range proof against `root` and read back the key-values it covers.
///
/// The compact-encoded nodes of `proof` are decoded into an in-memory database, with `root`
/// handed to the decoder as the expected root. A trie backend built over that database is then
/// read with `read_range_proof_check_with_child_on_proving_backend`, starting at `start_key`.
///
/// # Errors
///
/// A decoding failure is wrapped with `sp_blockchain::Error::from_state`; a failure while
/// reading the range is converted into `sp_blockchain::Error` by `?`.
fn verify_range_proof(
    &self,
    root: Block::Hash,
    proof: CompactProof,
    start_key: &[Vec<u8>],
) -> sp_blockchain::Result<(KeyValueStates, usize)> {
    let mut memory_db = sp_state_machine::MemoryDB::<HashingFor<Block>>::new(&[]);

    // Only the populated database is needed afterwards; the decoder's return value is dropped.
    let _ = sp_trie::decode_compact::<sp_state_machine::LayoutV0<HashingFor<Block>>, _, _>(
        &mut memory_db,
        proof.iter_compact_encoded_nodes(),
        Some(&root),
    )
    .map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?;

    let trie_backend = sp_state_machine::TrieBackendBuilder::new(memory_db, root).build();
    let key_values = read_range_proof_check_with_child_on_proving_backend::<HashingFor<Block>>(
        &trie_backend,
        start_key,
    )?;
    Ok(key_values)
}
}
impl<B, E, Block, RA> ExecutorProvider<Block> for Client<B, E, Block, RA>
where
B: backend::Backend<Block>,
E: CallExecutor<Block>,
Block: BlockT,
{
type Executor = E;
fn executor(&self) -> &Self::Executor {
&self.executor
}
fn execution_extensions(&self) -> &ExecutionExtensions<Block> {
self.executor.execution_extensions()
}
}
impl<B, E, Block, RA> StorageProvider<Block, B> for Client<B, E, Block, RA>
where
B: backend::Backend<Block>,
E: CallExecutor<Block>,
Block: BlockT,
{
fn storage_keys(
&self,
hash: <Block as BlockT>::Hash,
prefix: Option<&StorageKey>,
start_key: Option<&StorageKey>,
) -> sp_blockchain::Result<KeysIter<B::State, Block>> {
let state = self.state_at(hash)?;
KeysIter::new(state, prefix, start_key)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))
}
fn child_storage_keys(
&self,
hash: <Block as BlockT>::Hash,
child_info: ChildInfo,
prefix: Option<&StorageKey>,
start_key: Option<&StorageKey>,
) -> sp_blockchain::Result<KeysIter<B::State, Block>> {
let state = self.state_at(hash)?;
KeysIter::new_child(state, child_info, prefix, start_key)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))
}
fn storage_pairs(
&self,
hash: <Block as BlockT>::Hash,
prefix: Option<&StorageKey>,
start_key: Option<&StorageKey>,
) -> sp_blockchain::Result<PairsIter<B::State, Block>> {
let state = self.state_at(hash)?;
PairsIter::new(state, prefix, start_key)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))
}
fn storage(
&self,
hash: Block::Hash,
key: &StorageKey,
) -> sp_blockchain::Result<Option<StorageData>> {
Ok(self
.state_at(hash)?
.storage(&key.0)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
.map(StorageData))
}
fn storage_hash(
&self,
hash: <Block as BlockT>::Hash,
key: &StorageKey,
) -> sp_blockchain::Result<Option<Block::Hash>> {
self.state_at(hash)?
.storage_hash(&key.0)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))
}
fn child_storage(
&self,
hash: <Block as BlockT>::Hash,
child_info: &ChildInfo,
key: &StorageKey,
) -> sp_blockchain::Result<Option<StorageData>> {
Ok(self
.state_at(hash)?
.child_storage(child_info, &key.0)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
.map(StorageData))
}
fn child_storage_hash(
&self,
hash: <Block as BlockT>::Hash,
child_info: &ChildInfo,
key: &StorageKey,
) -> sp_blockchain::Result<Option<Block::Hash>> {
self.state_at(hash)?
.child_storage_hash(child_info, &key.0)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))
}
fn closest_merkle_value(
&self,
hash: <Block as BlockT>::Hash,
key: &StorageKey,
) -> blockchain::Result<Option<MerkleValue<<Block as BlockT>::Hash>>> {
self.state_at(hash)?
.closest_merkle_value(&key.0)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))
}
fn child_closest_merkle_value(
&self,
hash: <Block as BlockT>::Hash,
child_info: &ChildInfo,
key: &StorageKey,
) -> blockchain::Result<Option<MerkleValue<<Block as BlockT>::Hash>>> {
self.state_at(hash)?
.child_closest_merkle_value(child_info, &key.0)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))
}
}
/// Header-metadata access; each method forwards to the backend's blockchain.
impl<B, E, Block, RA> HeaderMetadata<Block> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block>,
    Block: BlockT,
{
    type Error = sp_blockchain::Error;

    /// Fetch the cached header metadata for `hash` from the backend's blockchain.
    fn header_metadata(
        &self,
        hash: Block::Hash,
    ) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
        let chain = self.backend.blockchain();
        chain.header_metadata(hash)
    }

    /// Hand `metadata` for `hash` to the backend's blockchain.
    fn insert_header_metadata(&self, hash: Block::Hash, metadata: CachedHeaderMetadata<Block>) {
        let chain = self.backend.blockchain();
        chain.insert_header_metadata(hash, metadata)
    }

    /// Ask the backend's blockchain to remove the metadata stored for `hash`.
    fn remove_header_metadata(&self, hash: Block::Hash) {
        let chain = self.backend.blockchain();
        chain.remove_header_metadata(hash)
    }
}
/// Resolves the uncle hashes returned by `Client::uncles` into headers.
impl<B, E, Block, RA> ProvideUncles<Block> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block>,
    Block: BlockT,
{
    /// Collect the headers of the uncles of `target_hash`, up to `max_generation`.
    ///
    /// An error from `Client::uncles` is returned to the caller. A hash whose header lookup
    /// fails or yields `None` is skipped without reporting an error.
    fn uncles(
        &self,
        target_hash: Block::Hash,
        max_generation: NumberFor<Block>,
    ) -> sp_blockchain::Result<Vec<Block::Header>> {
        let uncle_hashes = Client::uncles(self, target_hash, max_generation)?;

        let mut headers = Vec::new();
        for uncle_hash in uncle_hashes {
            // Both `Err(_)` and `Ok(None)` drop the entry.
            if let Ok(Some(header)) = Client::header(self, uncle_hash) {
                headers.push(header);
            }
        }
        Ok(headers)
    }
}
/// Header-backend queries; each method forwards to the backend's blockchain.
impl<B, E, Block, RA> ChainHeaderBackend<Block> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block> + Send + Sync,
    Block: BlockT,
    RA: Send + Sync,
{
    /// Look up the header stored for `hash`.
    fn header(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Block::Header>> {
        let chain = self.backend.blockchain();
        chain.header(hash)
    }

    /// Return the blockchain's `info()`.
    fn info(&self) -> blockchain::Info<Block> {
        let chain = self.backend.blockchain();
        chain.info()
    }

    /// Return the blockchain's status for `hash`.
    fn status(&self, hash: Block::Hash) -> sp_blockchain::Result<blockchain::BlockStatus> {
        let chain = self.backend.blockchain();
        chain.status(hash)
    }

    /// Look up the block number recorded for `hash`.
    fn number(
        &self,
        hash: Block::Hash,
    ) -> sp_blockchain::Result<Option<<<Block as BlockT>::Header as HeaderT>::Number>> {
        let chain = self.backend.blockchain();
        chain.number(hash)
    }

    /// Look up the block hash recorded for `number`.
    fn hash(&self, number: NumberFor<Block>) -> sp_blockchain::Result<Option<Block::Hash>> {
        let chain = self.backend.blockchain();
        chain.hash(number)
    }
}
/// Converts a `BlockId` into a hash or a number using the client's own lookups.
impl<B, E, Block, RA> BlockIdTo<Block> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block> + Send + Sync,
    Block: BlockT,
    RA: Send + Sync,
{
    type Error = Error;

    /// Resolve `block_id` to a block hash through `block_hash_from_id`.
    fn to_hash(&self, block_id: &BlockId<Block>) -> sp_blockchain::Result<Option<Block::Hash>> {
        let maybe_hash = self.block_hash_from_id(block_id)?;
        Ok(maybe_hash)
    }

    /// Resolve `block_id` to a block number through `block_number_from_id`.
    fn to_number(
        &self,
        block_id: &BlockId<Block>,
    ) -> sp_blockchain::Result<Option<NumberFor<Block>>> {
        let maybe_number = self.block_number_from_id(block_id)?;
        Ok(maybe_number)
    }
}
/// Header-backend queries for a shared reference to the client.
///
/// `header` and `info` go to the backend's blockchain directly; `status`, `number` and `hash`
/// are called on the referenced client.
impl<B, E, Block, RA> ChainHeaderBackend<Block> for &Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block> + Send + Sync,
    Block: BlockT,
    RA: Send + Sync,
{
    /// Look up the header stored for `hash`.
    fn header(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Block::Header>> {
        let chain = self.backend.blockchain();
        chain.header(hash)
    }

    /// Return the blockchain's `info()`.
    fn info(&self) -> blockchain::Info<Block> {
        let chain = self.backend.blockchain();
        chain.info()
    }

    /// Return the referenced client's status for `hash`.
    fn status(&self, hash: Block::Hash) -> sp_blockchain::Result<blockchain::BlockStatus> {
        // Peel one reference layer so the call lands on `Client`'s methods, not on this impl.
        let client: &Client<B, E, Block, RA> = *self;
        client.status(hash)
    }

    /// Return the referenced client's block number for `hash`.
    fn number(
        &self,
        hash: Block::Hash,
    ) -> sp_blockchain::Result<Option<<<Block as BlockT>::Header as HeaderT>::Number>> {
        let client: &Client<B, E, Block, RA> = *self;
        client.number(hash)
    }

    /// Return the referenced client's block hash for `number`.
    fn hash(&self, number: NumberFor<Block>) -> sp_blockchain::Result<Option<Block::Hash>> {
        let client: &Client<B, E, Block, RA> = *self;
        client.hash(number)
    }
}
/// Provides the runtime API object that `RA` constructs for this client.
impl<B, E, Block, RA> ProvideRuntimeApi<Block> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block, Backend = B> + Send + Sync,
    Block: BlockT,
    RA: ConstructRuntimeApi<Block, Self> + Send + Sync,
{
    type Api = <RA as ConstructRuntimeApi<Block, Self>>::RuntimeApi;

    /// Build a runtime API reference with this client as the call target.
    fn runtime_api(&self) -> ApiRef<Self::Api> {
        <RA as ConstructRuntimeApi<Block, Self>>::construct_runtime_api(self)
    }
}
/// Lets runtime API calls be executed against this client.
///
/// Errors from the executor and the client are converted into `sp_api::ApiError`.
impl<B, E, Block, RA> CallApiAt<Block> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block, Backend = B> + Send + Sync,
    Block: BlockT,
    RA: Send + Sync,
{
    type StateBackend = B::State;

    /// Run the call described by `params` through the executor's `contextual_call`.
    ///
    /// The encoded arguments are passed by reference; every other field of `params` is moved
    /// into the call.
    fn call_api_at(&self, params: CallApiAtParams<Block>) -> Result<Vec<u8>, sp_api::ApiError> {
        self.executor
            .contextual_call(
                params.at,
                params.function,
                &params.arguments,
                params.overlayed_changes,
                params.recorder,
                params.call_context,
                params.extensions,
            )
            .map_err(Into::into)
    }

    /// Ask the executor for the runtime version at block `hash`.
    fn runtime_version_at(&self, hash: Block::Hash) -> Result<RuntimeVersion, sp_api::ApiError> {
        CallExecutor::runtime_version(&self.executor, hash).map_err(Into::into)
    }

    /// Return the state backend at block `at`.
    fn state_at(&self, at: Block::Hash) -> Result<Self::StateBackend, sp_api::ApiError> {
        // NOTE(review): presumably resolves to the client's inherent `state_at` (inherent
        // methods win over trait methods), so this is not a recursive call — confirm.
        self.state_at(at).map_err(Into::into)
    }

    /// Merge the executor's extensions for block `at` into `extensions`.
    ///
    /// Fails if the block number for `at` cannot be resolved.
    fn initialize_extensions(
        &self,
        at: Block::Hash,
        extensions: &mut sp_externalities::Extensions,
    ) -> Result<(), sp_api::ApiError> {
        let block_number = self.expect_block_number_from_id(&BlockId::Hash(at))?;
        extensions.merge(self.executor.execution_extensions().extensions(at, block_number));
        Ok(())
    }
}
#[async_trait::async_trait]
/// Block import through a shared reference to the client.
impl<B, E, Block, RA> sc_consensus::BlockImport<Block> for &Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block> + Send + Sync,
    Block: BlockT,
    Client<B, E, Block, RA>: ProvideRuntimeApi<Block>,
    <Client<B, E, Block, RA> as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> + ApiExt<Block>,
    RA: Sync + Send,
{
    type Error = ConsensusError;

    /// Prepare the storage changes for `import_block` and apply the block under the import lock.
    ///
    /// If preparation decides to discard the block, its result is returned as-is. Errors from
    /// either step are logged with `warn!` and returned as `ConsensusError::ClientImport`.
    async fn import_block(
        &self,
        mut import_block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        let span = tracing::span!(tracing::Level::DEBUG, "import_block");
        let _enter = span.enter();

        let prepared = self.prepare_block_storage_changes(&mut import_block).map_err(|e| {
            warn!("Block prepare storage changes error: {}", e);
            ConsensusError::ClientImport(e.to_string())
        })?;
        let storage_changes = match prepared {
            PrepareStorageChangesResult::Import(changes) => changes,
            PrepareStorageChangesResult::Discard(outcome) => return Ok(outcome),
        };

        let applied = self.lock_import_and_run(|operation| {
            self.apply_block(operation, import_block, storage_changes)
        });
        applied.map_err(|e| {
            warn!("Block import error: {}", e);
            ConsensusError::ClientImport(e.to_string())
        })
    }

    /// Decide, without importing anything, how the described block should be treated.
    ///
    /// Checks run in order: the block rules, then the block's own status, then the parent's
    /// status. The first check that settles the outcome returns; if none does, the result is
    /// `ImportResult::imported(false)`.
    async fn check_block(
        &self,
        block: BlockCheckParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        let BlockCheckParams {
            hash,
            number,
            parent_hash,
            allow_missing_state,
            import_existing,
            allow_missing_parent,
        } = block;

        // Step 1: the configured block rules.
        match self.block_rules.lookup(number, &hash) {
            BlockLookupResult::NotSpecial => {},
            BlockLookupResult::KnownBad => {
                trace!("Rejecting known bad block: #{} {:?}", number, hash);
                return Ok(ImportResult::KnownBad)
            },
            BlockLookupResult::Expected(expected_hash) => {
                trace!(
                    "Rejecting block from known invalid fork. Got {:?}, expected: {:?} at height {}",
                    hash,
                    expected_hash,
                    number
                );
                return Ok(ImportResult::KnownBad)
            },
        }

        // Step 2: the block's own status. This runs before the parent check, so a pruned block
        // that is not being re-imported yields `AlreadyInChain` without consulting the parent.
        let own_status = self
            .block_status(hash)
            .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
        match own_status {
            BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
            BlockStatus::InChainWithState | BlockStatus::Queued =>
                return Ok(ImportResult::AlreadyInChain),
            BlockStatus::InChainPruned => {
                if !import_existing {
                    return Ok(ImportResult::AlreadyInChain)
                }
            },
            BlockStatus::Unknown => {},
        }

        // Step 3: the parent's status.
        let parent_status = self
            .block_status(parent_hash)
            .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
        match parent_status {
            BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
            BlockStatus::InChainWithState | BlockStatus::Queued => {},
            BlockStatus::Unknown => {
                if !allow_missing_parent {
                    return Ok(ImportResult::UnknownParent)
                }
            },
            BlockStatus::InChainPruned => {
                if !allow_missing_state {
                    return Ok(ImportResult::MissingState)
                }
            },
        }

        Ok(ImportResult::imported(false))
    }
}
#[async_trait::async_trait]
/// Block import for the client by value; both methods forward to the `&Client` implementation.
impl<B, E, Block, RA> sc_consensus::BlockImport<Block> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block> + Send + Sync,
    Block: BlockT,
    Self: ProvideRuntimeApi<Block>,
    <Self as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> + ApiExt<Block>,
    RA: Sync + Send,
{
    type Error = ConsensusError;

    /// Forward to `check_block` of the `BlockImport` implementation for `&Client`.
    async fn check_block(
        &self,
        block: BlockCheckParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        // The target impl is named explicitly so the call cannot resolve back to this method.
        <&Client<B, E, Block, RA> as sc_consensus::BlockImport<Block>>::check_block(&self, block)
            .await
    }

    /// Forward to `import_block` of the `BlockImport` implementation for `&Client`.
    async fn import_block(
        &self,
        import_block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        <&Client<B, E, Block, RA> as sc_consensus::BlockImport<Block>>::import_block(
            &self,
            import_block,
        )
        .await
    }
}
/// Block finalization for the client.
impl<B, E, Block, RA> Finalizer<Block, B> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block>,
    Block: BlockT,
{
    /// Record the finalization of `hash` in `operation`.
    ///
    /// The blockchain info is read once up front and passed to
    /// `apply_finality_with_block_hash` together with the remaining arguments.
    fn apply_finality(
        &self,
        operation: &mut ClientImportOperation<Block, B>,
        hash: Block::Hash,
        justification: Option<Justification>,
        notify: bool,
    ) -> sp_blockchain::Result<()> {
        let chain_info = self.backend.blockchain().info();
        self.apply_finality_with_block_hash(operation, hash, justification, &chain_info, notify)
    }

    /// Finalize `hash` by running `apply_finality` inside `lock_import_and_run`.
    fn finalize_block(
        &self,
        hash: Block::Hash,
        justification: Option<Justification>,
        notify: bool,
    ) -> sp_blockchain::Result<()> {
        self.lock_import_and_run(|import_op| {
            self.apply_finality(import_op, hash, justification, notify)
        })
    }
}
/// Block finalization through a shared reference; both methods are called on the referenced
/// client.
impl<B, E, Block, RA> Finalizer<Block, B> for &Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block>,
    Block: BlockT,
{
    /// Call `apply_finality` on the referenced client with the same arguments.
    fn apply_finality(
        &self,
        operation: &mut ClientImportOperation<Block, B>,
        hash: Block::Hash,
        justification: Option<Justification>,
        notify: bool,
    ) -> sp_blockchain::Result<()> {
        // Peel one reference layer so the call lands on `Client`'s methods, not on this impl.
        let client: &Client<B, E, Block, RA> = *self;
        client.apply_finality(operation, hash, justification, notify)
    }

    /// Call `finalize_block` on the referenced client with the same arguments.
    fn finalize_block(
        &self,
        hash: Block::Hash,
        justification: Option<Justification>,
        notify: bool,
    ) -> sp_blockchain::Result<()> {
        let client: &Client<B, E, Block, RA> = *self;
        client.finalize_block(hash, justification, notify)
    }
}
/// Registration of callbacks kept in the client's import and finality action lists.
impl<B, E, Block, RA> PreCommitActions<Block> for Client<B, E, Block, RA>
where
    Block: BlockT,
{
    /// Append `action` to the locked list of import actions.
    fn register_import_action(&self, action: OnImportAction<Block>) {
        let mut registered = self.import_actions.lock();
        registered.push(action);
    }

    /// Append `action` to the locked list of finality actions.
    fn register_finality_action(&self, action: OnFinalityAction<Block>) {
        let mut registered = self.finality_actions.lock();
        registered.push(action);
    }
}
/// Subscription points for import, finality and storage-change notifications.
///
/// Each notification stream is the receiving half of a fresh `tracing_unbounded` channel whose
/// sending half is stored in the matching sink list of the client.
// NOTE(review): the `100_000` argument is presumably the channel's queue-size warning
// threshold — confirm against `tracing_unbounded`.
impl<B, E, Block, RA> BlockchainEvents<Block> for Client<B, E, Block, RA>
where
    E: CallExecutor<Block>,
    Block: BlockT,
{
    /// Open a stream registered in `import_notification_sinks`.
    fn import_notification_stream(&self) -> ImportNotifications<Block> {
        let (tx, rx) = tracing_unbounded("mpsc_import_notification_stream", 100_000);
        {
            self.import_notification_sinks.lock().push(tx);
        }
        rx
    }

    /// Open a stream registered in `every_import_notification_sinks`.
    fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
        let (tx, rx) = tracing_unbounded("mpsc_every_import_notification_stream", 100_000);
        {
            self.every_import_notification_sinks.lock().push(tx);
        }
        rx
    }

    /// Open a stream registered in `finality_notification_sinks`.
    fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
        let (tx, rx) = tracing_unbounded("mpsc_finality_notification_stream", 100_000);
        {
            self.finality_notification_sinks.lock().push(tx);
        }
        rx
    }

    /// Start listening for storage changes matching the given key filters.
    ///
    /// This implementation always returns `Ok`.
    fn storage_changes_notification_stream(
        &self,
        filter_keys: Option<&[StorageKey]>,
        child_filter_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
    ) -> sp_blockchain::Result<StorageEventStream<Block::Hash>> {
        let events = self.storage_notifications.listen(filter_keys, child_filter_keys);
        Ok(events)
    }
}
/// Block-level queries answered by the client and its backend.
impl<B, E, Block, RA> BlockBackend<Block> for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block>,
    Block: BlockT,
{
    /// Return the body (extrinsics) of block `hash`, as given by the client's `body`.
    fn block_body(
        &self,
        hash: Block::Hash,
    ) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
        self.body(hash)
    }

    /// Assemble the signed block for `hash` from its header, body and justifications.
    ///
    /// All three lookups are made, in that order, and the first error is returned. The result
    /// is `None` unless both header and body are present; justifications may be absent.
    fn block(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<SignedBlock<Block>>> {
        let maybe_header = self.header(hash)?;
        let maybe_body = self.body(hash)?;
        let justifications = self.justifications(hash)?;

        let signed = match (maybe_header, maybe_body) {
            (Some(header), Some(extrinsics)) =>
                Some(SignedBlock { block: Block::new(header, extrinsics), justifications }),
            _ => None,
        };
        Ok(signed)
    }

    /// Return the client's status for block `hash`.
    fn block_status(&self, hash: Block::Hash) -> sp_blockchain::Result<BlockStatus> {
        Client::block_status(self, hash)
    }

    /// Return the justifications stored for block `hash` by the backend's blockchain.
    fn justifications(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Justifications>> {
        let chain = self.backend.blockchain();
        chain.justifications(hash)
    }

    /// Return the hash the backend's blockchain records for block `number`.
    fn block_hash(&self, number: NumberFor<Block>) -> sp_blockchain::Result<Option<Block::Hash>> {
        let chain = self.backend.blockchain();
        chain.hash(number)
    }

    /// Return the indexed transaction stored under `hash` by the backend's blockchain.
    fn indexed_transaction(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
        let chain = self.backend.blockchain();
        chain.indexed_transaction(hash)
    }

    /// Report whether the backend's blockchain has an indexed transaction under `hash`.
    fn has_indexed_transaction(&self, hash: Block::Hash) -> sp_blockchain::Result<bool> {
        let chain = self.backend.blockchain();
        chain.has_indexed_transaction(hash)
    }

    /// Return the indexed body of block `hash` from the backend's blockchain.
    fn block_indexed_body(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
        let chain = self.backend.blockchain();
        chain.block_indexed_body(hash)
    }

    /// Report the backend's `requires_full_sync()`.
    fn requires_full_sync(&self) -> bool {
        let backend = &self.backend;
        backend.requires_full_sync()
    }
}
/// Auxiliary key-value storage backed by the client's backend.
impl<B, E, Block, RA> backend::AuxStore for Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block>,
    Block: BlockT,
    Self: ProvideRuntimeApi<Block>,
    <Self as ProvideRuntimeApi<Block>>::Api: CoreApi<Block>,
{
    /// Apply the `insert` and `delete` sets through `apply_aux` inside `lock_import_and_run`.
    fn insert_aux<
        'a,
        'b: 'a,
        'c: 'a,
        I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
        D: IntoIterator<Item = &'a &'b [u8]>,
    >(
        &self,
        insert: I,
        delete: D,
    ) -> sp_blockchain::Result<()> {
        self.lock_import_and_run(|import_op| apply_aux(import_op, insert, delete))
    }

    /// Read the auxiliary value stored under `key` directly from the backend.
    fn get_aux(&self, key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
        let store = &*self.backend;
        backend::AuxStore::get_aux(store, key)
    }
}
/// Auxiliary storage through a shared reference; both methods are called on the referenced
/// client.
impl<B, E, Block, RA> backend::AuxStore for &Client<B, E, Block, RA>
where
    B: backend::Backend<Block>,
    E: CallExecutor<Block>,
    Block: BlockT,
    Client<B, E, Block, RA>: ProvideRuntimeApi<Block>,
    <Client<B, E, Block, RA> as ProvideRuntimeApi<Block>>::Api: CoreApi<Block>,
{
    /// Call `insert_aux` on the referenced client with the same arguments.
    fn insert_aux<
        'a,
        'b: 'a,
        'c: 'a,
        I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
        D: IntoIterator<Item = &'a &'b [u8]>,
    >(
        &self,
        insert: I,
        delete: D,
    ) -> sp_blockchain::Result<()> {
        // Peel one reference layer so the call lands on `Client`'s methods, not on this impl.
        let client: &Client<B, E, Block, RA> = *self;
        client.insert_aux(insert, delete)
    }

    /// Call `get_aux` on the referenced client with the same key.
    fn get_aux(&self, key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
        let client: &Client<B, E, Block, RA> = *self;
        client.get_aux(key)
    }
}
/// Block-status lookup for block validation, with errors boxed as trait objects.
impl<BE, E, B, RA> sp_consensus::block_validation::Chain<B> for Client<BE, E, B, RA>
where
    BE: backend::Backend<B>,
    E: CallExecutor<B>,
    B: BlockT,
{
    /// Return the client's status for `hash`; a client error is boxed unchanged.
    fn block_status(
        &self,
        hash: B::Hash,
    ) -> Result<BlockStatus, Box<dyn std::error::Error + Send>> {
        match Client::block_status(self, hash) {
            Ok(status) => Ok(status),
            Err(e) => Err(Box::new(e) as Box<dyn std::error::Error + Send>),
        }
    }
}
impl<BE, E, B, RA> sp_transaction_storage_proof::IndexedBody<B> for Client<BE, E, B, RA>
where
BE: backend::Backend<B>,
E: CallExecutor<B>,
B: BlockT,
{
fn block_indexed_body(
&self,
number: NumberFor<B>,
) -> Result<Option<Vec<Vec<u8>>>, sp_transaction_storage_proof::Error> {
let hash = match self
.backend
.blockchain()
.block_hash_from_id(&BlockId::Number(number))
.map_err(|e| sp_transaction_storage_proof::Error::Application(Box::new(e)))?
{
Some(hash) => hash,
None => return Ok(None),
};
self.backend
.blockchain()
.block_indexed_body(hash)
.map_err(|e| sp_transaction_storage_proof::Error::Application(Box::new(e)))
}
fn number(
&self,
hash: B::Hash,
) -> Result<Option<NumberFor<B>>, sp_transaction_storage_proof::Error> {
self.backend
.blockchain()
.number(hash)
.map_err(|e| sp_transaction_storage_proof::Error::Application(Box::new(e)))
}
}