1use codec::{Codec, Encode};
36use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
37use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
38use cumulus_client_consensus_proposer::ProposerInterface;
39use cumulus_primitives_aura::AuraUnincludedSegmentApi;
40use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData};
41use cumulus_relay_chain_interface::RelayChainInterface;
42
43use polkadot_node_primitives::SubmitCollationParams;
44use polkadot_node_subsystem::messages::CollationGenerationMessage;
45use polkadot_overseer::Handle as OverseerHandle;
46use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption};
47
48use crate::{collator as collator_util, collators::claim_queue_at, export_pov_to_path};
49use futures::prelude::*;
50use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
51use sc_consensus::BlockImport;
52use sp_api::ProvideRuntimeApi;
53use sp_application_crypto::AppPublic;
54use sp_blockchain::HeaderBackend;
55use sp_consensus_aura::{AuraApi, Slot};
56use sp_core::crypto::Pair;
57use sp_inherents::CreateInherentDataProviders;
58use sp_keystore::KeystorePtr;
59use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
60use std::{path::PathBuf, sync::Arc, time::Duration};
61
62pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
64 pub create_inherent_data_providers: CIDP,
68 pub block_import: BI,
70 pub para_client: Arc<Client>,
72 pub para_backend: Arc<Backend>,
74 pub relay_client: RClient,
76 pub code_hash_provider: CHP,
78 pub keystore: KeystorePtr,
80 pub collator_key: CollatorPair,
82 pub para_id: ParaId,
84 pub overseer_handle: OverseerHandle,
86 pub relay_chain_slot_duration: Duration,
88 pub proposer: Proposer,
90 pub collator_service: CS,
92 pub authoring_duration: Duration,
94 pub reinitialize: bool,
96 pub max_pov_percentage: Option<u32>,
99}
100
101pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
103 params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
104) -> impl Future<Output = ()> + Send + 'static
105where
106 Block: BlockT,
107 Client: ProvideRuntimeApi<Block>
108 + BlockOf
109 + AuxStore
110 + HeaderBackend<Block>
111 + BlockBackend<Block>
112 + Send
113 + Sync
114 + 'static,
115 Client::Api:
116 AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
117 Backend: sc_client_api::Backend<Block> + 'static,
118 RClient: RelayChainInterface + Clone + 'static,
119 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
120 CIDP::InherentDataProviders: Send,
121 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
122 Proposer: ProposerInterface<Block> + Send + Sync + 'static,
123 CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
124 CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
125 P: Pair,
126 P::Public: AppPublic + Member + Codec,
127 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
128{
129 run_with_export::<_, P, _, _, _, _, _, _, _, _>(ParamsWithExport { params, export_pov: None })
130}
131
132pub struct ParamsWithExport<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
134 pub params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
136
137 pub export_pov: Option<PathBuf>,
139}
140
141pub fn run_with_export<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
146 ParamsWithExport { mut params, export_pov }: ParamsWithExport<
147 BI,
148 CIDP,
149 Client,
150 Backend,
151 RClient,
152 CHP,
153 Proposer,
154 CS,
155 >,
156) -> impl Future<Output = ()> + Send + 'static
157where
158 Block: BlockT,
159 Client: ProvideRuntimeApi<Block>
160 + BlockOf
161 + AuxStore
162 + HeaderBackend<Block>
163 + BlockBackend<Block>
164 + Send
165 + Sync
166 + 'static,
167 Client::Api:
168 AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
169 Backend: sc_client_api::Backend<Block> + 'static,
170 RClient: RelayChainInterface + Clone + 'static,
171 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
172 CIDP::InherentDataProviders: Send,
173 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
174 Proposer: ProposerInterface<Block> + Send + Sync + 'static,
175 CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
176 CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
177 P: Pair,
178 P::Public: AppPublic + Member + Codec,
179 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
180{
181 async move {
182 cumulus_client_collator::initialize_collator_subsystems(
183 &mut params.overseer_handle,
184 params.collator_key,
185 params.para_id,
186 params.reinitialize,
187 )
188 .await;
189
190 let mut import_notifications = match params.relay_client.import_notification_stream().await
191 {
192 Ok(s) => s,
193 Err(err) => {
194 tracing::error!(
195 target: crate::LOG_TARGET,
196 ?err,
197 "Failed to initialize consensus: no relay chain import notification stream"
198 );
199
200 return
201 },
202 };
203
204 let mut collator = {
205 let params = collator_util::Params {
206 create_inherent_data_providers: params.create_inherent_data_providers,
207 block_import: params.block_import,
208 relay_client: params.relay_client.clone(),
209 keystore: params.keystore.clone(),
210 para_id: params.para_id,
211 proposer: params.proposer,
212 collator_service: params.collator_service,
213 };
214
215 collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
216 };
217
218 while let Some(relay_parent_header) = import_notifications.next().await {
219 let relay_parent = relay_parent_header.hash();
220
221 let Some(core_index) = claim_queue_at(relay_parent, &mut params.relay_client)
222 .await
223 .iter_claims_at_depth_for_para(0, params.para_id)
224 .next()
225 else {
226 tracing::trace!(
227 target: crate::LOG_TARGET,
228 ?relay_parent,
229 ?params.para_id,
230 "Para is not scheduled on any core, skipping import notification",
231 );
232
233 continue
234 };
235
236 let max_pov_size = match params
237 .relay_client
238 .persisted_validation_data(
239 relay_parent,
240 params.para_id,
241 OccupiedCoreAssumption::Included,
242 )
243 .await
244 {
245 Ok(None) => continue,
246 Ok(Some(pvd)) => pvd.max_pov_size,
247 Err(err) => {
248 tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
249 continue
250 },
251 };
252
253 let (included_block, initial_parent) = match crate::collators::find_parent(
254 relay_parent,
255 params.para_id,
256 &*params.para_backend,
257 ¶ms.relay_client,
258 )
259 .await
260 {
261 Some(value) => value,
262 None => continue,
263 };
264
265 let para_client = &*params.para_client;
266 let keystore = ¶ms.keystore;
267 let can_build_upon = |block_hash| {
268 let slot_duration = match sc_consensus_aura::standalone::slot_duration_at(
269 &*params.para_client,
270 block_hash,
271 ) {
272 Ok(sd) => sd,
273 Err(err) => {
274 tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration");
275 return None
276 },
277 };
278 tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired");
279 let (relay_slot, timestamp) = consensus_common::relay_slot_and_timestamp(
280 &relay_parent_header,
281 params.relay_chain_slot_duration,
282 )?;
283 let slot_now = Slot::from_timestamp(timestamp, slot_duration);
284 tracing::debug!(
285 target: crate::LOG_TARGET,
286 ?relay_slot,
287 para_slot = ?slot_now,
288 ?timestamp,
289 ?slot_duration,
290 relay_chain_slot_duration = ?params.relay_chain_slot_duration,
291 "Adjusted relay-chain slot to parachain slot"
292 );
293 Some(super::can_build_upon::<_, _, P>(
294 slot_now,
295 relay_slot,
296 timestamp,
297 block_hash,
298 included_block.hash(),
299 para_client,
300 &keystore,
301 ))
302 };
303
304 let mut parent_hash = initial_parent.hash;
307 let mut parent_header = initial_parent.header;
308 let overseer_handle = &mut params.overseer_handle;
309
310 if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
312 continue
313 }
314
315 for n_built in 0..2 {
318 let slot_claim = match can_build_upon(parent_hash) {
319 Some(fut) => match fut.await {
320 None => break,
321 Some(c) => c,
322 },
323 None => break,
324 };
325
326 tracing::debug!(
327 target: crate::LOG_TARGET,
328 ?relay_parent,
329 unincluded_segment_len = initial_parent.depth + n_built,
330 "Slot claimed. Building"
331 );
332
333 let validation_data = PersistedValidationData {
334 parent_head: parent_header.encode().into(),
335 relay_parent_number: *relay_parent_header.number(),
336 relay_parent_storage_root: *relay_parent_header.state_root(),
337 max_pov_size,
338 };
339
340 let (parachain_inherent_data, other_inherent_data) = match collator
343 .create_inherent_data(
344 relay_parent,
345 &validation_data,
346 parent_hash,
347 slot_claim.timestamp(),
348 )
349 .await
350 {
351 Err(err) => {
352 tracing::error!(target: crate::LOG_TARGET, ?err);
353 break
354 },
355 Ok(x) => x,
356 };
357
358 let Some(validation_code_hash) =
359 params.code_hash_provider.code_hash_at(parent_hash)
360 else {
361 tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
362 break
363 };
364
365 super::check_validation_code_or_log(
366 &validation_code_hash,
367 params.para_id,
368 ¶ms.relay_client,
369 relay_parent,
370 )
371 .await;
372
373 let allowed_pov_size = if let Some(max_pov_percentage) = params.max_pov_percentage {
374 validation_data.max_pov_size * max_pov_percentage / 100
375 } else {
376 validation_data.max_pov_size * 85 / 100
381 } as usize;
382
383 match collator
384 .collate(
385 &parent_header,
386 &slot_claim,
387 None,
388 (parachain_inherent_data, other_inherent_data),
389 params.authoring_duration,
390 allowed_pov_size,
391 )
392 .await
393 {
394 Ok(Some((collation, block_data))) => {
395 let Some(new_block_header) =
396 block_data.blocks().first().map(|b| b.header().clone())
397 else {
398 tracing::error!(target: crate::LOG_TARGET, "Produced PoV doesn't contain any blocks");
399 break
400 };
401
402 let new_block_hash = new_block_header.hash();
403
404 collator.collator_service().announce_block(new_block_hash, None);
407
408 if let Some(ref export_pov) = export_pov {
409 export_pov_to_path::<Block>(
410 export_pov.clone(),
411 collation.proof_of_validity.clone().into_compressed(),
412 new_block_hash,
413 *new_block_header.number(),
414 parent_header.clone(),
415 *relay_parent_header.state_root(),
416 *relay_parent_header.number(),
417 validation_data.max_pov_size,
418 );
419 }
420
421 overseer_handle
427 .send_msg(
428 CollationGenerationMessage::SubmitCollation(
429 SubmitCollationParams {
430 relay_parent,
431 collation,
432 parent_head: parent_header.encode().into(),
433 validation_code_hash,
434 result_sender: None,
435 core_index,
436 },
437 ),
438 "SubmitCollation",
439 )
440 .await;
441
442 parent_hash = new_block_hash;
443 parent_header = new_block_header;
444 },
445 Ok(None) => {
446 tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
447 break
448 },
449 Err(err) => {
450 tracing::error!(target: crate::LOG_TARGET, ?err);
451 break
452 },
453 }
454 }
455 }
456 }
457}