1use codec::{Codec, Encode};
36use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
37use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
38use cumulus_primitives_aura::AuraUnincludedSegmentApi;
39use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData};
40use cumulus_relay_chain_interface::RelayChainInterface;
41use sp_consensus::Environment;
42
43use polkadot_node_primitives::SubmitCollationParams;
44use polkadot_node_subsystem::messages::CollationGenerationMessage;
45use polkadot_overseer::Handle as OverseerHandle;
46use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption};
47
48use crate::{
49 collator as collator_util,
50 collators::{claim_queue_at, BackingGroupConnectionHelper},
51 export_pov_to_path,
52};
53use futures::prelude::*;
54use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
55use sc_consensus::BlockImport;
56use sc_network_types::PeerId;
57use sp_api::ProvideRuntimeApi;
58use sp_application_crypto::AppPublic;
59use sp_blockchain::HeaderBackend;
60use sp_consensus_aura::{AuraApi, Slot};
61use sp_core::crypto::Pair;
62use sp_inherents::CreateInherentDataProviders;
63use sp_keystore::KeystorePtr;
64use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
65use sp_timestamp::Timestamp;
66use std::{path::PathBuf, sync::Arc, time::Duration};
67
68pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, ProposerFactory, CS> {
70 pub create_inherent_data_providers: CIDP,
74 pub block_import: BI,
76 pub para_client: Arc<Client>,
78 pub para_backend: Arc<Backend>,
80 pub relay_client: RClient,
82 pub code_hash_provider: CHP,
84 pub keystore: KeystorePtr,
86 pub collator_key: CollatorPair,
88 pub collator_peer_id: PeerId,
90 pub para_id: ParaId,
92 pub overseer_handle: OverseerHandle,
94 pub relay_chain_slot_duration: Duration,
96 pub proposer: ProposerFactory,
98 pub collator_service: CS,
100 pub authoring_duration: Duration,
102 pub reinitialize: bool,
104 pub max_pov_percentage: Option<u32>,
107}
108
109fn get_parachain_slot<Block, Client, P>(
113 para_client: &Client,
114 block_hash: Block::Hash,
115 relay_parent_header: &polkadot_primitives::Header,
116 relay_chain_slot_duration: Duration,
117) -> Option<(Slot, Slot, Timestamp)>
118where
119 Block: BlockT,
120 Client: ProvideRuntimeApi<Block>,
121 Client::Api: AuraApi<Block, P>,
122 P: Codec,
123{
124 let slot_duration =
125 match sc_consensus_aura::standalone::slot_duration_at(para_client, block_hash) {
126 Ok(sd) => sd,
127 Err(err) => {
128 tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration");
129 return None
130 },
131 };
132
133 tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired");
134
135 let (relay_slot, timestamp) =
136 consensus_common::relay_slot_and_timestamp(relay_parent_header, relay_chain_slot_duration)?;
137
138 let slot_now = Slot::from_timestamp(timestamp, slot_duration);
139
140 tracing::debug!(
141 target: crate::LOG_TARGET,
142 ?relay_slot,
143 para_slot = ?slot_now,
144 ?timestamp,
145 ?slot_duration,
146 ?relay_chain_slot_duration,
147 "Adjusted relay-chain slot to parachain slot"
148 );
149
150 Some((slot_now, relay_slot, timestamp))
151}
152
153pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
155 params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
156) -> impl Future<Output = ()> + Send + 'static
157where
158 Block: BlockT,
159 Client: ProvideRuntimeApi<Block>
160 + BlockOf
161 + AuxStore
162 + HeaderBackend<Block>
163 + BlockBackend<Block>
164 + Send
165 + Sync
166 + 'static,
167 Client::Api:
168 AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
169 Backend: sc_client_api::Backend<Block> + 'static,
170 RClient: RelayChainInterface + Clone + 'static,
171 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
172 CIDP::InherentDataProviders: Send,
173 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
174 Proposer: Environment<Block> + Clone + Send + Sync + 'static,
175 CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
176 CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
177 P: Pair + Send + Sync + 'static,
178 P::Public: AppPublic + Member + Codec,
179 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
180{
181 run_with_export::<_, P, _, _, _, _, _, _, _, _>(ParamsWithExport { params, export_pov: None })
182}
183
184pub struct ParamsWithExport<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
186 pub params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
188
189 pub export_pov: Option<PathBuf>,
191}
192
193pub fn run_with_export<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
198 ParamsWithExport { mut params, export_pov }: ParamsWithExport<
199 BI,
200 CIDP,
201 Client,
202 Backend,
203 RClient,
204 CHP,
205 Proposer,
206 CS,
207 >,
208) -> impl Future<Output = ()> + Send + 'static
209where
210 Block: BlockT,
211 Client: ProvideRuntimeApi<Block>
212 + BlockOf
213 + AuxStore
214 + HeaderBackend<Block>
215 + BlockBackend<Block>
216 + Send
217 + Sync
218 + 'static,
219 Client::Api:
220 AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
221 Backend: sc_client_api::Backend<Block> + 'static,
222 RClient: RelayChainInterface + Clone + 'static,
223 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
224 CIDP::InherentDataProviders: Send,
225 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
226 Proposer: Environment<Block> + Clone + Send + Sync + 'static,
227 CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
228 CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
229 P: Pair + Send + Sync + 'static,
230 P::Public: AppPublic + Member + Codec,
231 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
232{
233 async move {
234 cumulus_client_collator::initialize_collator_subsystems(
235 &mut params.overseer_handle,
236 params.collator_key,
237 params.para_id,
238 params.reinitialize,
239 )
240 .await;
241
242 let mut import_notifications = match params.relay_client.import_notification_stream().await
243 {
244 Ok(s) => s,
245 Err(err) => {
246 tracing::error!(
247 target: crate::LOG_TARGET,
248 ?err,
249 "Failed to initialize consensus: no relay chain import notification stream"
250 );
251
252 return
253 },
254 };
255
256 let mut collator = {
257 let params = collator_util::Params {
258 create_inherent_data_providers: params.create_inherent_data_providers,
259 block_import: params.block_import,
260 relay_client: params.relay_client.clone(),
261 keystore: params.keystore.clone(),
262 collator_peer_id: params.collator_peer_id,
263 para_id: params.para_id,
264 proposer: params.proposer,
265 collator_service: params.collator_service,
266 };
267
268 collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
269 };
270
271 let mut connection_helper = BackingGroupConnectionHelper::new(
272 params.keystore.clone(),
273 params.overseer_handle.clone(),
274 );
275
276 while let Some(relay_parent_header) = import_notifications.next().await {
277 let relay_parent = relay_parent_header.hash();
278
279 let Some(core_index) = claim_queue_at(relay_parent, &mut params.relay_client)
280 .await
281 .iter_claims_at_depth_for_para(0, params.para_id)
282 .next()
283 else {
284 tracing::trace!(
285 target: crate::LOG_TARGET,
286 ?relay_parent,
287 ?params.para_id,
288 "Para is not scheduled on any core, skipping import notification",
289 );
290
291 continue
292 };
293
294 let max_pov_size = match params
295 .relay_client
296 .persisted_validation_data(
297 relay_parent,
298 params.para_id,
299 OccupiedCoreAssumption::Included,
300 )
301 .await
302 {
303 Ok(None) => continue,
304 Ok(Some(pvd)) => pvd.max_pov_size,
305 Err(err) => {
306 tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
307 continue
308 },
309 };
310
311 let (included_block, initial_parent) = match crate::collators::find_parent(
312 relay_parent,
313 params.para_id,
314 &*params.para_backend,
315 ¶ms.relay_client,
316 )
317 .await
318 {
319 Some(value) => value,
320 None => continue,
321 };
322
323 let para_client = &*params.para_client;
324 let keystore = ¶ms.keystore;
325 let can_build_upon = |block_hash| {
326 let (slot_now, relay_slot, timestamp) = get_parachain_slot::<_, _, P::Public>(
327 para_client,
328 block_hash,
329 &relay_parent_header,
330 params.relay_chain_slot_duration,
331 )?;
332
333 Some(super::can_build_upon::<_, _, P>(
334 slot_now,
335 relay_slot,
336 timestamp,
337 block_hash,
338 included_block.hash(),
339 para_client,
340 &keystore,
341 ))
342 };
343
344 let mut parent_hash = initial_parent.hash;
347 let mut parent_header = initial_parent.header;
348 let overseer_handle = &mut params.overseer_handle;
349
350 if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
352 continue
353 }
354
355 if let (Some((slot_now, _relay_slot, _timestamp)), Ok(authorities)) = (
357 get_parachain_slot::<_, _, P::Public>(
358 para_client,
359 parent_hash,
360 &relay_parent_header,
361 params.relay_chain_slot_duration,
362 ),
363 para_client.runtime_api().authorities(parent_hash),
364 ) {
365 connection_helper.update::<P>(slot_now, &authorities).await;
366 }
367
368 for n_built in 0..2 {
371 let slot_claim = match can_build_upon(parent_hash) {
372 Some(fut) => match fut.await {
373 None => break,
374 Some(c) => c,
375 },
376 None => break,
377 };
378
379 tracing::debug!(
380 target: crate::LOG_TARGET,
381 ?relay_parent,
382 unincluded_segment_len = initial_parent.depth + n_built,
383 "Slot claimed. Building"
384 );
385
386 let validation_data = PersistedValidationData {
387 parent_head: parent_header.encode().into(),
388 relay_parent_number: *relay_parent_header.number(),
389 relay_parent_storage_root: *relay_parent_header.state_root(),
390 max_pov_size,
391 };
392
393 let (parachain_inherent_data, other_inherent_data) = match collator
396 .create_inherent_data(
397 relay_parent,
398 &validation_data,
399 parent_hash,
400 slot_claim.timestamp(),
401 params.collator_peer_id,
402 )
403 .await
404 {
405 Err(err) => {
406 tracing::error!(target: crate::LOG_TARGET, ?err);
407 break
408 },
409 Ok(x) => x,
410 };
411
412 let Some(validation_code_hash) =
413 params.code_hash_provider.code_hash_at(parent_hash)
414 else {
415 tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
416 break
417 };
418
419 super::check_validation_code_or_log(
420 &validation_code_hash,
421 params.para_id,
422 ¶ms.relay_client,
423 relay_parent,
424 )
425 .await;
426
427 let allowed_pov_size = if let Some(max_pov_percentage) = params.max_pov_percentage {
428 validation_data.max_pov_size * max_pov_percentage / 100
429 } else {
430 validation_data.max_pov_size * 85 / 100
435 } as usize;
436
437 match collator
438 .collate(
439 &parent_header,
440 &slot_claim,
441 None,
442 (parachain_inherent_data, other_inherent_data),
443 params.authoring_duration,
444 allowed_pov_size,
445 )
446 .await
447 {
448 Ok(Some((collation, block_data))) => {
449 let Some(new_block_header) =
450 block_data.blocks().first().map(|b| b.header().clone())
451 else {
452 tracing::error!(target: crate::LOG_TARGET, "Produced PoV doesn't contain any blocks");
453 break
454 };
455
456 let new_block_hash = new_block_header.hash();
457
458 collator.collator_service().announce_block(new_block_hash, None);
461
462 if let Some(ref export_pov) = export_pov {
463 export_pov_to_path::<Block>(
464 export_pov.clone(),
465 collation.proof_of_validity.clone().into_compressed(),
466 new_block_hash,
467 *new_block_header.number(),
468 parent_header.clone(),
469 *relay_parent_header.state_root(),
470 *relay_parent_header.number(),
471 validation_data.max_pov_size,
472 );
473 }
474
475 overseer_handle
481 .send_msg(
482 CollationGenerationMessage::SubmitCollation(
483 SubmitCollationParams {
484 relay_parent,
485 collation,
486 parent_head: parent_header.encode().into(),
487 validation_code_hash,
488 result_sender: None,
489 core_index,
490 },
491 ),
492 "SubmitCollation",
493 )
494 .await;
495
496 parent_hash = new_block_hash;
497 parent_header = new_block_header;
498 },
499 Ok(None) => {
500 tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
501 break
502 },
503 Err(err) => {
504 tracing::error!(target: crate::LOG_TARGET, ?err);
505 break
506 },
507 }
508 }
509 }
510 }
511}