use log::{debug, trace};
use std::{
fmt,
time::{Duration, Instant},
};
use sp_consensus::{error::Error as ConsensusError, BlockOrigin};
use sp_runtime::{
traits::{Block as BlockT, Header as _, NumberFor},
Justifications,
};
use crate::{
block_import::{
BlockCheckParams, BlockImport, BlockImportParams, ImportResult, ImportedAux, ImportedState,
JustificationImport, StateAction,
},
metrics::Metrics,
};
pub use basic_queue::BasicQueue;
const LOG_TARGET: &str = "sync::import-queue";
pub type DefaultImportQueue<Block> = BasicQueue<Block>;
mod basic_queue;
pub mod buffered_link;
pub mod mock;
pub type BoxBlockImport<B> = Box<dyn BlockImport<B, Error = ConsensusError> + Send + Sync>;
pub type BoxJustificationImport<B> =
Box<dyn JustificationImport<B, Error = ConsensusError> + Send + Sync>;
pub type RuntimeOrigin = sc_network_types::PeerId;
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct IncomingBlock<B: BlockT> {
pub hash: <B as BlockT>::Hash,
pub header: Option<<B as BlockT>::Header>,
pub body: Option<Vec<<B as BlockT>::Extrinsic>>,
pub indexed_body: Option<Vec<Vec<u8>>>,
pub justifications: Option<Justifications>,
pub origin: Option<RuntimeOrigin>,
pub allow_missing_state: bool,
pub skip_execution: bool,
pub import_existing: bool,
pub state: Option<ImportedState<B>>,
}
#[async_trait::async_trait]
pub trait Verifier<B: BlockT>: Send + Sync {
async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String>;
}
pub trait ImportQueueService<B: BlockT>: Send {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
fn import_justifications(
&mut self,
who: RuntimeOrigin,
hash: B::Hash,
number: NumberFor<B>,
justifications: Justifications,
);
}
#[async_trait::async_trait]
pub trait ImportQueue<B: BlockT>: Send {
fn service(&self) -> Box<dyn ImportQueueService<B>>;
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &dyn Link<B>);
async fn run(self, link: &dyn Link<B>);
}
pub trait Link<B: BlockT>: Send + Sync {
fn blocks_processed(
&self,
_imported: usize,
_count: usize,
_results: Vec<(BlockImportResult<B>, B::Hash)>,
) {
}
fn justification_imported(
&self,
_who: RuntimeOrigin,
_hash: &B::Hash,
_number: NumberFor<B>,
_success: bool,
) {
}
fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {}
}
#[derive(Debug, PartialEq)]
pub enum BlockImportStatus<BlockNumber: fmt::Debug + PartialEq> {
ImportedKnown(BlockNumber, Option<RuntimeOrigin>),
ImportedUnknown(BlockNumber, ImportedAux, Option<RuntimeOrigin>),
}
impl<BlockNumber: fmt::Debug + PartialEq> BlockImportStatus<BlockNumber> {
pub fn number(&self) -> &BlockNumber {
match self {
BlockImportStatus::ImportedKnown(n, _) |
BlockImportStatus::ImportedUnknown(n, _, _) => n,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum BlockImportError {
#[error("block is missing a header (origin = {0:?})")]
IncompleteHeader(Option<RuntimeOrigin>),
#[error("block verification failed (origin = {0:?}): {1}")]
VerificationFailed(Option<RuntimeOrigin>, String),
#[error("bad block (origin = {0:?})")]
BadBlock(Option<RuntimeOrigin>),
#[error("block is missing parent state")]
MissingState,
#[error("block has an unknown parent")]
UnknownParent,
#[error("import has been cancelled")]
Cancelled,
#[error("consensus error: {0}")]
Other(ConsensusError),
}
type BlockImportResult<B> = Result<BlockImportStatus<NumberFor<B>>, BlockImportError>;
pub async fn import_single_block<B: BlockT, V: Verifier<B>>(
import_handle: &mut impl BlockImport<B, Error = ConsensusError>,
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &V,
) -> BlockImportResult<B> {
match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? {
SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status),
SingleBlockVerificationOutcome::Verified(import_parameters) =>
import_single_block_metered(import_handle, import_parameters, None).await,
}
}
fn import_handler<Block>(
number: NumberFor<Block>,
hash: Block::Hash,
parent_hash: Block::Hash,
block_origin: Option<RuntimeOrigin>,
import: Result<ImportResult, ConsensusError>,
) -> Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>
where
Block: BlockT,
{
match import {
Ok(ImportResult::AlreadyInChain) => {
trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash);
Ok(BlockImportStatus::ImportedKnown(number, block_origin))
},
Ok(ImportResult::Imported(aux)) =>
Ok(BlockImportStatus::ImportedUnknown(number, aux, block_origin)),
Ok(ImportResult::MissingState) => {
debug!(
target: LOG_TARGET,
"Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash
);
Err(BlockImportError::MissingState)
},
Ok(ImportResult::UnknownParent) => {
debug!(
target: LOG_TARGET,
"Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash
);
Err(BlockImportError::UnknownParent)
},
Ok(ImportResult::KnownBad) => {
debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash);
Err(BlockImportError::BadBlock(block_origin))
},
Err(e) => {
debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e);
Err(BlockImportError::Other(e))
},
}
}
pub(crate) enum SingleBlockVerificationOutcome<Block: BlockT> {
Imported(BlockImportStatus<NumberFor<Block>>),
Verified(SingleBlockImportParameters<Block>),
}
pub(crate) struct SingleBlockImportParameters<Block: BlockT> {
import_block: BlockImportParams<Block>,
hash: Block::Hash,
block_origin: Option<RuntimeOrigin>,
verification_time: Duration,
}
pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
import_handle: &impl BlockImport<B, Error = ConsensusError>,
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &V,
metrics: Option<&Metrics>,
) -> Result<SingleBlockVerificationOutcome<B>, BlockImportError> {
let peer = block.origin;
let justifications = block.justifications;
let Some(header) = block.header else {
if let Some(ref peer) = peer {
debug!(target: LOG_TARGET, "Header {} was not provided by {peer} ", block.hash);
} else {
debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash);
}
return Err(BlockImportError::IncompleteHeader(peer))
};
trace!(target: LOG_TARGET, "Header {} has {:?} logs", block.hash, header.digest().logs().len());
let number = *header.number();
let hash = block.hash;
let parent_hash = *header.parent_hash();
match import_handler::<B>(
number,
hash,
parent_hash,
peer,
import_handle
.check_block(BlockCheckParams {
hash,
number,
parent_hash,
allow_missing_state: block.allow_missing_state,
import_existing: block.import_existing,
allow_missing_parent: block.state.is_some(),
})
.await,
)? {
BlockImportStatus::ImportedUnknown { .. } => (),
r => {
return Ok(SingleBlockVerificationOutcome::Imported(r))
},
}
let started = Instant::now();
let mut import_block = BlockImportParams::new(block_origin, header);
import_block.body = block.body;
import_block.justifications = justifications;
import_block.post_hash = Some(hash);
import_block.import_existing = block.import_existing;
import_block.indexed_body = block.indexed_body;
if let Some(state) = block.state {
let changes = crate::block_import::StorageChanges::Import(state);
import_block.state_action = StateAction::ApplyChanges(changes);
} else if block.skip_execution {
import_block.state_action = StateAction::Skip;
} else if block.allow_missing_state {
import_block.state_action = StateAction::ExecuteIfPossible;
}
let import_block = verifier.verify(import_block).await.map_err(|msg| {
if let Some(ref peer) = peer {
trace!(
target: LOG_TARGET,
"Verifying {}({}) from {} failed: {}",
number,
hash,
peer,
msg
);
} else {
trace!(target: LOG_TARGET, "Verifying {}({}) failed: {}", number, hash, msg);
}
if let Some(metrics) = metrics {
metrics.report_verification(false, started.elapsed());
}
BlockImportError::VerificationFailed(peer, msg)
})?;
let verification_time = started.elapsed();
if let Some(metrics) = metrics {
metrics.report_verification(true, verification_time);
}
Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
import_block,
hash,
block_origin: peer,
verification_time,
}))
}
pub(crate) async fn import_single_block_metered<Block: BlockT>(
import_handle: &mut impl BlockImport<Block, Error = ConsensusError>,
import_parameters: SingleBlockImportParameters<Block>,
metrics: Option<&Metrics>,
) -> BlockImportResult<Block> {
let started = Instant::now();
let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } =
import_parameters;
let number = *import_block.header.number();
let parent_hash = *import_block.header.parent_hash();
let imported = import_handle.import_block(import_block).await;
if let Some(metrics) = metrics {
metrics.report_verification_and_import(started.elapsed() + verification_time);
}
import_handler::<Block>(number, hash, parent_hash, block_origin, imported)
}