use codec::{Codec, Encode};
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{
self as consensus_common, ParachainBlockImportMarker, ParachainCandidate,
};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_client_parachain_inherent::{ParachainInherentData, ParachainInherentDataProvider};
use cumulus_primitives_core::{
relay_chain::Hash as PHash, DigestItem, ParachainBlockData, PersistedValidationData,
};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_node_primitives::{Collation, MaybeCompressedPoV};
use polkadot_primitives::{Header as PHeader, Id as ParaId};
use futures::prelude::*;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction};
use sc_consensus_aura::standalone as aura_internal;
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_consensus::BlockOrigin;
use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
use sp_core::crypto::Pair;
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
use sp_keystore::KeystorePtr;
use sp_runtime::{
generic::Digest,
traits::{Block as BlockT, HashingFor, Header as HeaderT, Member},
};
use sp_state_machine::StorageChanges;
use sp_timestamp::Timestamp;
use std::{error::Error, time::Duration};
pub struct Params<BI, CIDP, RClient, Proposer, CS> {
pub create_inherent_data_providers: CIDP,
pub block_import: BI,
pub relay_client: RClient,
pub keystore: KeystorePtr,
pub para_id: ParaId,
pub proposer: Proposer,
pub collator_service: CS,
}
pub struct Collator<Block, P, BI, CIDP, RClient, Proposer, CS> {
create_inherent_data_providers: CIDP,
block_import: BI,
relay_client: RClient,
keystore: KeystorePtr,
para_id: ParaId,
proposer: Proposer,
collator_service: CS,
_marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
}
impl<Block, P, BI, CIDP, RClient, Proposer, CS> Collator<Block, P, BI, CIDP, RClient, Proposer, CS>
where
Block: BlockT,
RClient: RelayChainInterface,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
Proposer: ProposerInterface<Block>,
CS: CollatorServiceInterface<Block>,
P: Pair,
P::Public: AppPublic + Member,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
pub fn new(params: Params<BI, CIDP, RClient, Proposer, CS>) -> Self {
Collator {
create_inherent_data_providers: params.create_inherent_data_providers,
block_import: params.block_import,
relay_client: params.relay_client,
keystore: params.keystore,
para_id: params.para_id,
proposer: params.proposer,
collator_service: params.collator_service,
_marker: std::marker::PhantomData,
}
}
pub async fn create_inherent_data(
&self,
relay_parent: PHash,
validation_data: &PersistedValidationData,
parent_hash: Block::Hash,
timestamp: impl Into<Option<Timestamp>>,
) -> Result<(ParachainInherentData, InherentData), Box<dyn Error + Send + Sync + 'static>> {
let paras_inherent_data = ParachainInherentDataProvider::create_at(
relay_parent,
&self.relay_client,
validation_data,
self.para_id,
)
.await;
let paras_inherent_data = match paras_inherent_data {
Some(p) => p,
None =>
return Err(
format!("Could not create paras inherent data at {:?}", relay_parent).into()
),
};
let mut other_inherent_data = self
.create_inherent_data_providers
.create_inherent_data_providers(parent_hash, ())
.map_err(|e| e as Box<dyn Error + Send + Sync + 'static>)
.await?
.create_inherent_data()
.await
.map_err(Box::new)?;
if let Some(timestamp) = timestamp.into() {
other_inherent_data.replace_data(sp_timestamp::INHERENT_IDENTIFIER, ×tamp);
}
Ok((paras_inherent_data, other_inherent_data))
}
pub async fn build_block_and_import(
&mut self,
parent_header: &Block::Header,
slot_claim: &SlotClaim<P::Public>,
additional_pre_digest: impl Into<Option<Vec<DigestItem>>>,
inherent_data: (ParachainInherentData, InherentData),
proposal_duration: Duration,
max_pov_size: usize,
) -> Result<Option<ParachainCandidate<Block>>, Box<dyn Error + Send + 'static>> {
let mut digest = additional_pre_digest.into().unwrap_or_default();
digest.push(slot_claim.pre_digest.clone());
let maybe_proposal = self
.proposer
.propose(
&parent_header,
&inherent_data.0,
inherent_data.1,
Digest { logs: digest },
proposal_duration,
Some(max_pov_size),
)
.await
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;
let proposal = match maybe_proposal {
None => return Ok(None),
Some(p) => p,
};
let sealed_importable = seal::<_, P>(
proposal.block,
proposal.storage_changes,
&slot_claim.author_pub,
&self.keystore,
)
.map_err(|e| e as Box<dyn Error + Send>)?;
let block = Block::new(
sealed_importable.post_header(),
sealed_importable
.body
.as_ref()
.expect("body always created with this `propose` fn; qed")
.clone(),
);
self.block_import
.import_block(sealed_importable)
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)
.await?;
Ok(Some(ParachainCandidate { block, proof: proposal.proof }))
}
pub async fn collate(
&mut self,
parent_header: &Block::Header,
slot_claim: &SlotClaim<P::Public>,
additional_pre_digest: impl Into<Option<Vec<DigestItem>>>,
inherent_data: (ParachainInherentData, InherentData),
proposal_duration: Duration,
max_pov_size: usize,
) -> Result<
Option<(Collation, ParachainBlockData<Block>, Block::Hash)>,
Box<dyn Error + Send + 'static>,
> {
let maybe_candidate = self
.build_block_and_import(
parent_header,
slot_claim,
additional_pre_digest,
inherent_data,
proposal_duration,
max_pov_size,
)
.await?;
let Some(candidate) = maybe_candidate else { return Ok(None) };
let hash = candidate.block.header().hash();
if let Some((collation, block_data)) =
self.collator_service.build_collation(parent_header, hash, candidate)
{
tracing::info!(
target: crate::LOG_TARGET,
"PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}",
block_data.header().encode().len() as f64 / 1024f64,
block_data.extrinsics().encode().len() as f64 / 1024f64,
block_data.storage_proof().encode().len() as f64 / 1024f64,
);
if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
tracing::info!(
target: crate::LOG_TARGET,
"Compressed PoV size: {}kb",
pov.block_data.0.len() as f64 / 1024f64,
);
}
Ok(Some((collation, block_data, hash)))
} else {
Err(Box::<dyn Error + Send + Sync>::from("Unable to produce collation")
as Box<dyn Error + Send>)
}
}
pub fn collator_service(&self) -> &CS {
&self.collator_service
}
}
pub struct SlotClaim<Pub> {
author_pub: Pub,
pre_digest: DigestItem,
slot: Slot,
timestamp: Timestamp,
}
impl<Pub> SlotClaim<Pub> {
pub fn unchecked<P>(author_pub: Pub, slot: Slot, timestamp: Timestamp) -> Self
where
P: Pair<Public = Pub>,
P::Public: Codec,
P::Signature: Codec,
{
SlotClaim { author_pub, timestamp, pre_digest: aura_internal::pre_digest::<P>(slot), slot }
}
pub fn author_pub(&self) -> &Pub {
&self.author_pub
}
pub fn pre_digest(&self) -> &DigestItem {
&self.pre_digest
}
pub fn slot(&self) -> Slot {
self.slot
}
pub fn timestamp(&self) -> Timestamp {
self.timestamp
}
}
pub async fn claim_slot<B, C, P>(
client: &C,
parent_hash: B::Hash,
relay_parent_header: &PHeader,
slot_duration: SlotDuration,
relay_chain_slot_duration: Duration,
keystore: &KeystorePtr,
) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
where
B: BlockT,
C: ProvideRuntimeApi<B> + Send + Sync + 'static,
C::Api: AuraApi<B, P::Public>,
P: Pair,
P::Public: Codec,
P::Signature: Codec,
{
let authorities = client.runtime_api().authorities(parent_hash).map_err(Box::new)?;
let (slot_now, timestamp) = match consensus_common::relay_slot_and_timestamp(
relay_parent_header,
relay_chain_slot_duration,
) {
Some((r_s, t)) => {
let our_slot = Slot::from_timestamp(t, slot_duration);
tracing::debug!(
target: crate::LOG_TARGET,
relay_slot = ?r_s,
para_slot = ?our_slot,
timestamp = ?t,
?slot_duration,
?relay_chain_slot_duration,
"Adjusted relay-chain slot to parachain slot"
);
(our_slot, t)
},
None => return Ok(None),
};
let author_pub = {
let res = aura_internal::claim_slot::<P>(slot_now, &authorities, keystore).await;
match res {
Some(p) => p,
None => return Ok(None),
}
};
Ok(Some(SlotClaim::unchecked::<P>(author_pub, slot_now, timestamp)))
}
pub fn seal<B: BlockT, P>(
pre_sealed: B,
storage_changes: StorageChanges<HashingFor<B>>,
author_pub: &P::Public,
keystore: &KeystorePtr,
) -> Result<BlockImportParams<B>, Box<dyn Error + Send + Sync + 'static>>
where
P: Pair,
P::Signature: Codec + TryFrom<Vec<u8>>,
P::Public: AppPublic,
{
let (pre_header, body) = pre_sealed.deconstruct();
let pre_hash = pre_header.hash();
let block_number = *pre_header.number();
let block_import_params = {
let seal_digest =
aura_internal::seal::<_, P>(&pre_hash, &author_pub, keystore).map_err(Box::new)?;
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, pre_header);
block_import_params.post_digests.push(seal_digest);
block_import_params.body = Some(body.clone());
block_import_params.state_action =
StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
block_import_params
};
let post_hash = block_import_params.post_hash();
tracing::info!(
target: crate::LOG_TARGET,
"๐ Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
block_number,
post_hash,
pre_hash,
);
Ok(block_import_params)
}