use codec::Encode;
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_node_primitives::{MaybeCompressedPoV, SubmitCollationParams};
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{CollatorPair, Id as ParaId};
use futures::prelude::*;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_runtime::traits::{Block as BlockT, Header};
use super::CollatorMessage;
/// Log target attached to every `tracing` event emitted from this file.
const LOG_TARGET: &str = "aura::cumulus::collation_task";
/// Parameters for [`run_collation_task`].
pub struct Params<Block: BlockT, RClient, CS> {
/// Relay chain client; the task obtains its overseer handle from it.
pub relay_client: RClient,
/// Collator key handed to the collator subsystem initialization.
pub collator_key: CollatorPair,
/// Parachain id handed to the collator subsystem initialization.
pub para_id: ParaId,
/// Forwarded unchanged to `initialize_collator_subsystems`.
/// NOTE(review): presumably requests re-initialization of already running subsystems — confirm.
pub reinitialize: bool,
/// Service used to build a collation from each received candidate.
pub collator_service: CS,
/// Receiving end of the channel that delivers [`CollatorMessage`]s to the task.
pub collator_receiver: TracingUnboundedReceiver<CollatorMessage<Block>>,
/// Handle polled by the task alongside the receiver; whatever it yields is currently discarded.
pub block_import_handle: super::SlotBasedBlockImportHandle<Block>,
}
/// Runs the collation task until the collator message channel is closed.
///
/// The task first fetches the overseer handle from the relay chain client; if that fails an
/// error is logged and the task ends immediately. It then initializes the collator subsystems
/// and enters a loop that waits on two sources:
///
/// * the collator message receiver, whose messages are passed to `handle_collation_message`;
///   once the receiver yields `None` the task returns,
/// * the block import handle, whose output is received and dropped without further processing.
pub async fn run_collation_task<Block, RClient, CS>(params: Params<Block, RClient, CS>)
where
    Block: BlockT,
    CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
    RClient: RelayChainInterface + Clone + 'static,
{
    let Params {
        relay_client,
        collator_key,
        para_id,
        reinitialize,
        collator_service,
        collator_receiver: mut incoming,
        block_import_handle: mut import_handle,
    } = params;

    // Without an overseer handle nothing can be submitted, so give up right away.
    let mut overseer = match relay_client.overseer_handle() {
        Ok(handle) => handle,
        Err(_) => {
            tracing::error!(target: LOG_TARGET, "Failed to get overseer handle.");
            return;
        },
    };

    cumulus_client_collator::initialize_collator_subsystems(
        &mut overseer,
        collator_key,
        para_id,
        reinitialize,
    )
    .await;

    loop {
        futures::select! {
            maybe_message = incoming.next() => match maybe_message {
                Some(message) => {
                    handle_collation_message(message, &collator_service, &mut overseer).await
                },
                // The receiver yielded `None`: stop the task.
                None => return,
            },
            imported = import_handle.next().fuse() => {
                // Nothing is done with the imported block yet; it is only consumed.
                let _ = imported;
            }
        }
    }
}
/// Builds a collation from a single [`CollatorMessage`] and submits it through the overseer.
///
/// The block hash and number are read from the candidate's header before the candidate is
/// handed to `collator_service.build_collation`. If no collation can be built a warning is
/// logged and nothing is submitted. Otherwise the PoV component sizes (and the compressed PoV
/// size, when the PoV is compressed) are logged and a
/// `CollationGenerationMessage::SubmitCollation` is sent via `overseer_handle`.
async fn handle_collation_message<Block: BlockT>(
    message: CollatorMessage<Block>,
    collator_service: &impl CollatorServiceInterface<Block>,
    overseer_handle: &mut OverseerHandle,
) {
    let CollatorMessage {
        parent_header,
        parachain_candidate,
        validation_code_hash,
        relay_parent,
        core_index,
    } = message;

    // Grab the identifying data up front; the candidate is moved into `build_collation` below.
    let candidate_header = parachain_candidate.block.header();
    let hash = candidate_header.hash();
    let number = *candidate_header.number();

    let Some((collation, block_data)) =
        collator_service.build_collation(&parent_header, hash, parachain_candidate)
    else {
        tracing::warn!(target: LOG_TARGET, %hash, ?number, ?core_index, "Unable to build collation.");
        return;
    };

    // Converts a byte count to units of 1024 bytes for the size logs.
    let as_kilobytes = |bytes: usize| bytes as f64 / 1024f64;

    tracing::info!(
        target: LOG_TARGET,
        "PoV size {{ header: {:.2}kB, extrinsics: {:.2}kB, storage_proof: {:.2}kB }}",
        as_kilobytes(block_data.header().encoded_size()),
        as_kilobytes(block_data.extrinsics().encoded_size()),
        as_kilobytes(block_data.storage_proof().encoded_size()),
    );

    if let MaybeCompressedPoV::Compressed(pov) = &collation.proof_of_validity {
        tracing::info!(
            target: LOG_TARGET,
            "Compressed PoV size: {}kb",
            as_kilobytes(pov.block_data.0.len()),
        );
    }

    tracing::debug!(target: LOG_TARGET, ?core_index, %hash, %number, "Submitting collation for core.");

    let submission = CollationGenerationMessage::SubmitCollation(SubmitCollationParams {
        relay_parent,
        collation,
        parent_head: parent_header.encode().into(),
        validation_code_hash,
        core_index,
        result_sender: None,
    });

    overseer_handle.send_msg(submission, "SubmitCollation").await;
}