referrerpolicy=no-referrer-when-downgrade

polkadot_collator_protocol/validator_side/
collation.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Primitives for tracking collations-related data.
18//!
19//! Usually a path of collations is as follows:
20//!    1. First, collation must be advertised by collator.
21//!    2. The validator inspects the claim queue and decides if the collation should be fetched
22//!       based on the entries there. A parachain can't have more fetched collations than the
23//!       entries in the claim queue at a specific relay parent. When calculating this limit the
24//!       validator counts all advertisements within its view not just at the relay parent.
25//!    3. If the advertisement was accepted, it's queued for fetch (per relay parent).
26//!    4. Once it's requested, the collation is said to be pending fetch
27//!       (`CollationStatus::Fetching`).
28//!    5. Pending fetch collation becomes pending validation
29//!       (`CollationStatus::WaitingOnValidation`) once received, we send it to backing for
30//!       validation.
//!    6. If it turns out to be invalid or async backing allows seconding another candidate,
//!       carry on with the next advertisement, otherwise we're done with this relay parent.
33//!
34//!    ┌───────────────────────────────────┐
35//!    └─▶Waiting ─▶ Fetching ─▶ WaitingOnValidation
36
37use std::{
38	collections::{BTreeMap, VecDeque},
39	future::Future,
40	pin::Pin,
41	task::Poll,
42};
43
44use futures::{future::BoxFuture, FutureExt};
45use polkadot_node_network_protocol::{
46	peer_set::CollationVersion,
47	request_response::{outgoing::RequestError, v1 as request_v1, OutgoingResult},
48	PeerId,
49};
50use polkadot_node_primitives::PoV;
51use polkadot_node_subsystem_util::metrics::prometheus::prometheus::HistogramTimer;
52use polkadot_primitives::{
53	CandidateHash, CandidateReceiptV2 as CandidateReceipt, CollatorId, Hash, HeadData,
54	Id as ParaId, PersistedValidationData,
55};
56use tokio_util::sync::CancellationToken;
57
58use super::error::SecondingError;
59use crate::LOG_TARGET;
60
61/// Candidate supplied with a para head it's built on top of.
62#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
63pub struct ProspectiveCandidate {
64	/// Candidate hash.
65	pub candidate_hash: CandidateHash,
66	/// Parent head-data hash as supplied in advertisement.
67	pub parent_head_data_hash: Hash,
68}
69
70impl ProspectiveCandidate {
71	pub fn candidate_hash(&self) -> CandidateHash {
72		self.candidate_hash
73	}
74}
75
76/// Identifier of a fetched collation.
77#[derive(Debug, Clone, Hash, Eq, PartialEq)]
78pub struct FetchedCollation {
79	/// Candidate's relay parent.
80	pub relay_parent: Hash,
81	/// Parachain id.
82	pub para_id: ParaId,
83	/// Candidate hash.
84	pub candidate_hash: CandidateHash,
85}
86
87impl From<&CandidateReceipt<Hash>> for FetchedCollation {
88	fn from(receipt: &CandidateReceipt<Hash>) -> Self {
89		let descriptor = receipt.descriptor();
90		Self {
91			relay_parent: descriptor.relay_parent(),
92			para_id: descriptor.para_id(),
93			candidate_hash: receipt.hash(),
94		}
95	}
96}
97
98/// Identifier of a collation being requested.
99#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
100pub struct PendingCollation {
101	/// Candidate's relay parent.
102	pub relay_parent: Hash,
103	/// Parachain id.
104	pub para_id: ParaId,
105	/// Peer that advertised this collation.
106	pub peer_id: PeerId,
107	/// Optional candidate hash and parent head-data hash if were
108	/// supplied in advertisement.
109	pub prospective_candidate: Option<ProspectiveCandidate>,
110	/// Hash of the candidate's commitments.
111	pub commitments_hash: Option<Hash>,
112}
113
114impl PendingCollation {
115	pub fn new(
116		relay_parent: Hash,
117		para_id: ParaId,
118		peer_id: &PeerId,
119		prospective_candidate: Option<ProspectiveCandidate>,
120	) -> Self {
121		Self {
122			relay_parent,
123			para_id,
124			peer_id: *peer_id,
125			prospective_candidate,
126			commitments_hash: None,
127		}
128	}
129}
130
131/// An identifier for a fetched collation that was blocked from being seconded because we don't have
132/// access to the parent's HeadData. Can be retried once the candidate outputting this head data is
133/// seconded.
134#[derive(Debug, Clone, Eq, PartialEq, Hash)]
135pub struct BlockedCollationId {
136	/// Para id.
137	pub para_id: ParaId,
138	/// Hash of the parent head data.
139	pub parent_head_data_hash: Hash,
140}
141
142/// Performs a sanity check between advertised and fetched collations.
143pub fn fetched_collation_sanity_check(
144	advertised: &PendingCollation,
145	fetched: &CandidateReceipt,
146	persisted_validation_data: &PersistedValidationData,
147	maybe_parent_head_and_hash: Option<(HeadData, Hash)>,
148) -> Result<(), SecondingError> {
149	if persisted_validation_data.hash() != fetched.descriptor().persisted_validation_data_hash() {
150		return Err(SecondingError::PersistedValidationDataMismatch)
151	}
152
153	if advertised
154		.prospective_candidate
155		.map_or(false, |pc| pc.candidate_hash() != fetched.hash())
156	{
157		return Err(SecondingError::CandidateHashMismatch)
158	}
159
160	if advertised.relay_parent != fetched.descriptor.relay_parent() {
161		return Err(SecondingError::RelayParentMismatch)
162	}
163
164	if maybe_parent_head_and_hash.map_or(false, |(head, hash)| head.hash() != hash) {
165		return Err(SecondingError::ParentHeadDataMismatch)
166	}
167
168	Ok(())
169}
170
171/// Identifier for a requested collation and the respective collator that advertised it.
172#[derive(Debug, Clone)]
173pub struct CollationEvent {
174	/// Collator id.
175	pub collator_id: CollatorId,
176	/// The network protocol version the collator is using.
177	pub collator_protocol_version: CollationVersion,
178	/// The requested collation data.
179	pub pending_collation: PendingCollation,
180}
181
182/// Fetched collation data.
183#[derive(Debug, Clone)]
184pub struct PendingCollationFetch {
185	/// Collation identifier.
186	pub collation_event: CollationEvent,
187	/// Candidate receipt.
188	pub candidate_receipt: CandidateReceipt,
189	/// Proof of validity.
190	pub pov: PoV,
191	/// Optional parachain parent head data.
192	/// Only needed for elastic scaling.
193	pub maybe_parent_head_data: Option<HeadData>,
194}
195
196/// The status of the collations in [`CollationsPerRelayParent`].
197#[derive(Debug, Clone, Copy)]
198pub enum CollationStatus {
199	/// We are waiting for a collation to be advertised to us.
200	Waiting,
201	/// We are currently fetching a collation for the specified `ParaId`.
202	Fetching(ParaId),
203	/// We are waiting that a collation is being validated.
204	WaitingOnValidation,
205}
206
207impl Default for CollationStatus {
208	fn default() -> Self {
209		Self::Waiting
210	}
211}
212
213impl CollationStatus {
214	/// Downgrades to `Waiting`
215	pub fn back_to_waiting(&mut self) {
216		*self = Self::Waiting
217	}
218}
219
/// The number of claims in the claim queue and seconded candidates count for a specific `ParaId`.
///
/// Both counters start at zero via `Default`.
#[derive(Default, Debug)]
struct CandidatesStatePerPara {
	/// How many collations have been seconded.
	pub seconded_per_para: usize,
	/// Claims in the claim queue for the `ParaId`.
	pub claims_per_para: usize,
}
228
229/// Information about collations per relay parent.
230pub struct Collations {
231	/// What is the current status in regards to a collation for this relay parent?
232	pub status: CollationStatus,
233	/// Collator we're fetching from, optionally which candidate was requested.
234	///
235	/// This is the currently last started fetch, which did not exceed `MAX_UNSHARED_DOWNLOAD_TIME`
236	/// yet.
237	pub fetching_from: Option<(CollatorId, Option<CandidateHash>)>,
238	/// Collation that were advertised to us, but we did not yet request or fetch. Grouped by
239	/// `ParaId`.
240	waiting_queue: BTreeMap<ParaId, VecDeque<(PendingCollation, CollatorId)>>,
241	/// Number of seconded candidates and claims in the claim queue per `ParaId`.
242	candidates_state: BTreeMap<ParaId, CandidatesStatePerPara>,
243}
244
245impl Collations {
246	pub(super) fn new(group_assignments: &Vec<ParaId>) -> Self {
247		let mut candidates_state = BTreeMap::<ParaId, CandidatesStatePerPara>::new();
248
249		for para_id in group_assignments {
250			candidates_state.entry(*para_id).or_default().claims_per_para += 1;
251		}
252
253		Self {
254			status: Default::default(),
255			fetching_from: None,
256			waiting_queue: Default::default(),
257			candidates_state,
258		}
259	}
260
261	/// Note a seconded collation for a given para.
262	pub(super) fn note_seconded(&mut self, para_id: ParaId) {
263		self.candidates_state.entry(para_id).or_default().seconded_per_para += 1;
264		gum::trace!(
265			target: LOG_TARGET,
266			?para_id,
267			new_count=self.candidates_state.entry(para_id).or_default().seconded_per_para,
268			"Note seconded."
269		);
270		self.status.back_to_waiting();
271	}
272
273	/// Adds a new collation to the waiting queue for the relay parent. This function doesn't
274	/// perform any limits check. The caller should assure that the collation limit is respected.
275	pub(super) fn add_to_waiting_queue(&mut self, collation: (PendingCollation, CollatorId)) {
276		self.waiting_queue.entry(collation.0.para_id).or_default().push_back(collation);
277	}
278
279	/// Picks a collation to fetch from the waiting queue.
280	/// When fetching collations we need to ensure that each parachain has got a fair core time
281	/// share depending on its assignments in the claim queue. This means that the number of
282	/// collations seconded per parachain should ideally be equal to the number of claims for the
283	/// particular parachain in the claim queue.
284	///
285	/// To achieve this each seconded collation is mapped to an entry from the claim queue. The next
286	/// fetch is the first unfulfilled entry from the claim queue for which there is an
287	/// advertisement.
288	///
289	/// `unfulfilled_claim_queue_entries` represents all claim queue entries which are still not
290	/// fulfilled.
291	pub(super) fn pick_a_collation_to_fetch(
292		&mut self,
293		unfulfilled_claim_queue_entries: Vec<ParaId>,
294	) -> Option<(PendingCollation, CollatorId)> {
295		gum::trace!(
296			target: LOG_TARGET,
297			waiting_queue=?self.waiting_queue,
298			candidates_state=?self.candidates_state,
299			"Pick a collation to fetch."
300		);
301
302		for assignment in unfulfilled_claim_queue_entries {
303			// if there is an unfulfilled assignment - return it
304			if let Some(collation) = self
305				.waiting_queue
306				.get_mut(&assignment)
307				.and_then(|collations| collations.pop_front())
308			{
309				return Some(collation)
310			}
311		}
312
313		None
314	}
315
316	pub(super) fn seconded_for_para(&self, para_id: &ParaId) -> usize {
317		self.candidates_state
318			.get(&para_id)
319			.map(|state| state.seconded_per_para)
320			.unwrap_or_default()
321	}
322}
323
324// Any error that can occur when awaiting a collation fetch response.
325#[derive(Debug, thiserror::Error)]
326pub(super) enum CollationFetchError {
327	#[error("Future was cancelled.")]
328	Cancelled,
329	#[error("{0}")]
330	Request(#[from] RequestError),
331}
332
333/// Future that concludes when the collator has responded to our collation fetch request
334/// or the request was cancelled by the validator.
335pub(super) struct CollationFetchRequest {
336	/// Info about the requested collation.
337	pub pending_collation: PendingCollation,
338	/// Collator id.
339	pub collator_id: CollatorId,
340	/// The network protocol version the collator is using.
341	pub collator_protocol_version: CollationVersion,
342	/// Responses from collator.
343	pub from_collator: BoxFuture<'static, OutgoingResult<request_v1::CollationFetchingResponse>>,
344	/// Handle used for checking if this request was cancelled.
345	pub cancellation_token: CancellationToken,
346	/// A metric histogram for the lifetime of the request
347	pub _lifetime_timer: Option<HistogramTimer>,
348}
349
350impl Future for CollationFetchRequest {
351	type Output = (
352		CollationEvent,
353		std::result::Result<request_v1::CollationFetchingResponse, CollationFetchError>,
354	);
355
356	fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
357		// First check if this fetch request was cancelled.
358		let cancelled = match std::pin::pin!(self.cancellation_token.cancelled()).poll(cx) {
359			Poll::Ready(()) => true,
360			Poll::Pending => false,
361		};
362
363		if cancelled {
364			return Poll::Ready((
365				CollationEvent {
366					collator_protocol_version: self.collator_protocol_version,
367					collator_id: self.collator_id.clone(),
368					pending_collation: self.pending_collation,
369				},
370				Err(CollationFetchError::Cancelled),
371			))
372		}
373
374		let res = self.from_collator.poll_unpin(cx).map(|res| {
375			(
376				CollationEvent {
377					collator_protocol_version: self.collator_protocol_version,
378					collator_id: self.collator_id.clone(),
379					pending_collation: self.pending_collation,
380				},
381				res.map_err(CollationFetchError::Request),
382			)
383		});
384
385		res
386	}
387}