
polkadot_node_collation_generation/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The collation generation subsystem is the interface between polkadot and the collators.
//!
//! # Protocol
//!
//! On every `ActiveLeavesUpdate`:
//!
//! * If there is no collation generation config, ignore.
//! * Otherwise, for each `activated` head in the update:
//!   * Determine if the para is assigned to any core by fetching the `claim_queue` Runtime API.
//!   * Use the Runtime API subsystem to fetch the full validation data.
//!   * Invoke the `collator`, and use its outputs to produce a
//!     [`polkadot_primitives::CandidateReceiptV2`], signed with the configuration's `key`.
//!   * Dispatch a [`CollatorProtocolMessage::DistributeCollation`]`(receipt, pov)`.
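//!
//! # Example (illustrative)
//!
//! The subsystem is driven purely by messages. A collator first sends
//! [`CollationGenerationMessage::Initialize`] with its `CollationGenerationConfig`; afterwards it
//! can either rely on its `collator` function being invoked on new leaves, or push pre-built
//! collations with [`CollationGenerationMessage::SubmitCollation`]. The sketch below shows the
//! second path; `overseer_handle`, `collation` and the other values are assumed to be provided by
//! the embedding node, so the snippet is marked `ignore` and not compiled as a doctest.
//!
//! ```ignore
//! // One-time registration of the collation generation config.
//! overseer_handle
//! 	.send_msg(CollationGenerationMessage::Initialize(config), "collator")
//! 	.await;
//!
//! // Submit a collation that was built out-of-band for a specific core.
//! overseer_handle
//! 	.send_msg(
//! 		CollationGenerationMessage::SubmitCollation(SubmitCollationParams {
//! 			relay_parent,
//! 			collation,
//! 			parent_head,
//! 			validation_code_hash,
//! 			result_sender: None,
//! 			core_index,
//! 		}),
//! 		"collator",
//! 	)
//! 	.await;
//! ```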

#![deny(missing_docs)]

use codec::Encode;
use error::{Error, Result};
use futures::{channel::oneshot, future::FutureExt, select};
use polkadot_node_primitives::{
	AvailableData, Collation, CollationGenerationConfig, CollationSecondedSignal, PoV,
	SubmitCollationParams,
};
use polkadot_node_subsystem::{
	messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiMessage},
	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
	SubsystemContext, SubsystemError, SubsystemResult, SubsystemSender,
};
use polkadot_node_subsystem_util::{
	request_claim_queue, request_persisted_validation_data, request_session_index_for_child,
	request_validation_code_hash, request_validators, runtime::ClaimQueueSnapshot,
};
use polkadot_primitives::{
	transpose_claim_queue, CandidateCommitments, CandidateDescriptorV2,
	CommittedCandidateReceiptV2, CoreIndex, Hash, Id as ParaId, OccupiedCoreAssumption,
	PersistedValidationData, SessionIndex, TransposedClaimQueue, ValidationCodeHash,
};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, sync::Arc};

mod error;

#[cfg(test)]
mod tests;

mod metrics;
use self::metrics::Metrics;

const LOG_TARGET: &'static str = "parachain::collation-generation";

/// Collation Generation Subsystem
pub struct CollationGenerationSubsystem {
	config: Option<Arc<CollationGenerationConfig>>,
	session_info_cache: SessionInfoCache,
	metrics: Metrics,
}

#[overseer::contextbounds(CollationGeneration, prefix = self::overseer)]
impl CollationGenerationSubsystem {
	/// Create a new instance of the `CollationGenerationSubsystem`.
	pub fn new(metrics: Metrics) -> Self {
		Self { config: None, metrics, session_info_cache: SessionInfoCache::new() }
	}

	/// Run this subsystem
	///
	/// Conceptually, this is very simple: it just loops forever, receiving messages from the
	/// overseer and dispatching them to `handle_incoming`.
	///
	/// - On `ActiveLeaves` signals, collations may be built and distributed for the activated
	///   heads.
	/// - On `Initialize`/`Reinitialize` messages, the collation generation config is updated; on
	///   `SubmitCollation` messages, the provided collation is distributed.
	///
	/// The loop ends when the overseer signals `Conclude` or when receiving from the context
	/// fails; errors while handling individual messages are logged and otherwise discarded.
	async fn run<Context>(mut self, mut ctx: Context) {
		loop {
			select! {
				incoming = ctx.recv().fuse() => {
					if self.handle_incoming::<Context>(incoming, &mut ctx).await {
						break;
					}
				},
			}
		}
	}

	// handle an incoming message. return true if we should break afterwards.
	// note: this doesn't strictly need to be a separate function; it's more an administrative
	// function so that we don't clutter the run loop. It could in principle be inlined directly
	// into there. it should hopefully therefore be ok that it's an async function mutably borrowing
	// self.
	async fn handle_incoming<Context>(
		&mut self,
		incoming: SubsystemResult<FromOrchestra<<Context as SubsystemContext>::Message>>,
		ctx: &mut Context,
	) -> bool {
		match incoming {
			Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
				activated,
				..
			}))) => {
				if let Err(err) = self.handle_new_activation(activated.map(|v| v.hash), ctx).await {
					gum::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activation");
				}

				false
			},
			Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => true,
			Ok(FromOrchestra::Communication {
				msg: CollationGenerationMessage::Initialize(config),
			}) => {
				if self.config.is_some() {
					gum::error!(target: LOG_TARGET, "double initialization");
				} else {
					self.config = Some(Arc::new(config));
				}
				false
			},
			Ok(FromOrchestra::Communication {
				msg: CollationGenerationMessage::Reinitialize(config),
			}) => {
				self.config = Some(Arc::new(config));
				false
			},
			Ok(FromOrchestra::Communication {
				msg: CollationGenerationMessage::SubmitCollation(params),
			}) => {
				if let Err(err) = self.handle_submit_collation(params, ctx).await {
					gum::error!(target: LOG_TARGET, ?err, "Failed to submit collation");
				}

				false
			},
			Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => false,
			Err(err) => {
				gum::error!(
					target: LOG_TARGET,
					err = ?err,
					"error receiving message from subsystem context: {:?}",
					err
				);
				true
			},
		}
	}

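	/// Handle a `SubmitCollation` request: fetch the persisted validation data, claim queue and
	/// session info for the given relay parent, swap in the provided parent head data, and then
	/// construct and distribute the candidate receipt for the supplied collation.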
	async fn handle_submit_collation<Context>(
		&mut self,
		params: SubmitCollationParams,
		ctx: &mut Context,
	) -> Result<()> {
		let Some(config) = &self.config else {
			return Err(Error::SubmittedBeforeInit);
		};
		let _timer = self.metrics.time_submit_collation();

		let SubmitCollationParams {
			relay_parent,
			collation,
			parent_head,
			validation_code_hash,
			result_sender,
			core_index,
		} = params;

		let mut validation_data = match request_persisted_validation_data(
			relay_parent,
			config.para_id,
			OccupiedCoreAssumption::TimedOut,
			ctx.sender(),
		)
		.await
		.await??
		{
			Some(v) => v,
			None => {
				gum::debug!(
					target: LOG_TARGET,
					relay_parent = ?relay_parent,
					our_para = %config.para_id,
					"No validation data for para - does it exist at this relay-parent?",
				);
				return Ok(())
			},
		};

		// We need to swap the parent-head data, but all other fields here will be correct.
		validation_data.parent_head = parent_head;

		let claim_queue = request_claim_queue(relay_parent, ctx.sender()).await.await??;

		let session_index =
			request_session_index_for_child(relay_parent, ctx.sender()).await.await??;

		let session_info =
			self.session_info_cache.get(relay_parent, session_index, ctx.sender()).await?;
		let collation = PreparedCollation {
			collation,
			relay_parent,
			para_id: config.para_id,
			validation_data,
			validation_code_hash,
			n_validators: session_info.n_validators,
			core_index,
			session_index,
		};

		construct_and_distribute_receipt(
			collation,
			ctx.sender(),
			result_sender,
			&mut self.metrics,
			&transpose_claim_queue(claim_queue),
		)
		.await?;

		Ok(())
	}

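	/// Handle a newly activated leaf: if a collator function is configured and the para is
	/// assigned to at least one core in the claim queue, fetch the persisted validation data and
	/// validation code hash, then spawn a background task that builds and distributes one chained
	/// collation per assigned core.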
	async fn handle_new_activation<Context>(
		&mut self,
		maybe_activated: Option<Hash>,
		ctx: &mut Context,
	) -> Result<()> {
		let Some(config) = &self.config else {
			return Ok(());
		};

		let Some(relay_parent) = maybe_activated else { return Ok(()) };

		// If there is no collation function provided, bail out early.
		// Important: Lookahead collator and slot based collator do not use `CollatorFn`.
		if config.collator.is_none() {
			return Ok(())
		}

		let para_id = config.para_id;

		let _timer = self.metrics.time_new_activation();

		let session_index =
			request_session_index_for_child(relay_parent, ctx.sender()).await.await??;

		let session_info =
			self.session_info_cache.get(relay_parent, session_index, ctx.sender()).await?;
		let n_validators = session_info.n_validators;

		let claim_queue =
			ClaimQueueSnapshot::from(request_claim_queue(relay_parent, ctx.sender()).await.await??);

		let assigned_cores = claim_queue
			.iter_all_claims()
			.filter_map(|(core_idx, para_ids)| {
				para_ids.iter().any(|&para_id| para_id == config.para_id).then_some(*core_idx)
			})
			.collect::<Vec<_>>();
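		// Illustrative example (values made up): with a claim queue of
		// `{CoreIndex(0): [para_a, para_b], CoreIndex(1): [para_b, para_b]}` and our para being
		// `para_b`, the assigned cores are `[CoreIndex(0), CoreIndex(1)]`.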

		// Nothing to do if no core is assigned to us at any depth.
		if assigned_cores.is_empty() {
			return Ok(())
		}

		// We are being very optimistic here, but one of the cores could be pending availability
		// for some more blocks, or even time out. We assume all cores are being freed.

		let mut validation_data = match request_persisted_validation_data(
			relay_parent,
			para_id,
			// Just use included assumption always. If there are no pending candidates it's a
			// no-op.
			OccupiedCoreAssumption::Included,
			ctx.sender(),
		)
		.await
		.await??
		{
			Some(v) => v,
			None => {
				gum::debug!(
					target: LOG_TARGET,
					relay_parent = ?relay_parent,
					our_para = %para_id,
					"validation data is not available",
				);
				return Ok(())
			},
		};

		let validation_code_hash = match request_validation_code_hash(
			relay_parent,
			para_id,
			// Just use included assumption always. If there are no pending candidates it's a
			// no-op.
			OccupiedCoreAssumption::Included,
			ctx.sender(),
		)
		.await
		.await??
		{
			Some(v) => v,
			None => {
				gum::debug!(
					target: LOG_TARGET,
					relay_parent = ?relay_parent,
					our_para = %para_id,
					"validation code hash is not found.",
				);
				return Ok(())
			},
		};

		let task_config = config.clone();
		let metrics = self.metrics.clone();
		let mut task_sender = ctx.sender().clone();

		ctx.spawn(
			"chained-collation-builder",
			Box::pin(async move {
				let transposed_claim_queue = transpose_claim_queue(claim_queue.0.clone());

				// Track used core indexes not to submit collations on the same core.
				let mut used_cores = HashSet::new();

				for i in 0..assigned_cores.len() {
					// Get the collation.
					let collator_fn = match task_config.collator.as_ref() {
						Some(x) => x,
						None => return,
					};

					let (collation, result_sender) =
						match collator_fn(relay_parent, &validation_data).await {
							Some(collation) => collation.into_inner(),
							None => {
								gum::debug!(
									target: LOG_TARGET,
									?para_id,
									"collator returned no collation on collate",
								);
								return
							},
						};

					// Use the core_selector method from CandidateCommitments to extract
					// CoreSelector and ClaimQueueOffset.
					let mut commitments = CandidateCommitments::default();
					commitments.upward_messages = collation.upward_messages.clone();

					let ump_signals = match commitments.ump_signals() {
						Ok(signals) => signals,
						Err(err) => {
							gum::debug!(
								target: LOG_TARGET,
								?para_id,
								"error processing UMP signals: {}",
								err
							);
							return
						},
					};

					let (cs_index, cq_offset) = ump_signals
						.core_selector()
						.map(|(cs_index, cq_offset)| (cs_index.0 as usize, cq_offset.0 as usize))
						.unwrap_or((i, 0));

					// Identify the cores to build collations on using the given claim queue offset.
					let cores_to_build_on = claim_queue
						.iter_claims_at_depth(cq_offset)
						.filter_map(|(core_idx, para_id)| {
							(para_id == task_config.para_id).then_some(core_idx)
						})
						.collect::<Vec<_>>();

					if cores_to_build_on.is_empty() {
						gum::debug!(
							target: LOG_TARGET,
							?para_id,
							"no core is assigned to para at depth {}",
							cq_offset,
						);
						return
					}

					let descriptor_core_index =
						cores_to_build_on[cs_index % cores_to_build_on.len()];
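					// Illustrative example: with `cores_to_build_on == [CoreIndex(1), CoreIndex(3)]`
					// and `cs_index == 5`, the line above selects
					// `cores_to_build_on[5 % 2] == CoreIndex(3)`.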

					// Ensure the core index has not been used before.
					if used_cores.contains(&descriptor_core_index.0) {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							"parachain repeatedly selected the same core index: {}",
							descriptor_core_index.0,
						);
						return
					}

					used_cores.insert(descriptor_core_index.0);
					gum::trace!(
						target: LOG_TARGET,
						?para_id,
						"selected core index: {}",
						descriptor_core_index.0,
					);

					// Distribute the collation.
					let parent_head = collation.head_data.clone();
					if let Err(err) = construct_and_distribute_receipt(
						PreparedCollation {
							collation,
							para_id,
							relay_parent,
							validation_data: validation_data.clone(),
							validation_code_hash,
							n_validators,
							core_index: descriptor_core_index,
							session_index,
						},
						&mut task_sender,
						result_sender,
						&metrics,
						&transposed_claim_queue,
					)
					.await
					{
						gum::error!(
							target: LOG_TARGET,
							"Failed to construct and distribute collation: {}",
							err
						);
						return
					}

					// Chain the collations. All else stays the same as we build the chained
					// collation on same relay parent.
					validation_data.parent_head = parent_head;
				}
			}),
		)?;

		Ok(())
	}
}

#[overseer::subsystem(CollationGeneration, error=SubsystemError, prefix=self::overseer)]
impl<Context> CollationGenerationSubsystem {
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = async move {
			self.run(ctx).await;
			Ok(())
		}
		.boxed();

		SpawnedSubsystem { name: "collation-generation-subsystem", future }
	}
}

#[derive(Clone)]
struct PerSessionInfo {
	n_validators: usize,
}

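/// Small LRU cache of per-session information (currently only the validator-set size), keyed by
/// [`SessionIndex`], so that `request_validators` is not re-issued for every collation within a
/// session.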
struct SessionInfoCache(LruMap<SessionIndex, PerSessionInfo>);

impl SessionInfoCache {
	fn new() -> Self {
		Self(LruMap::new(ByLength::new(2)))
	}

	async fn get<Sender: SubsystemSender<RuntimeApiMessage>>(
		&mut self,
		relay_parent: Hash,
		session_index: SessionIndex,
		sender: &mut Sender,
	) -> Result<PerSessionInfo> {
		if let Some(info) = self.0.get(&session_index) {
			return Ok(info.clone())
		}

		let n_validators =
			request_validators(relay_parent, &mut sender.clone()).await.await??.len();

		let info = PerSessionInfo { n_validators };
		self.0.insert(session_index, info);
		Ok(self.0.get(&session_index).expect("Just inserted").clone())
	}
}

struct PreparedCollation {
	collation: Collation,
	para_id: ParaId,
	relay_parent: Hash,
	validation_data: PersistedValidationData,
	validation_code_hash: ValidationCodeHash,
	n_validators: usize,
	core_index: CoreIndex,
	session_index: SessionIndex,
}

/// Takes a prepared collation, along with its context, and produces a candidate receipt
/// which is distributed to validators.
async fn construct_and_distribute_receipt(
	collation: PreparedCollation,
	sender: &mut impl overseer::CollationGenerationSenderTrait,
	result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
	metrics: &Metrics,
	transposed_claim_queue: &TransposedClaimQueue,
) -> Result<()> {
	let PreparedCollation {
		collation,
		para_id,
		relay_parent,
		validation_data,
		validation_code_hash,
		n_validators,
		core_index,
		session_index,
	} = collation;

	let persisted_validation_data_hash = validation_data.hash();
	let parent_head_data = validation_data.parent_head.clone();
	let parent_head_data_hash = validation_data.parent_head.hash();

	// Apply compression to the block data.
	let pov = {
		let pov = collation.proof_of_validity.into_compressed();
		let encoded_size = pov.encoded_size();

		// As long as `POV_BOMB_LIMIT` is at least `max_pov_size`, this ensures
		// that honest collators never produce a PoV which is uncompressed.
		//
		// As such, honest collators never produce an uncompressed PoV which starts with
		// a compression magic number, which would lead validators to reject the collation.
		if encoded_size > validation_data.max_pov_size as usize {
			return Err(Error::POVSizeExceeded(encoded_size, validation_data.max_pov_size as usize))
		}

		pov
	};

	let pov_hash = pov.hash();

	let erasure_root = erasure_root(n_validators, validation_data, pov.clone())?;

	let commitments = CandidateCommitments {
		upward_messages: collation.upward_messages,
		horizontal_messages: collation.horizontal_messages,
		new_validation_code: collation.new_validation_code,
		head_data: collation.head_data,
		processed_downward_messages: collation.processed_downward_messages,
		hrmp_watermark: collation.hrmp_watermark,
	};

	let receipt = {
		let ccr = CommittedCandidateReceiptV2 {
			descriptor: CandidateDescriptorV2::new(
				para_id,
				relay_parent,
				core_index,
				session_index,
				persisted_validation_data_hash,
				pov_hash,
				erasure_root,
				commitments.head_data.hash(),
				validation_code_hash,
			),
			commitments: commitments.clone(),
		};

		ccr.parse_ump_signals(&transposed_claim_queue)
			.map_err(Error::CandidateReceiptCheck)?;

		ccr.to_plain()
	};

	gum::debug!(
		target: LOG_TARGET,
		candidate_hash = ?receipt.hash(),
		?pov_hash,
		?relay_parent,
		para_id = %para_id,
		?core_index,
		"Candidate generated",
	);
	gum::trace!(
		target: LOG_TARGET,
		?commitments,
		candidate_hash = ?receipt.hash(),
		"Candidate commitments",
	);

	metrics.on_collation_generated();

	sender
		.send_message(CollatorProtocolMessage::DistributeCollation {
			candidate_receipt: receipt,
			parent_head_data_hash,
			pov,
			parent_head_data,
			result_sender,
			core_index,
		})
		.await;

	Ok(())
}

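/// Compute the erasure root of the [`AvailableData`] (persisted validation data plus PoV) by
/// erasure-chunking it for `n_validators` validators and taking the root of the resulting Merkle
/// branches.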
fn erasure_root(
	n_validators: usize,
	persisted_validation: PersistedValidationData,
	pov: PoV,
) -> Result<Hash> {
	let available_data =
		AvailableData { validation_data: persisted_validation, pov: Arc::new(pov) };

	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
	Ok(polkadot_erasure_coding::branches(&chunks).root())
}