use cumulus_client_network::WaitToAnnounce;
use cumulus_primitives_core::{CollationInfo, CollectCollationInfo, ParachainBlockData};
use sc_client_api::BlockBackend;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_consensus::BlockStatus;
use sp_core::traits::SpawnNamed;
use sp_runtime::traits::{Block as BlockT, HashingFor, Header as HeaderT, Zero};
use cumulus_client_consensus_common::ParachainCandidate;
use polkadot_node_primitives::{
BlockData, Collation, CollationSecondedSignal, MaybeCompressedPoV, PoV,
};
use codec::Encode;
use futures::channel::oneshot;
use parking_lot::Mutex;
use std::sync::Arc;
const LOG_TARGET: &str = "cumulus-collator";
pub trait ServiceInterface<Block: BlockT> {
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool;
fn build_collation(
&self,
parent_header: &Block::Header,
block_hash: Block::Hash,
candidate: ParachainCandidate<Block>,
) -> Option<(Collation, ParachainBlockData<Block>)>;
fn announce_with_barrier(
&self,
block_hash: Block::Hash,
) -> oneshot::Sender<CollationSecondedSignal>;
fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>);
}
pub struct CollatorService<Block: BlockT, BS, RA> {
block_status: Arc<BS>,
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
runtime_api: Arc<RA>,
}
impl<Block: BlockT, BS, RA> Clone for CollatorService<Block, BS, RA> {
fn clone(&self) -> Self {
Self {
block_status: self.block_status.clone(),
wait_to_announce: self.wait_to_announce.clone(),
announce_block: self.announce_block.clone(),
runtime_api: self.runtime_api.clone(),
}
}
}
impl<Block, BS, RA> CollatorService<Block, BS, RA>
where
Block: BlockT,
BS: BlockBackend<Block>,
RA: ProvideRuntimeApi<Block>,
RA::Api: CollectCollationInfo<Block>,
{
pub fn new(
block_status: Arc<BS>,
spawner: Arc<dyn SpawnNamed + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
runtime_api: Arc<RA>,
) -> Self {
let wait_to_announce =
Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block.clone())));
Self { block_status, wait_to_announce, announce_block, runtime_api }
}
pub fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
match self.block_status.block_status(hash) {
Ok(BlockStatus::Queued) => {
tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Skipping candidate production, because block is still queued for import.",
);
false
},
Ok(BlockStatus::InChainWithState) => true,
Ok(BlockStatus::InChainPruned) => {
tracing::error!(
target: LOG_TARGET,
"Skipping candidate production, because block `{:?}` is already pruned!",
hash,
);
false
},
Ok(BlockStatus::KnownBad) => {
tracing::error!(
target: LOG_TARGET,
block_hash = ?hash,
"Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
);
false
},
Ok(BlockStatus::Unknown) => {
if header.number().is_zero() {
tracing::error!(
target: LOG_TARGET,
block_hash = ?hash,
"Could not find the header of the genesis block in the database!",
);
} else {
tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Skipping candidate production, because block is unknown.",
);
}
false
},
Err(e) => {
tracing::error!(
target: LOG_TARGET,
block_hash = ?hash,
error = ?e,
"Failed to get block status.",
);
false
},
}
}
pub fn fetch_collation_info(
&self,
block_hash: Block::Hash,
header: &Block::Header,
) -> Result<Option<CollationInfo>, sp_api::ApiError> {
let runtime_api = self.runtime_api.runtime_api();
let api_version =
match runtime_api.api_version::<dyn CollectCollationInfo<Block>>(block_hash)? {
Some(version) => version,
None => {
tracing::error!(
target: LOG_TARGET,
"Could not fetch `CollectCollationInfo` runtime api version."
);
return Ok(None)
},
};
let collation_info = if api_version < 2 {
#[allow(deprecated)]
runtime_api
.collect_collation_info_before_version_2(block_hash)?
.into_latest(header.encode().into())
} else {
runtime_api.collect_collation_info(block_hash, header)?
};
Ok(Some(collation_info))
}
pub fn build_collation(
&self,
parent_header: &Block::Header,
block_hash: Block::Hash,
candidate: ParachainCandidate<Block>,
) -> Option<(Collation, ParachainBlockData<Block>)> {
let (header, extrinsics) = candidate.block.deconstruct();
let compact_proof = match candidate
.proof
.into_compact_proof::<HashingFor<Block>>(*parent_header.state_root())
{
Ok(proof) => proof,
Err(e) => {
tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
return None
},
};
let collation_info = self
.fetch_collation_info(block_hash, &header)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to collect collation info.",
)
})
.ok()
.flatten()?;
let block_data = ParachainBlockData::<Block>::new(header, extrinsics, compact_proof);
let pov = polkadot_node_primitives::maybe_compress_pov(PoV {
block_data: BlockData(block_data.encode()),
});
let upward_messages = collation_info
.upward_messages
.try_into()
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`",
)
})
.ok()?;
let horizontal_messages = collation_info
.horizontal_messages
.try_into()
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`",
)
})
.ok()?;
let collation = Collation {
upward_messages,
new_validation_code: collation_info.new_validation_code,
processed_downward_messages: collation_info.processed_downward_messages,
horizontal_messages,
hrmp_watermark: collation_info.hrmp_watermark,
head_data: collation_info.head_data,
proof_of_validity: MaybeCompressedPoV::Compressed(pov),
};
Some((collation, block_data))
}
pub fn announce_with_barrier(
&self,
block_hash: Block::Hash,
) -> oneshot::Sender<CollationSecondedSignal> {
let (result_sender, signed_stmt_recv) = oneshot::channel();
self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
result_sender
}
}
impl<Block, BS, RA> ServiceInterface<Block> for CollatorService<Block, BS, RA>
where
Block: BlockT,
BS: BlockBackend<Block>,
RA: ProvideRuntimeApi<Block>,
RA::Api: CollectCollationInfo<Block>,
{
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
CollatorService::check_block_status(self, hash, header)
}
fn build_collation(
&self,
parent_header: &Block::Header,
block_hash: Block::Hash,
candidate: ParachainCandidate<Block>,
) -> Option<(Collation, ParachainBlockData<Block>)> {
CollatorService::build_collation(self, parent_header, block_hash, candidate)
}
fn announce_with_barrier(
&self,
block_hash: Block::Hash,
) -> oneshot::Sender<CollationSecondedSignal> {
CollatorService::announce_with_barrier(self, block_hash)
}
fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>) {
(self.announce_block)(block_hash, data)
}
}