polkadot_collator_protocol/validator_side/
collation.rs1use std::{
38 collections::{BTreeMap, VecDeque},
39 future::Future,
40 pin::Pin,
41 task::Poll,
42};
43
44use futures::{future::BoxFuture, FutureExt};
45use polkadot_node_network_protocol::{
46 peer_set::CollationVersion,
47 request_response::{outgoing::RequestError, v1 as request_v1, OutgoingResult},
48 PeerId,
49};
50use polkadot_node_primitives::PoV;
51use polkadot_node_subsystem_util::metrics::prometheus::prometheus::HistogramTimer;
52use polkadot_primitives::{
53 CandidateHash, CandidateReceiptV2 as CandidateReceipt, CollatorId, Hash, HeadData,
54 Id as ParaId, PersistedValidationData,
55};
56use tokio_util::sync::CancellationToken;
57
58use super::error::SecondingError;
59use crate::LOG_TARGET;
60
61#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
63pub struct ProspectiveCandidate {
64 pub candidate_hash: CandidateHash,
66 pub parent_head_data_hash: Hash,
68}
69
70impl ProspectiveCandidate {
71 pub fn candidate_hash(&self) -> CandidateHash {
72 self.candidate_hash
73 }
74}
75
76#[derive(Debug, Clone, Hash, Eq, PartialEq)]
78pub struct FetchedCollation {
79 pub relay_parent: Hash,
81 pub para_id: ParaId,
83 pub candidate_hash: CandidateHash,
85}
86
87impl From<&CandidateReceipt<Hash>> for FetchedCollation {
88 fn from(receipt: &CandidateReceipt<Hash>) -> Self {
89 let descriptor = receipt.descriptor();
90 Self {
91 relay_parent: descriptor.relay_parent(),
92 para_id: descriptor.para_id(),
93 candidate_hash: receipt.hash(),
94 }
95 }
96}
97
98#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
100pub struct PendingCollation {
101 pub relay_parent: Hash,
103 pub para_id: ParaId,
105 pub peer_id: PeerId,
107 pub prospective_candidate: Option<ProspectiveCandidate>,
110 pub commitments_hash: Option<Hash>,
112}
113
114impl PendingCollation {
115 pub fn new(
116 relay_parent: Hash,
117 para_id: ParaId,
118 peer_id: &PeerId,
119 prospective_candidate: Option<ProspectiveCandidate>,
120 ) -> Self {
121 Self {
122 relay_parent,
123 para_id,
124 peer_id: *peer_id,
125 prospective_candidate,
126 commitments_hash: None,
127 }
128 }
129}
130
131#[derive(Debug, Clone, Eq, PartialEq, Hash)]
135pub struct BlockedCollationId {
136 pub para_id: ParaId,
138 pub parent_head_data_hash: Hash,
140}
141
142pub fn fetched_collation_sanity_check(
144 advertised: &PendingCollation,
145 fetched: &CandidateReceipt,
146 persisted_validation_data: &PersistedValidationData,
147 maybe_parent_head_and_hash: Option<(HeadData, Hash)>,
148) -> Result<(), SecondingError> {
149 if persisted_validation_data.hash() != fetched.descriptor().persisted_validation_data_hash() {
150 return Err(SecondingError::PersistedValidationDataMismatch)
151 }
152
153 if advertised
154 .prospective_candidate
155 .map_or(false, |pc| pc.candidate_hash() != fetched.hash())
156 {
157 return Err(SecondingError::CandidateHashMismatch)
158 }
159
160 if advertised.relay_parent != fetched.descriptor.relay_parent() {
161 return Err(SecondingError::RelayParentMismatch)
162 }
163
164 if maybe_parent_head_and_hash.map_or(false, |(head, hash)| head.hash() != hash) {
165 return Err(SecondingError::ParentHeadDataMismatch)
166 }
167
168 Ok(())
169}
170
171#[derive(Debug, Clone)]
173pub struct CollationEvent {
174 pub collator_id: CollatorId,
176 pub collator_protocol_version: CollationVersion,
178 pub pending_collation: PendingCollation,
180}
181
182#[derive(Debug, Clone)]
184pub struct PendingCollationFetch {
185 pub collation_event: CollationEvent,
187 pub candidate_receipt: CandidateReceipt,
189 pub pov: PoV,
191 pub maybe_parent_head_data: Option<HeadData>,
194}
195
196#[derive(Debug, Clone, Copy)]
198pub enum CollationStatus {
199 Waiting,
201 Fetching(ParaId),
203 WaitingOnValidation,
205}
206
207impl Default for CollationStatus {
208 fn default() -> Self {
209 Self::Waiting
210 }
211}
212
213impl CollationStatus {
214 pub fn back_to_waiting(&mut self) {
216 *self = Self::Waiting
217 }
218}
219
/// Per-para counters kept by `Collations`. Both counters start at zero.
#[derive(Default, Debug)]
struct CandidatesStatePerPara {
    /// Number of candidates noted as seconded for the para.
    pub seconded_per_para: usize,
    /// Number of claims for the para, i.e. how many times it appeared in the group
    /// assignments the owning `Collations` was created from.
    pub claims_per_para: usize,
}
228
229pub struct Collations {
231 pub status: CollationStatus,
233 pub fetching_from: Option<(CollatorId, Option<CandidateHash>)>,
238 waiting_queue: BTreeMap<ParaId, VecDeque<(PendingCollation, CollatorId)>>,
241 candidates_state: BTreeMap<ParaId, CandidatesStatePerPara>,
243}
244
245impl Collations {
246 pub(super) fn new(group_assignments: &Vec<ParaId>) -> Self {
247 let mut candidates_state = BTreeMap::<ParaId, CandidatesStatePerPara>::new();
248
249 for para_id in group_assignments {
250 candidates_state.entry(*para_id).or_default().claims_per_para += 1;
251 }
252
253 Self {
254 status: Default::default(),
255 fetching_from: None,
256 waiting_queue: Default::default(),
257 candidates_state,
258 }
259 }
260
261 pub(super) fn note_seconded(&mut self, para_id: ParaId) {
263 self.candidates_state.entry(para_id).or_default().seconded_per_para += 1;
264 gum::trace!(
265 target: LOG_TARGET,
266 ?para_id,
267 new_count=self.candidates_state.entry(para_id).or_default().seconded_per_para,
268 "Note seconded."
269 );
270 self.status.back_to_waiting();
271 }
272
273 pub(super) fn add_to_waiting_queue(&mut self, collation: (PendingCollation, CollatorId)) {
276 self.waiting_queue.entry(collation.0.para_id).or_default().push_back(collation);
277 }
278
279 pub(super) fn pick_a_collation_to_fetch(
292 &mut self,
293 unfulfilled_claim_queue_entries: Vec<ParaId>,
294 ) -> Option<(PendingCollation, CollatorId)> {
295 gum::trace!(
296 target: LOG_TARGET,
297 waiting_queue=?self.waiting_queue,
298 candidates_state=?self.candidates_state,
299 "Pick a collation to fetch."
300 );
301
302 for assignment in unfulfilled_claim_queue_entries {
303 if let Some(collation) = self
305 .waiting_queue
306 .get_mut(&assignment)
307 .and_then(|collations| collations.pop_front())
308 {
309 return Some(collation)
310 }
311 }
312
313 None
314 }
315
316 pub(super) fn seconded_for_para(&self, para_id: &ParaId) -> usize {
317 self.candidates_state
318 .get(¶_id)
319 .map(|state| state.seconded_per_para)
320 .unwrap_or_default()
321 }
322}
323
324#[derive(Debug, thiserror::Error)]
326pub(super) enum CollationFetchError {
327 #[error("Future was cancelled.")]
328 Cancelled,
329 #[error("{0}")]
330 Request(#[from] RequestError),
331}
332
333pub(super) struct CollationFetchRequest {
336 pub pending_collation: PendingCollation,
338 pub collator_id: CollatorId,
340 pub collator_protocol_version: CollationVersion,
342 pub from_collator: BoxFuture<'static, OutgoingResult<request_v1::CollationFetchingResponse>>,
344 pub cancellation_token: CancellationToken,
346 pub _lifetime_timer: Option<HistogramTimer>,
348}
349
350impl Future for CollationFetchRequest {
351 type Output = (
352 CollationEvent,
353 std::result::Result<request_v1::CollationFetchingResponse, CollationFetchError>,
354 );
355
356 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
357 let cancelled = match std::pin::pin!(self.cancellation_token.cancelled()).poll(cx) {
359 Poll::Ready(()) => true,
360 Poll::Pending => false,
361 };
362
363 if cancelled {
364 return Poll::Ready((
365 CollationEvent {
366 collator_protocol_version: self.collator_protocol_version,
367 collator_id: self.collator_id.clone(),
368 pending_collation: self.pending_collation,
369 },
370 Err(CollationFetchError::Cancelled),
371 ))
372 }
373
374 let res = self.from_collator.poll_unpin(cx).map(|res| {
375 (
376 CollationEvent {
377 collator_protocol_version: self.collator_protocol_version,
378 collator_id: self.collator_id.clone(),
379 pending_collation: self.pending_collation,
380 },
381 res.map_err(CollationFetchError::Request),
382 )
383 });
384
385 res
386 }
387}