// cumulus_client_consensus_aura/collators/slot_based/collation_task.rs
use codec::Encode;
19use std::path::PathBuf;
20
21use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
22use cumulus_relay_chain_interface::RelayChainInterface;
23
24use polkadot_node_primitives::{MaybeCompressedPoV, SubmitCollationParams};
25use polkadot_node_subsystem::messages::CollationGenerationMessage;
26use polkadot_overseer::Handle as OverseerHandle;
27use polkadot_primitives::{CollatorPair, Id as ParaId};
28
29use cumulus_primitives_core::relay_chain::BlockId;
30use futures::prelude::*;
31
32use crate::export_pov_to_path;
33use sc_utils::mpsc::TracingUnboundedReceiver;
34use sp_runtime::traits::{Block as BlockT, Header};
35
36use super::CollatorMessage;
37
/// `tracing` target attached to every log line emitted by the collation task.
const LOG_TARGET: &str = "aura::cumulus::collation_task";
39
40pub struct Params<Block: BlockT, RClient, CS> {
42 pub relay_client: RClient,
44 pub collator_key: CollatorPair,
46 pub para_id: ParaId,
48 pub reinitialize: bool,
50 pub collator_service: CS,
52 pub collator_receiver: TracingUnboundedReceiver<CollatorMessage<Block>>,
54 pub block_import_handle: super::SlotBasedBlockImportHandle<Block>,
56 pub export_pov: Option<PathBuf>,
58}
59
60pub async fn run_collation_task<Block, RClient, CS>(
67 Params {
68 relay_client,
69 collator_key,
70 para_id,
71 reinitialize,
72 collator_service,
73 mut collator_receiver,
74 mut block_import_handle,
75 export_pov,
76 }: Params<Block, RClient, CS>,
77) where
78 Block: BlockT,
79 CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
80 RClient: RelayChainInterface + Clone + 'static,
81{
82 let Ok(mut overseer_handle) = relay_client.overseer_handle() else {
83 tracing::error!(target: LOG_TARGET, "Failed to get overseer handle.");
84 return
85 };
86
87 cumulus_client_collator::initialize_collator_subsystems(
88 &mut overseer_handle,
89 collator_key,
90 para_id,
91 reinitialize,
92 )
93 .await;
94
95 loop {
96 futures::select! {
97 collator_message = collator_receiver.next() => {
98 let Some(message) = collator_message else {
99 return;
100 };
101
102 handle_collation_message(message, &collator_service, &mut overseer_handle,relay_client.clone(),export_pov.clone()).await;
103 },
104 block_import_msg = block_import_handle.next().fuse() => {
105 let _ = block_import_msg;
108 }
109 }
110 }
111}
112
113async fn handle_collation_message<Block: BlockT, RClient: RelayChainInterface + Clone + 'static>(
117 message: CollatorMessage<Block>,
118 collator_service: &impl CollatorServiceInterface<Block>,
119 overseer_handle: &mut OverseerHandle,
120 relay_client: RClient,
121 export_pov: Option<PathBuf>,
122) {
123 let CollatorMessage {
124 parent_header,
125 parachain_candidate,
126 validation_code_hash,
127 relay_parent,
128 core_index,
129 max_pov_size,
130 } = message;
131
132 let hash = parachain_candidate.block.header().hash();
133 let number = *parachain_candidate.block.header().number();
134 let (collation, block_data) =
135 match collator_service.build_collation(&parent_header, hash, parachain_candidate) {
136 Some(collation) => collation,
137 None => {
138 tracing::warn!(target: LOG_TARGET, %hash, ?number, ?core_index, "Unable to build collation.");
139 return;
140 },
141 };
142
143 block_data.log_size_info();
144
145 if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
146 if let Some(pov_path) = export_pov {
147 if let Ok(Some(relay_parent_header)) =
148 relay_client.header(BlockId::Hash(relay_parent)).await
149 {
150 if let Some(header) = block_data.blocks().first().map(|b| b.header()) {
151 export_pov_to_path::<Block>(
152 pov_path.clone(),
153 pov.clone(),
154 header.hash(),
155 *header.number(),
156 parent_header.clone(),
157 relay_parent_header.state_root,
158 relay_parent_header.number,
159 max_pov_size,
160 );
161 }
162 } else {
163 tracing::error!(target: LOG_TARGET, "Failed to get relay parent header from hash: {relay_parent:?}");
164 }
165 }
166
167 tracing::info!(
168 target: LOG_TARGET,
169 "Compressed PoV size: {}kb",
170 pov.block_data.0.len() as f64 / 1024f64,
171 );
172 }
173
174 tracing::debug!(target: LOG_TARGET, ?core_index, ?hash, %number, "Submitting collation for core.");
175
176 overseer_handle
177 .send_msg(
178 CollationGenerationMessage::SubmitCollation(SubmitCollationParams {
179 relay_parent,
180 collation,
181 parent_head: parent_header.encode().into(),
182 validation_code_hash,
183 core_index,
184 result_sender: None,
185 }),
186 "SubmitCollation",
187 )
188 .await;
189}