1#![deny(missing_docs)]
33
34use codec::Encode;
35use error::{Error, Result};
36use futures::{channel::oneshot, future::FutureExt, select};
37use polkadot_node_primitives::{
38 AvailableData, Collation, CollationGenerationConfig, CollationSecondedSignal, PoV,
39 SubmitCollationParams,
40};
41use polkadot_node_subsystem::{
42 messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiMessage},
43 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
44 SubsystemContext, SubsystemError, SubsystemResult, SubsystemSender,
45};
46use polkadot_node_subsystem_util::{
47 request_claim_queue, request_persisted_validation_data, request_session_index_for_child,
48 request_validation_code_hash, request_validators, runtime::ClaimQueueSnapshot,
49};
50use polkadot_primitives::{
51 transpose_claim_queue, CandidateCommitments, CandidateDescriptorV2,
52 CommittedCandidateReceiptV2, CoreIndex, Hash, Id as ParaId, OccupiedCoreAssumption,
53 PersistedValidationData, SessionIndex, TransposedClaimQueue, ValidationCodeHash,
54};
55use schnellru::{ByLength, LruMap};
56use std::{collections::HashSet, sync::Arc};
57
58mod error;
59
60#[cfg(test)]
61mod tests;
62
63mod metrics;
64use self::metrics::Metrics;
65
66const LOG_TARGET: &'static str = "parachain::collation-generation";
67
68pub struct CollationGenerationSubsystem {
70 config: Option<Arc<CollationGenerationConfig>>,
71 session_info_cache: SessionInfoCache,
72 metrics: Metrics,
73}
74
75#[overseer::contextbounds(CollationGeneration, prefix = self::overseer)]
76impl CollationGenerationSubsystem {
77 pub fn new(metrics: Metrics) -> Self {
79 Self { config: None, metrics, session_info_cache: SessionInfoCache::new() }
80 }
81
82 async fn run<Context>(mut self, mut ctx: Context) {
94 loop {
95 select! {
96 incoming = ctx.recv().fuse() => {
97 if self.handle_incoming::<Context>(incoming, &mut ctx).await {
98 break;
99 }
100 },
101 }
102 }
103 }
104
105 async fn handle_incoming<Context>(
111 &mut self,
112 incoming: SubsystemResult<FromOrchestra<<Context as SubsystemContext>::Message>>,
113 ctx: &mut Context,
114 ) -> bool {
115 match incoming {
116 Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
117 activated,
118 ..
119 }))) => {
120 if let Err(err) = self.handle_new_activation(activated.map(|v| v.hash), ctx).await {
121 gum::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activation");
122 }
123
124 false
125 },
126 Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => true,
127 Ok(FromOrchestra::Communication {
128 msg: CollationGenerationMessage::Initialize(config),
129 }) => {
130 if self.config.is_some() {
131 gum::error!(target: LOG_TARGET, "double initialization");
132 } else {
133 self.config = Some(Arc::new(config));
134 }
135 false
136 },
137 Ok(FromOrchestra::Communication {
138 msg: CollationGenerationMessage::Reinitialize(config),
139 }) => {
140 self.config = Some(Arc::new(config));
141 false
142 },
143 Ok(FromOrchestra::Communication {
144 msg: CollationGenerationMessage::SubmitCollation(params),
145 }) => {
146 if let Err(err) = self.handle_submit_collation(params, ctx).await {
147 gum::error!(target: LOG_TARGET, ?err, "Failed to submit collation");
148 }
149
150 false
151 },
152 Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => false,
153 Err(err) => {
154 gum::error!(
155 target: LOG_TARGET,
156 err = ?err,
157 "error receiving message from subsystem context: {:?}",
158 err
159 );
160 true
161 },
162 }
163 }
164
165 async fn handle_submit_collation<Context>(
166 &mut self,
167 params: SubmitCollationParams,
168 ctx: &mut Context,
169 ) -> Result<()> {
170 let Some(config) = &self.config else {
171 return Err(Error::SubmittedBeforeInit);
172 };
173 let _timer = self.metrics.time_submit_collation();
174
175 let SubmitCollationParams {
176 relay_parent,
177 collation,
178 parent_head,
179 validation_code_hash,
180 result_sender,
181 core_index,
182 } = params;
183
184 let mut validation_data = match request_persisted_validation_data(
185 relay_parent,
186 config.para_id,
187 OccupiedCoreAssumption::TimedOut,
188 ctx.sender(),
189 )
190 .await
191 .await??
192 {
193 Some(v) => v,
194 None => {
195 gum::debug!(
196 target: LOG_TARGET,
197 relay_parent = ?relay_parent,
198 our_para = %config.para_id,
199 "No validation data for para - does it exist at this relay-parent?",
200 );
201 return Ok(())
202 },
203 };
204
205 validation_data.parent_head = parent_head;
207
208 let claim_queue = request_claim_queue(relay_parent, ctx.sender()).await.await??;
209
210 let session_index =
211 request_session_index_for_child(relay_parent, ctx.sender()).await.await??;
212
213 let session_info =
214 self.session_info_cache.get(relay_parent, session_index, ctx.sender()).await?;
215 let collation = PreparedCollation {
216 collation,
217 relay_parent,
218 para_id: config.para_id,
219 validation_data,
220 validation_code_hash,
221 n_validators: session_info.n_validators,
222 core_index,
223 session_index,
224 };
225
226 construct_and_distribute_receipt(
227 collation,
228 ctx.sender(),
229 result_sender,
230 &mut self.metrics,
231 &transpose_claim_queue(claim_queue),
232 )
233 .await?;
234
235 Ok(())
236 }
237
238 async fn handle_new_activation<Context>(
239 &mut self,
240 maybe_activated: Option<Hash>,
241 ctx: &mut Context,
242 ) -> Result<()> {
243 let Some(config) = &self.config else {
244 return Ok(());
245 };
246
247 let Some(relay_parent) = maybe_activated else { return Ok(()) };
248
249 if config.collator.is_none() {
252 return Ok(())
253 }
254
255 let para_id = config.para_id;
256
257 let _timer = self.metrics.time_new_activation();
258
259 let session_index =
260 request_session_index_for_child(relay_parent, ctx.sender()).await.await??;
261
262 let session_info =
263 self.session_info_cache.get(relay_parent, session_index, ctx.sender()).await?;
264 let n_validators = session_info.n_validators;
265
266 let claim_queue =
267 ClaimQueueSnapshot::from(request_claim_queue(relay_parent, ctx.sender()).await.await??);
268
269 let assigned_cores = claim_queue
270 .iter_all_claims()
271 .filter_map(|(core_idx, para_ids)| {
272 para_ids.iter().any(|¶_id| para_id == config.para_id).then_some(*core_idx)
273 })
274 .collect::<Vec<_>>();
275
276 if assigned_cores.is_empty() {
278 return Ok(())
279 }
280
281 let mut validation_data = match request_persisted_validation_data(
285 relay_parent,
286 para_id,
287 OccupiedCoreAssumption::Included,
290 ctx.sender(),
291 )
292 .await
293 .await??
294 {
295 Some(v) => v,
296 None => {
297 gum::debug!(
298 target: LOG_TARGET,
299 relay_parent = ?relay_parent,
300 our_para = %para_id,
301 "validation data is not available",
302 );
303 return Ok(())
304 },
305 };
306
307 let validation_code_hash = match request_validation_code_hash(
308 relay_parent,
309 para_id,
310 OccupiedCoreAssumption::Included,
313 ctx.sender(),
314 )
315 .await
316 .await??
317 {
318 Some(v) => v,
319 None => {
320 gum::debug!(
321 target: LOG_TARGET,
322 relay_parent = ?relay_parent,
323 our_para = %para_id,
324 "validation code hash is not found.",
325 );
326 return Ok(())
327 },
328 };
329
330 let task_config = config.clone();
331 let metrics = self.metrics.clone();
332 let mut task_sender = ctx.sender().clone();
333
334 ctx.spawn(
335 "chained-collation-builder",
336 Box::pin(async move {
337 let transposed_claim_queue = transpose_claim_queue(claim_queue.0.clone());
338
339 let mut used_cores = HashSet::new();
341
342 for i in 0..assigned_cores.len() {
343 let collator_fn = match task_config.collator.as_ref() {
345 Some(x) => x,
346 None => return,
347 };
348
349 let (collation, result_sender) =
350 match collator_fn(relay_parent, &validation_data).await {
351 Some(collation) => collation.into_inner(),
352 None => {
353 gum::debug!(
354 target: LOG_TARGET,
355 ?para_id,
356 "collator returned no collation on collate",
357 );
358 return
359 },
360 };
361
362 let mut commitments = CandidateCommitments::default();
365 commitments.upward_messages = collation.upward_messages.clone();
366
367 let ump_signals = match commitments.ump_signals() {
368 Ok(signals) => signals,
369 Err(err) => {
370 gum::debug!(
371 target: LOG_TARGET,
372 ?para_id,
373 "error processing UMP signals: {}",
374 err
375 );
376 return
377 },
378 };
379
380 let (cs_index, cq_offset) = ump_signals
381 .core_selector()
382 .map(|(cs_index, cq_offset)| (cs_index.0 as usize, cq_offset.0 as usize))
383 .unwrap_or((i, 0));
384
385 let cores_to_build_on = claim_queue
387 .iter_claims_at_depth(cq_offset)
388 .filter_map(|(core_idx, para_id)| {
389 (para_id == task_config.para_id).then_some(core_idx)
390 })
391 .collect::<Vec<_>>();
392
393 if cores_to_build_on.is_empty() {
394 gum::debug!(
395 target: LOG_TARGET,
396 ?para_id,
397 "no core is assigned to para at depth {}",
398 cq_offset,
399 );
400 return
401 }
402
403 let descriptor_core_index =
404 cores_to_build_on[cs_index % cores_to_build_on.len()];
405
406 if used_cores.contains(&descriptor_core_index.0) {
408 gum::warn!(
409 target: LOG_TARGET,
410 ?para_id,
411 "parachain repeatedly selected the same core index: {}",
412 descriptor_core_index.0,
413 );
414 return
415 }
416
417 used_cores.insert(descriptor_core_index.0);
418 gum::trace!(
419 target: LOG_TARGET,
420 ?para_id,
421 "selected core index: {}",
422 descriptor_core_index.0,
423 );
424
425 let parent_head = collation.head_data.clone();
427 if let Err(err) = construct_and_distribute_receipt(
428 PreparedCollation {
429 collation,
430 para_id,
431 relay_parent,
432 validation_data: validation_data.clone(),
433 validation_code_hash,
434 n_validators,
435 core_index: descriptor_core_index,
436 session_index,
437 },
438 &mut task_sender,
439 result_sender,
440 &metrics,
441 &transposed_claim_queue,
442 )
443 .await
444 {
445 gum::error!(
446 target: LOG_TARGET,
447 "Failed to construct and distribute collation: {}",
448 err
449 );
450 return
451 }
452
453 validation_data.parent_head = parent_head;
456 }
457 }),
458 )?;
459
460 Ok(())
461 }
462}
463
464#[overseer::subsystem(CollationGeneration, error=SubsystemError, prefix=self::overseer)]
465impl<Context> CollationGenerationSubsystem {
466 fn start(self, ctx: Context) -> SpawnedSubsystem {
467 let future = async move {
468 self.run(ctx).await;
469 Ok(())
470 }
471 .boxed();
472
473 SpawnedSubsystem { name: "collation-generation-subsystem", future }
474 }
475}
476
477#[derive(Clone)]
478struct PerSessionInfo {
479 n_validators: usize,
480}
481
482struct SessionInfoCache(LruMap<SessionIndex, PerSessionInfo>);
483
484impl SessionInfoCache {
485 fn new() -> Self {
486 Self(LruMap::new(ByLength::new(2)))
487 }
488
489 async fn get<Sender: SubsystemSender<RuntimeApiMessage>>(
490 &mut self,
491 relay_parent: Hash,
492 session_index: SessionIndex,
493 sender: &mut Sender,
494 ) -> Result<PerSessionInfo> {
495 if let Some(info) = self.0.get(&session_index) {
496 return Ok(info.clone())
497 }
498
499 let n_validators =
500 request_validators(relay_parent, &mut sender.clone()).await.await??.len();
501
502 let info = PerSessionInfo { n_validators };
503 self.0.insert(session_index, info);
504 Ok(self.0.get(&session_index).expect("Just inserted").clone())
505 }
506}
507
508struct PreparedCollation {
509 collation: Collation,
510 para_id: ParaId,
511 relay_parent: Hash,
512 validation_data: PersistedValidationData,
513 validation_code_hash: ValidationCodeHash,
514 n_validators: usize,
515 core_index: CoreIndex,
516 session_index: SessionIndex,
517}
518
519async fn construct_and_distribute_receipt(
522 collation: PreparedCollation,
523 sender: &mut impl overseer::CollationGenerationSenderTrait,
524 result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
525 metrics: &Metrics,
526 transposed_claim_queue: &TransposedClaimQueue,
527) -> Result<()> {
528 let PreparedCollation {
529 collation,
530 para_id,
531 relay_parent,
532 validation_data,
533 validation_code_hash,
534 n_validators,
535 core_index,
536 session_index,
537 } = collation;
538
539 let persisted_validation_data_hash = validation_data.hash();
540 let parent_head_data = validation_data.parent_head.clone();
541 let parent_head_data_hash = validation_data.parent_head.hash();
542
543 let pov = {
545 let pov = collation.proof_of_validity.into_compressed();
546 let encoded_size = pov.encoded_size();
547
548 if encoded_size > validation_data.max_pov_size as usize {
554 return Err(Error::POVSizeExceeded(encoded_size, validation_data.max_pov_size as usize))
555 }
556
557 pov
558 };
559
560 let pov_hash = pov.hash();
561
562 let erasure_root = erasure_root(n_validators, validation_data, pov.clone())?;
563
564 let commitments = CandidateCommitments {
565 upward_messages: collation.upward_messages,
566 horizontal_messages: collation.horizontal_messages,
567 new_validation_code: collation.new_validation_code,
568 head_data: collation.head_data,
569 processed_downward_messages: collation.processed_downward_messages,
570 hrmp_watermark: collation.hrmp_watermark,
571 };
572
573 let receipt = {
574 let ccr = CommittedCandidateReceiptV2 {
575 descriptor: CandidateDescriptorV2::new(
576 para_id,
577 relay_parent,
578 core_index,
579 session_index,
580 persisted_validation_data_hash,
581 pov_hash,
582 erasure_root,
583 commitments.head_data.hash(),
584 validation_code_hash,
585 ),
586 commitments: commitments.clone(),
587 };
588
589 ccr.parse_ump_signals(&transposed_claim_queue)
590 .map_err(Error::CandidateReceiptCheck)?;
591
592 ccr.to_plain()
593 };
594
595 gum::debug!(
596 target: LOG_TARGET,
597 candidate_hash = ?receipt.hash(),
598 ?pov_hash,
599 ?relay_parent,
600 para_id = %para_id,
601 ?core_index,
602 "Candidate generated",
603 );
604 gum::trace!(
605 target: LOG_TARGET,
606 ?commitments,
607 candidate_hash = ?receipt.hash(),
608 "Candidate commitments",
609 );
610
611 metrics.on_collation_generated();
612
613 sender
614 .send_message(CollatorProtocolMessage::DistributeCollation {
615 candidate_receipt: receipt,
616 parent_head_data_hash,
617 pov,
618 parent_head_data,
619 result_sender,
620 core_index,
621 })
622 .await;
623
624 Ok(())
625}
626
627fn erasure_root(
628 n_validators: usize,
629 persisted_validation: PersistedValidationData,
630 pov: PoV,
631) -> Result<Hash> {
632 let available_data =
633 AvailableData { validation_data: persisted_validation, pov: Arc::new(pov) };
634
635 let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
636 Ok(polkadot_erasure_coding::branches(&chunks).root())
637}