use codec::{Codec, Encode};
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{ClaimQueueOffset, CollectCollationInfo, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_node_primitives::{PoV, SubmitCollationParams};
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{
vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, BlockNumber as RBlockNumber, CollatorPair, Hash as RHash,
HeadData, Id as ParaId, OccupiedCoreAssumption,
};
use futures::prelude::*;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
use sc_consensus::BlockImport;
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::{AuraApi, Slot};
use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, NumberFor};
use std::{
fs::{self, File},
path::PathBuf,
sync::Arc,
time::Duration,
};
use crate::{collator as collator_util, LOG_TARGET};
fn export_pov_to_path<Block: BlockT>(
path: PathBuf,
pov: PoV,
block_hash: Block::Hash,
block_number: NumberFor<Block>,
parent_header: Block::Header,
relay_parent_storage_root: RHash,
relay_parent_number: RBlockNumber,
) {
if let Err(error) = fs::create_dir_all(&path) {
tracing::error!(target: LOG_TARGET, %error, path = %path.display(), "Failed to create PoV export directory");
return
}
let mut file = match File::create(path.join(format!("{block_hash:?}_{block_number}.pov"))) {
Ok(f) => f,
Err(error) => {
tracing::error!(target: LOG_TARGET, %error, "Failed to export PoV.");
return
},
};
pov.encode_to(&mut file);
HeadData(parent_header.encode()).encode_to(&mut file);
relay_parent_storage_root.encode_to(&mut file);
relay_parent_number.encode_to(&mut file);
}
pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
pub create_inherent_data_providers: CIDP,
pub block_import: BI,
pub para_client: Arc<Client>,
pub para_backend: Arc<Backend>,
pub relay_client: RClient,
pub code_hash_provider: CHP,
pub keystore: KeystorePtr,
pub collator_key: CollatorPair,
pub para_id: ParaId,
pub overseer_handle: OverseerHandle,
pub relay_chain_slot_duration: Duration,
pub proposer: Proposer,
pub collator_service: CS,
pub authoring_duration: Duration,
pub reinitialize: bool,
}
pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
) -> impl Future<Output = ()> + Send + 'static
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
+ BlockOf
+ AuxStore
+ HeaderBackend<Block>
+ BlockBackend<Block>
+ Send
+ Sync
+ 'static,
Client::Api:
AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RClient: RelayChainInterface + Clone + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
CIDP::InherentDataProviders: Send,
BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
Proposer: ProposerInterface<Block> + Send + Sync + 'static,
CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
P: Pair,
P::Public: AppPublic + Member + Codec,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
run_with_export::<_, P, _, _, _, _, _, _, _, _>(ParamsWithExport { params, export_pov: None })
}
pub struct ParamsWithExport<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
pub params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
pub export_pov: Option<PathBuf>,
}
pub fn run_with_export<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
ParamsWithExport { mut params, export_pov }: ParamsWithExport<
BI,
CIDP,
Client,
Backend,
RClient,
CHP,
Proposer,
CS,
>,
) -> impl Future<Output = ()> + Send + 'static
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
+ BlockOf
+ AuxStore
+ HeaderBackend<Block>
+ BlockBackend<Block>
+ Send
+ Sync
+ 'static,
Client::Api:
AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RClient: RelayChainInterface + Clone + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
CIDP::InherentDataProviders: Send,
BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
Proposer: ProposerInterface<Block> + Send + Sync + 'static,
CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
P: Pair,
P::Public: AppPublic + Member + Codec,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
async move {
cumulus_client_collator::initialize_collator_subsystems(
&mut params.overseer_handle,
params.collator_key,
params.para_id,
params.reinitialize,
)
.await;
let mut import_notifications = match params.relay_client.import_notification_stream().await
{
Ok(s) => s,
Err(err) => {
tracing::error!(
target: crate::LOG_TARGET,
?err,
"Failed to initialize consensus: no relay chain import notification stream"
);
return
},
};
let mut collator = {
let params = collator_util::Params {
create_inherent_data_providers: params.create_inherent_data_providers,
block_import: params.block_import,
relay_client: params.relay_client.clone(),
keystore: params.keystore.clone(),
para_id: params.para_id,
proposer: params.proposer,
collator_service: params.collator_service,
};
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
};
while let Some(relay_parent_header) = import_notifications.next().await {
let relay_parent = relay_parent_header.hash();
let core_index = if let Some(core_index) = super::cores_scheduled_for_para(
relay_parent,
params.para_id,
&mut params.relay_client,
ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET),
)
.await
.get(0)
{
*core_index
} else {
tracing::trace!(
target: crate::LOG_TARGET,
?relay_parent,
?params.para_id,
"Para is not scheduled on any core, skipping import notification",
);
continue
};
let max_pov_size = match params
.relay_client
.persisted_validation_data(
relay_parent,
params.para_id,
OccupiedCoreAssumption::Included,
)
.await
{
Ok(None) => continue,
Ok(Some(pvd)) => pvd.max_pov_size,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
continue
},
};
let (included_block, initial_parent) = match crate::collators::find_parent(
relay_parent,
params.para_id,
&*params.para_backend,
¶ms.relay_client,
)
.await
{
Some(value) => value,
None => continue,
};
let para_client = &*params.para_client;
let keystore = ¶ms.keystore;
let can_build_upon = |block_hash| {
let slot_duration = match sc_consensus_aura::standalone::slot_duration_at(
&*params.para_client,
block_hash,
) {
Ok(sd) => sd,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration");
return None
},
};
tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired");
let (relay_slot, timestamp) = consensus_common::relay_slot_and_timestamp(
&relay_parent_header,
params.relay_chain_slot_duration,
)?;
let slot_now = Slot::from_timestamp(timestamp, slot_duration);
tracing::debug!(
target: crate::LOG_TARGET,
?relay_slot,
para_slot = ?slot_now,
?timestamp,
?slot_duration,
relay_chain_slot_duration = ?params.relay_chain_slot_duration,
"Adjusted relay-chain slot to parachain slot"
);
Some(super::can_build_upon::<_, _, P>(
slot_now,
timestamp,
block_hash,
included_block,
para_client,
&keystore,
))
};
let mut parent_hash = initial_parent.hash;
let mut parent_header = initial_parent.header;
let overseer_handle = &mut params.overseer_handle;
if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
continue
}
for n_built in 0..2 {
let slot_claim = match can_build_upon(parent_hash) {
Some(fut) => match fut.await {
None => break,
Some(c) => c,
},
None => break,
};
tracing::debug!(
target: crate::LOG_TARGET,
?relay_parent,
unincluded_segment_len = initial_parent.depth + n_built,
"Slot claimed. Building"
);
let validation_data = PersistedValidationData {
parent_head: parent_header.encode().into(),
relay_parent_number: *relay_parent_header.number(),
relay_parent_storage_root: *relay_parent_header.state_root(),
max_pov_size,
};
let (parachain_inherent_data, other_inherent_data) = match collator
.create_inherent_data(
relay_parent,
&validation_data,
parent_hash,
slot_claim.timestamp(),
)
.await
{
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err);
break
},
Ok(x) => x,
};
let Some(validation_code_hash) =
params.code_hash_provider.code_hash_at(parent_hash)
else {
tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
break
};
super::check_validation_code_or_log(
&validation_code_hash,
params.para_id,
¶ms.relay_client,
relay_parent,
)
.await;
let allowed_pov_size = if cfg!(feature = "full-pov-size") {
validation_data.max_pov_size
} else {
validation_data.max_pov_size / 2
} as usize;
match collator
.collate(
&parent_header,
&slot_claim,
None,
(parachain_inherent_data, other_inherent_data),
params.authoring_duration,
allowed_pov_size,
)
.await
{
Ok(Some((collation, block_data, new_block_hash))) => {
collator.collator_service().announce_block(new_block_hash, None);
if let Some(ref export_pov) = export_pov {
export_pov_to_path::<Block>(
export_pov.clone(),
collation.proof_of_validity.clone().into_compressed(),
new_block_hash,
*block_data.header().number(),
parent_header.clone(),
*relay_parent_header.state_root(),
*relay_parent_header.number(),
);
}
overseer_handle
.send_msg(
CollationGenerationMessage::SubmitCollation(
SubmitCollationParams {
relay_parent,
collation,
parent_head: parent_header.encode().into(),
validation_code_hash,
result_sender: None,
core_index,
},
),
"SubmitCollation",
)
.await;
parent_hash = new_block_hash;
parent_header = block_data.into_header();
},
Ok(None) => {
tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
break
},
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err);
break
},
}
}
}
}
}