//! Shared utilities for parachain consensus: the [`ParachainConsensus`] trait, the
//! level-limiting [`ParachainBlockImport`] wrapper, and helpers for finding potential
//! parent blocks and deriving relay-chain slots and timestamps.

use codec::Decode;
use polkadot_primitives::{
Block as PBlock, Hash as PHash, Header as PHeader, PersistedValidationData, ValidationCodeHash,
};
use cumulus_primitives_core::{
relay_chain::{BlockId as RBlockId, OccupiedCoreAssumption},
ParaId,
};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface};
use sc_client_api::{Backend, HeaderBackend};
use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult};
use sp_blockchain::Backend as BlockchainBackend;
use sp_consensus_slots::Slot;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sp_timestamp::Timestamp;
use std::{sync::Arc, time::Duration};
mod level_monitor;
mod parachain_consensus;
#[cfg(test)]
mod tests;
pub use parachain_consensus::run_parachain_consensus;
use level_monitor::LevelMonitor;
pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT};
pub mod import_queue;
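/// Provides the hash of the validation code used by the parachain at the given block.
///
/// Thanks to the blanket impl below, any `Fn(Hash) -> Option<ValidationCodeHash>`
/// closure can act as a provider. A minimal sketch, assuming a full client that exposes
/// a `code_at` accessor (as the parachain node templates do); not a definitive
/// implementation:
///
/// ```ignore
/// let code_hash_provider = move |block_hash| {
///     client
///         .code_at(block_hash)
///         .ok()
///         .map(|code| polkadot_primitives::ValidationCode::from(code).hash())
/// };
/// ```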
pub trait ValidationCodeHashProvider<Hash> {
fn code_hash_at(&self, at: Hash) -> Option<ValidationCodeHash>;
}
impl<F, Hash> ValidationCodeHashProvider<Hash> for F
where
F: Fn(Hash) -> Option<ValidationCodeHash>,
{
fn code_hash_at(&self, at: Hash) -> Option<ValidationCodeHash> {
(self)(at)
}
}
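/// A parachain block candidate, as returned by [`ParachainConsensus::produce_candidate`]:
/// the built block together with the storage proof recorded during its execution.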
pub struct ParachainCandidate<B> {
/// The block that was built.
pub block: B,
/// The storage proof recorded while building the block.
pub proof: sp_trie::StorageProof,
}
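/// The block-production engine of a parachain.
///
/// A collator asks the engine to produce a candidate on top of a given parent whenever
/// it should author for a particular relay-chain block. Implementations must be
/// cloneable (hence the [`dyn_clone::DynClone`] bound) so they can be boxed and shared.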
#[async_trait::async_trait]
pub trait ParachainConsensus<B: BlockT>: Send + Sync + dyn_clone::DynClone {
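/// Produce a new candidate on top of `parent`, in the context of the given
/// `relay_parent` and the `validation_data` supplied by the relay chain.
///
/// Returns `None` if no candidate could be produced.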
async fn produce_candidate(
&mut self,
parent: &B::Header,
relay_parent: PHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<B>>;
}
dyn_clone::clone_trait_object!(<B> ParachainConsensus<B> where B: BlockT);
#[async_trait::async_trait]
impl<B: BlockT> ParachainConsensus<B> for Box<dyn ParachainConsensus<B> + Send + Sync> {
async fn produce_candidate(
&mut self,
parent: &B::Header,
relay_parent: PHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<B>> {
// Dereference through the `Box` so the call dispatches to the inner trait object
// instead of recursing into this impl.
(**self).produce_candidate(parent, relay_parent, validation_data).await
}
}
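/// Parachain-aware [`BlockImport`] wrapper.
///
/// All calls are forwarded to the `inner` block import. In addition it:
///
/// * optionally tracks the number of leaves per block height through the crate's
///   `LevelMonitor` and prunes the least significant forks before each import, and
/// * overrides the fork-choice rule so that a block only becomes the new best block
///   during the initial network sync; otherwise the best block is driven by the relay
///   chain.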
pub struct ParachainBlockImport<Block: BlockT, BI, BE> {
inner: BI,
monitor: Option<SharedData<LevelMonitor<Block, BE>>>,
}
impl<Block: BlockT, BI, BE: Backend<Block>> ParachainBlockImport<Block, BI, BE> {
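/// Create a new block import using [`MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT`] as the
/// per-level leaf limit.
///
/// A minimal usage sketch; `client` and `backend` are assumed to come from the node's
/// service setup and are not defined in this crate:
///
/// ```ignore
/// let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
/// ```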
pub fn new(inner: BI, backend: Arc<BE>) -> Self {
Self::new_with_limit(inner, backend, LevelLimit::Default)
}
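/// Create a new block import with an explicit per-level leaf limit.
///
/// Pass [`LevelLimit::None`] to disable the level monitor entirely, or
/// [`LevelLimit::Some`] to prune once the given number of leaves per level is exceeded.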
pub fn new_with_limit(inner: BI, backend: Arc<BE>, level_leaves_max: LevelLimit) -> Self {
let level_limit = match level_leaves_max {
LevelLimit::None => None,
LevelLimit::Some(limit) => Some(limit),
LevelLimit::Default => Some(MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT),
};
let monitor =
level_limit.map(|level_limit| SharedData::new(LevelMonitor::new(level_limit, backend)));
Self { inner, monitor }
}
}
impl<Block: BlockT, I: Clone, BE> Clone for ParachainBlockImport<Block, I, BE> {
fn clone(&self) -> Self {
ParachainBlockImport { inner: self.inner.clone(), monitor: self.monitor.clone() }
}
}
#[async_trait::async_trait]
impl<Block, BI, BE> BlockImport<Block> for ParachainBlockImport<Block, BI, BE>
where
Block: BlockT,
BI: BlockImport<Block> + Send,
BE: Backend<Block>,
{
type Error = BI::Error;
async fn check_block(
&mut self,
block: sc_consensus::BlockCheckParams<Block>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
self.inner.check_block(block).await
}
async fn import_block(
&mut self,
mut params: sc_consensus::BlockImportParams<Block>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
// Blocks are stored in the backend under their post-execution hash.
let hash = params.post_hash();
let number = *params.header.number();
// A block imported with state (e.g. during warp sync) can be marked finalized directly.
if params.with_state() {
params.finalized = true;
}
// The best block is decided by the relay chain; only during the initial network sync
// is every imported block marked as the new best block.
params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom(
params.origin == sp_consensus::BlockOrigin::NetworkInitialSync,
));
// If the level monitor is enabled, enforce the per-level leaf limit before importing.
// The logical lock is kept across the import so the new block can be registered below.
let maybe_lock = self.monitor.as_ref().map(|monitor_lock| {
let mut monitor = monitor_lock.shared_data_locked();
monitor.enforce_limit(number);
monitor.release_mutex()
});
let res = self.inner.import_block(params).await?;
// Register the freshly imported block with the level monitor, but only if the import
// actually succeeded.
if let (Some(mut monitor_lock), ImportResult::Imported(_)) = (maybe_lock, &res) {
let mut monitor = monitor_lock.upgrade();
monitor.block_imported(number, hash);
}
Ok(res)
}
}
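/// Marker trait denoting a block import suitable for parachains, i.e.
/// [`ParachainBlockImport`] or a wrapper around it.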
pub trait ParachainBlockImportMarker {}
impl<B: BlockT, BI, BE> ParachainBlockImportMarker for ParachainBlockImport<B, BI, BE> {}
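/// Parameters for the [`find_potential_parents`] search.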
pub struct ParentSearchParams {
/// The relay-chain block whose ancestry anchors the search.
pub relay_parent: PHash,
/// The parachain to search ancestry for.
pub para_id: ParaId,
/// How many relay-chain ancestors of `relay_parent` (within the same session) are
/// accepted as relay parents of potential parent blocks.
pub ancestry_lookback: usize,
/// The maximum distance, in parachain blocks, from the included block to a potential parent.
pub max_depth: usize,
/// Whether to drop branches that are not aligned with the candidate pending availability.
pub ignore_alternative_branches: bool,
}
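/// A parachain block that may serve as the parent of a newly authored block, as
/// returned by [`find_potential_parents`].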
pub struct PotentialParent<B: BlockT> {
/// The hash of the block.
pub hash: B::Hash,
/// The header of the block.
pub header: B::Header,
/// The distance of the block from the included block (which has depth 0).
pub depth: usize,
/// Whether the block descends from, or is, the candidate pending availability.
pub aligned_with_pending: bool,
}
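/// Search the parachain backend for blocks that are potential parents of a new block
/// under the given relay parent.
///
/// The search starts at the last included parachain block (depth 0) and walks the
/// children known to the backend. A block is considered a potential parent if it is the
/// included block, the candidate pending availability (at depth 1), or if it lies
/// within `max_depth` of the included block and was authored against a relay parent
/// (or relay state root) found in the gathered relay-chain ancestry.
///
/// A usage sketch, roughly how a collator might call it; `backend` and `relay_client`
/// are assumptions from the surrounding node code and not defined here:
///
/// ```ignore
/// let potential_parents = find_potential_parents::<Block>(
///     ParentSearchParams {
///         relay_parent,
///         para_id,
///         ancestry_lookback: 0,
///         max_depth: 3,
///         ignore_alternative_branches: true,
///     },
///     &*backend,
///     &relay_client,
/// )
/// .await?;
/// ```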
pub async fn find_potential_parents<B: BlockT>(
params: ParentSearchParams,
client: &impl Backend<B>,
relay_client: &impl RelayChainInterface,
) -> Result<Vec<PotentialParent<B>>, RelayChainError> {
// Gather the relay-chain ancestry of `relay_parent` as (hash, state root) pairs,
// limited to `ancestry_lookback` ancestors and to the relay parent's session.
let rp_ancestry = {
let mut ancestry = Vec::with_capacity(params.ancestry_lookback + 1);
let mut current_rp = params.relay_parent;
let mut required_session = None;
while ancestry.len() <= params.ancestry_lookback {
let header = match relay_client.header(RBlockId::hash(current_rp)).await? {
None => break,
Some(h) => h,
};
let session = relay_client.session_index_for_child(current_rp).await?;
// Only walk ancestors within the same session; crossing a session boundary would
// change the validator set the candidate is checked against.
if let Some(required_session) = required_session {
if session != required_session {
break
}
} else {
required_session = Some(session);
}
ancestry.push((current_rp, *header.state_root()));
current_rp = *header.parent_hash();
// Don't iterate back into the genesis block.
if header.number == 1 {
break
}
}
ancestry
};
let is_hash_in_ancestry = |hash| rp_ancestry.iter().any(|x| x.0 == hash);
let is_root_in_ancestry = |root| rp_ancestry.iter().any(|x| x.1 == root);
// Fetch the head of the last included parachain block. With the `TimedOut` assumption
// the occupied core is treated as freed, so the reported parent head is the included one.
let included_header = relay_client
.persisted_validation_data(
params.relay_parent,
params.para_id,
OccupiedCoreAssumption::TimedOut,
)
.await?;
let included_header = match included_header {
Some(pvd) => pvd.parent_head,
// No persisted validation data implies the para is not registered at this relay parent.
None => return Ok(Vec::new()),
};
// Fetch the head of the candidate pending availability, if any. With the `Included`
// assumption the pending candidate is treated as included, so a parent head that
// differs from the included head is exactly the pending head.
let pending_header = relay_client
.persisted_validation_data(
params.relay_parent,
params.para_id,
OccupiedCoreAssumption::Included,
)
.await?
.and_then(|x| if x.parent_head != included_header { Some(x.parent_head) } else { None });
let included_header = match B::Header::decode(&mut &included_header.0[..]).ok() {
None => return Ok(Vec::new()),
Some(x) => x,
};
let pending_header = pending_header.and_then(|p| B::Header::decode(&mut &p.0[..]).ok());
let included_hash = included_header.hash();
let pending_hash = pending_header.as_ref().map(|hdr| hdr.hash());
// Search frontier, seeded with the included block at depth 0.
let mut frontier = vec![PotentialParent::<B> {
hash: included_hash,
header: included_header,
depth: 0,
aligned_with_pending: true,
}];
let mut potential_parents = Vec::new();
while let Some(entry) = frontier.pop() {
// The candidate pending availability (always at depth 1) is a potential parent.
let is_pending =
entry.depth == 1 && pending_hash.as_ref().map_or(false, |h| &entry.hash == h);
// The included block (depth 0) is always a potential parent.
let is_included = entry.depth == 0;
// Everything else must have been authored against a relay parent (or relay state
// root) within the gathered relay-chain ancestry.
let is_potential = is_pending || is_included || {
let digest = entry.header.digest();
cumulus_primitives_core::extract_relay_parent(digest).map_or(false, is_hash_in_ancestry) ||
cumulus_primitives_core::rpsr_digest::extract_relay_parent_storage_root(digest)
.map(|(r, _n)| r)
.map_or(false, is_root_in_ancestry)
};
let parent_aligned_with_pending = entry.aligned_with_pending;
let child_depth = entry.depth + 1;
let hash = entry.hash;
if is_potential {
potential_parents.push(entry);
}
// Stop descending along this branch if the block is not a potential parent itself or
// if its children would exceed the maximum search depth.
if !is_potential || child_depth > params.max_depth {
continue
}
// Push all known children onto the frontier. A child at depth 1 stays aligned with the
// pending block only if it *is* the pending block (or there is no pending block);
// deeper children simply inherit their parent's alignment.
for child in client.blockchain().children(hash).ok().into_iter().flatten() {
let aligned_with_pending = parent_aligned_with_pending &&
if child_depth == 1 {
pending_hash.as_ref().map_or(true, |h| &child == h)
} else {
true
};
if params.ignore_alternative_branches && !aligned_with_pending {
continue
}
let header = match client.blockchain().header(child) {
Ok(Some(h)) => h,
Ok(None) => continue,
Err(_) => continue,
};
frontier.push(PotentialParent {
hash: child,
header,
depth: child_depth,
aligned_with_pending,
});
}
}
Ok(potential_parents)
}
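/// Derive the relay-chain slot and the corresponding timestamp from the BABE pre-digest
/// of the given relay-parent header.
///
/// Returns `None` if the header carries no BABE pre-digest (e.g. the genesis header).
///
/// A sketch, assuming the usual 6-second relay-chain slot duration:
///
/// ```ignore
/// let (relay_slot, timestamp) =
///     relay_slot_and_timestamp(&relay_parent_header, Duration::from_secs(6))
///         .expect("relay parent header carries a BABE pre-digest");
/// ```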
pub fn relay_slot_and_timestamp(
relay_parent_header: &PHeader,
relay_chain_slot_duration: Duration,
) -> Option<(Slot, Timestamp)> {
sc_consensus_babe::find_pre_digest::<PBlock>(relay_parent_header)
.map(|babe_pre_digest| {
let slot = babe_pre_digest.slot();
let t = Timestamp::new(relay_chain_slot_duration.as_millis() as u64 * *slot);
(slot, t)
})
.ok()
}