1#![deny(missing_docs, unused_crate_dependencies)]
21
22use bitvec::vec::BitVec;
23use futures::{
24 channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered, FutureExt,
25};
26use futures_timer::Delay;
27
28use polkadot_node_subsystem::{
29 messages::{
30 Ancestors, CandidateBackingMessage, ProspectiveParachainsMessage, ProvisionableData,
31 ProvisionerInherentData, ProvisionerMessage,
32 },
33 overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
34 SubsystemError,
35};
36use polkadot_node_subsystem_util::{request_availability_cores, TimeoutExt};
37use polkadot_primitives::{
38 BackedCandidate, CandidateHash, CoreIndex, CoreState, Hash, Id as ParaId,
39 SignedAvailabilityBitfield, ValidatorIndex,
40};
41use std::collections::{BTreeMap, HashMap};
42
43mod disputes;
44mod error;
45mod metrics;
46
47pub use self::metrics::*;
48use error::{Error, FatalResult};
49
50#[cfg(test)]
51mod tests;
52
/// Delay applied after a leaf is activated before its inherent data is treated as ready.
const PRE_PROPOSE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2_000);
/// Time budget for the background task that assembles and sends the inherent data.
const SEND_INHERENT_DATA_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);

/// Log target used by the `gum` logging calls in this file.
const LOG_TARGET: &str = "parachain::provisioner";
59
60pub struct ProvisionerSubsystem {
62 metrics: Metrics,
63}
64
65impl ProvisionerSubsystem {
66 pub fn new(metrics: Metrics) -> Self {
68 Self { metrics }
69 }
70}
71
72pub struct PerRelayParent {
74 leaf: ActivatedLeaf,
75 signed_bitfields: Vec<SignedAvailabilityBitfield>,
76 is_inherent_ready: bool,
77 awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>,
78}
79
80impl PerRelayParent {
81 fn new(leaf: ActivatedLeaf) -> Self {
82 Self {
83 leaf,
84 signed_bitfields: Vec::new(),
85 is_inherent_ready: false,
86 awaiting_inherent: Vec::new(),
87 }
88 }
89}
90
/// Collection of pending delay futures; each one resolves to the hash of the leaf it was
/// created for.
type InherentDelays = FuturesUnordered<BoxFuture<'static, Hash>>;
92
93#[overseer::subsystem(Provisioner, error=SubsystemError, prefix=self::overseer)]
94impl<Context> ProvisionerSubsystem {
95 fn start(self, ctx: Context) -> SpawnedSubsystem {
96 let future = async move {
97 run(ctx, self.metrics)
98 .await
99 .map_err(|e| SubsystemError::with_origin("provisioner", e))
100 }
101 .boxed();
102
103 SpawnedSubsystem { name: "provisioner-subsystem", future }
104 }
105}
106
107#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
108async fn run<Context>(mut ctx: Context, metrics: Metrics) -> FatalResult<()> {
109 let mut inherent_delays = InherentDelays::new();
110 let mut per_relay_parent = HashMap::new();
111
112 loop {
113 let result =
114 run_iteration(&mut ctx, &mut per_relay_parent, &mut inherent_delays, &metrics).await;
115
116 match result {
117 Ok(()) => break,
118 err => crate::error::log_error(err)?,
119 }
120 }
121
122 Ok(())
123}
124
125#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
126async fn run_iteration<Context>(
127 ctx: &mut Context,
128 per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
129 inherent_delays: &mut InherentDelays,
130 metrics: &Metrics,
131) -> Result<(), Error> {
132 loop {
133 futures::select! {
134 from_overseer = ctx.recv().fuse() => {
135 match from_overseer.map_err(Error::OverseerExited)? {
137 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) =>
138 handle_active_leaves_update(update, per_relay_parent, inherent_delays).await?,
139 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
140 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
141 FromOrchestra::Communication { msg } => {
142 handle_communication(ctx, per_relay_parent, msg, metrics).await?;
143 },
144 }
145 },
146 hash = inherent_delays.select_next_some() => {
147 if let Some(state) = per_relay_parent.get_mut(&hash) {
148 state.is_inherent_ready = true;
149
150 gum::trace!(
151 target: LOG_TARGET,
152 relay_parent = ?hash,
153 "Inherent Data became ready"
154 );
155
156 let return_senders = std::mem::take(&mut state.awaiting_inherent);
157 if !return_senders.is_empty() {
158 send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?;
159 }
160 }
161 }
162 }
163 }
164}
165
166async fn handle_active_leaves_update(
167 update: ActiveLeavesUpdate,
168 per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
169 inherent_delays: &mut InherentDelays,
170) -> Result<(), Error> {
171 gum::trace!(target: LOG_TARGET, "Handle ActiveLeavesUpdate");
172 for deactivated in &update.deactivated {
173 per_relay_parent.remove(deactivated);
174 }
175
176 if let Some(leaf) = update.activated {
177 gum::trace!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Adding delay");
178 let delay_fut = Delay::new(PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
179 per_relay_parent.insert(leaf.hash, PerRelayParent::new(leaf));
180 inherent_delays.push(delay_fut);
181 }
182
183 Ok(())
184}
185
186#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
187async fn handle_communication<Context>(
188 ctx: &mut Context,
189 per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
190 message: ProvisionerMessage,
191 metrics: &Metrics,
192) -> Result<(), Error> {
193 match message {
194 ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => {
195 gum::trace!(target: LOG_TARGET, ?relay_parent, "Inherent data got requested.");
196
197 if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
198 if state.is_inherent_ready {
199 gum::trace!(target: LOG_TARGET, ?relay_parent, "Calling send_inherent_data.");
200 send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone())
201 .await?;
202 } else {
203 gum::trace!(
204 target: LOG_TARGET,
205 ?relay_parent,
206 "Queuing inherent data request (inherent data not yet ready)."
207 );
208 state.awaiting_inherent.push(return_sender);
209 }
210 }
211 },
212 ProvisionerMessage::ProvisionableData(relay_parent, data) => {
213 if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
214 let _timer = metrics.time_provisionable_data();
215
216 gum::trace!(target: LOG_TARGET, ?relay_parent, "Received provisionable data: {:?}", &data);
217
218 note_provisionable_data(state, data);
219 }
220 },
221 }
222
223 Ok(())
224}
225
226#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
227async fn send_inherent_data_bg<Context>(
228 ctx: &mut Context,
229 per_relay_parent: &PerRelayParent,
230 return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
231 metrics: Metrics,
232) -> Result<(), Error> {
233 let leaf = per_relay_parent.leaf.clone();
234 let signed_bitfields = per_relay_parent.signed_bitfields.clone();
235 let mut sender = ctx.sender().clone();
236
237 let bg = async move {
238 let _timer = metrics.time_request_inherent_data();
239
240 gum::trace!(
241 target: LOG_TARGET,
242 relay_parent = ?leaf.hash,
243 "Sending inherent data in background."
244 );
245
246 let send_result =
247 send_inherent_data(&leaf, &signed_bitfields, return_senders, &mut sender, &metrics) .timeout(SEND_INHERENT_DATA_TIMEOUT)
249 .map(|v| match v {
250 Some(r) => r,
251 None => Err(Error::SendInherentDataTimeout),
252 });
253
254 match send_result.await {
255 Err(err) => {
256 if let Error::CanceledBackedCandidates(_) = err {
257 gum::debug!(
258 target: LOG_TARGET,
259 err = ?err,
260 "Failed to assemble or send inherent data - block got likely obsoleted already."
261 );
262 } else {
263 gum::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
264 }
265 metrics.on_inherent_data_request(Err(()));
266 },
267 Ok(()) => {
268 metrics.on_inherent_data_request(Ok(()));
269 gum::debug!(
270 target: LOG_TARGET,
271 signed_bitfield_count = signed_bitfields.len(),
272 leaf_hash = ?leaf.hash,
273 "inherent data sent successfully"
274 );
275 metrics.observe_inherent_data_bitfields_count(signed_bitfields.len());
276 },
277 }
278 };
279
280 ctx.spawn("send-inherent-data", bg.boxed())
281 .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
282
283 Ok(())
284}
285
286fn note_provisionable_data(
287 per_relay_parent: &mut PerRelayParent,
288 provisionable_data: ProvisionableData,
289) {
290 match provisionable_data {
291 ProvisionableData::Bitfield(_, signed_bitfield) =>
292 per_relay_parent.signed_bitfields.push(signed_bitfield),
293 ProvisionableData::MisbehaviorReport(_, _, _) => {},
298 ProvisionableData::Dispute(_, _) => {},
313 }
314}
315
/// Bit vector type of an occupied core's `availability` field, as consumed by
/// `bitfields_indicate_availability` (which indexes it by validator index).
type CoreAvailability = BitVec<u8, bitvec::order::Lsb0>;
317
318async fn send_inherent_data(
336 leaf: &ActivatedLeaf,
337 bitfields: &[SignedAvailabilityBitfield],
338 return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
339 from_job: &mut impl overseer::ProvisionerSenderTrait,
340 metrics: &Metrics,
341) -> Result<(), Error> {
342 gum::trace!(
343 target: LOG_TARGET,
344 relay_parent = ?leaf.hash,
345 "Requesting availability cores"
346 );
347 let availability_cores = request_availability_cores(leaf.hash, from_job)
348 .await
349 .await
350 .map_err(|err| Error::CanceledAvailabilityCores(err))??;
351
352 gum::trace!(
353 target: LOG_TARGET,
354 relay_parent = ?leaf.hash,
355 "Selecting disputes"
356 );
357
358 let disputes = disputes::prioritized_selection::select_disputes(from_job, metrics, leaf).await;
359
360 gum::trace!(
361 target: LOG_TARGET,
362 relay_parent = ?leaf.hash,
363 "Selected disputes"
364 );
365
366 let bitfields = select_availability_bitfields(&availability_cores, bitfields, &leaf.hash);
367
368 gum::trace!(
369 target: LOG_TARGET,
370 relay_parent = ?leaf.hash,
371 "Selected bitfields"
372 );
373
374 let candidates = select_candidates(&availability_cores, &bitfields, leaf, from_job).await?;
375
376 gum::trace!(
377 target: LOG_TARGET,
378 relay_parent = ?leaf.hash,
379 "Selected candidates"
380 );
381
382 gum::debug!(
383 target: LOG_TARGET,
384 availability_cores_len = availability_cores.len(),
385 disputes_count = disputes.len(),
386 bitfields_count = bitfields.len(),
387 candidates_count = candidates.len(),
388 leaf_hash = ?leaf.hash,
389 "inherent data prepared",
390 );
391
392 let inherent_data =
393 ProvisionerInherentData { bitfields, backed_candidates: candidates, disputes };
394
395 gum::trace!(
396 target: LOG_TARGET,
397 relay_parent = ?leaf.hash,
398 "Sending back inherent data to requesters."
399 );
400
401 for return_sender in return_senders {
402 return_sender
403 .send(inherent_data.clone())
404 .map_err(|_data| Error::InherentDataReturnChannel)?;
405 }
406
407 Ok(())
408}
409
410fn select_availability_bitfields(
421 cores: &[CoreState],
422 bitfields: &[SignedAvailabilityBitfield],
423 leaf_hash: &Hash,
424) -> Vec<SignedAvailabilityBitfield> {
425 let mut selected: BTreeMap<ValidatorIndex, SignedAvailabilityBitfield> = BTreeMap::new();
426
427 gum::debug!(
428 target: LOG_TARGET,
429 bitfields_count = bitfields.len(),
430 ?leaf_hash,
431 "bitfields count before selection"
432 );
433
434 'a: for bitfield in bitfields.iter().cloned() {
435 if bitfield.payload().0.len() != cores.len() {
436 gum::debug!(target: LOG_TARGET, ?leaf_hash, "dropping bitfield due to length mismatch");
437 continue
438 }
439
440 let is_better = selected
441 .get(&bitfield.validator_index())
442 .map_or(true, |b| b.payload().0.count_ones() < bitfield.payload().0.count_ones());
443
444 if !is_better {
445 gum::trace!(
446 target: LOG_TARGET,
447 val_idx = bitfield.validator_index().0,
448 ?leaf_hash,
449 "dropping bitfield due to duplication - the better one is kept"
450 );
451 continue
452 }
453
454 for (idx, _) in cores.iter().enumerate().filter(|v| !v.1.is_occupied()) {
455 if *bitfield.payload().0.get(idx).as_deref().unwrap_or(&false) {
457 gum::debug!(
458 target: LOG_TARGET,
459 val_idx = bitfield.validator_index().0,
460 ?leaf_hash,
461 "dropping invalid bitfield - bit is set for an unoccupied core"
462 );
463 continue 'a
464 }
465 }
466
467 let _ = selected.insert(bitfield.validator_index(), bitfield);
468 }
469
470 gum::debug!(
471 target: LOG_TARGET,
472 ?leaf_hash,
473 "selected {} of all {} bitfields (each bitfield is from a unique validator)",
474 selected.len(),
475 bitfields.len()
476 );
477
478 selected.into_values().collect()
479}
480
481async fn request_backable_candidates(
484 availability_cores: &[CoreState],
485 bitfields: &[SignedAvailabilityBitfield],
486 relay_parent: &ActivatedLeaf,
487 sender: &mut impl overseer::ProvisionerSenderTrait,
488) -> Result<HashMap<ParaId, Vec<(CandidateHash, Hash)>>, Error> {
489 let block_number_under_construction = relay_parent.number + 1;
490
491 let mut scheduled_cores_per_para: BTreeMap<ParaId, usize> = BTreeMap::new();
494 let mut ancestors: HashMap<ParaId, Ancestors> =
496 HashMap::with_capacity(availability_cores.len());
497
498 for (core_idx, core) in availability_cores.iter().enumerate() {
499 let core_idx = CoreIndex(core_idx as u32);
500 match core {
501 CoreState::Scheduled(scheduled_core) => {
502 *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
503 },
504 CoreState::Occupied(occupied_core) => {
505 let is_available = bitfields_indicate_availability(
506 core_idx.0 as usize,
507 bitfields,
508 &occupied_core.availability,
509 );
510
511 if is_available {
512 ancestors
513 .entry(occupied_core.para_id())
514 .or_default()
515 .insert(occupied_core.candidate_hash);
516
517 if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
518 *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
520 }
521 } else if occupied_core.time_out_at <= block_number_under_construction {
522 if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out {
525 *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
527 }
528 } else {
529 ancestors
531 .entry(occupied_core.para_id())
532 .or_default()
533 .insert(occupied_core.candidate_hash);
534 }
535 },
536 CoreState::Free => continue,
537 };
538 }
539
540 let mut selected_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>> =
541 HashMap::with_capacity(scheduled_cores_per_para.len());
542
543 for (para_id, core_count) in scheduled_cores_per_para {
544 let para_ancestors = ancestors.remove(¶_id).unwrap_or_default();
545
546 let response = get_backable_candidates(
547 relay_parent.hash,
548 para_id,
549 para_ancestors,
550 core_count as u32,
551 sender,
552 )
553 .await?;
554
555 if response.is_empty() {
556 gum::debug!(
557 target: LOG_TARGET,
558 leaf_hash = ?relay_parent.hash,
559 ?para_id,
560 "No backable candidate returned by prospective parachains",
561 );
562 continue
563 }
564
565 selected_candidates.insert(para_id, response);
566 }
567
568 Ok(selected_candidates)
569}
570
571async fn select_candidates(
574 availability_cores: &[CoreState],
575 bitfields: &[SignedAvailabilityBitfield],
576 leaf: &ActivatedLeaf,
577 sender: &mut impl overseer::ProvisionerSenderTrait,
578) -> Result<Vec<BackedCandidate>, Error> {
579 let relay_parent = leaf.hash;
580 gum::trace!(
581 target: LOG_TARGET,
582 leaf_hash=?relay_parent,
583 "before GetBackedCandidates"
584 );
585
586 let selected_candidates =
587 request_backable_candidates(availability_cores, bitfields, leaf, sender).await?;
588 gum::debug!(target: LOG_TARGET, ?selected_candidates, "Got backable candidates");
589
590 let (tx, rx) = oneshot::channel();
592 sender.send_unbounded_message(CandidateBackingMessage::GetBackableCandidates(
593 selected_candidates.clone(),
594 tx,
595 ));
596 let candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
597 gum::trace!(
598 target: LOG_TARGET,
599 leaf_hash=?relay_parent,
600 "Got {} backed candidates", candidates.len()
601 );
602
603 let mut with_validation_code = false;
605 let mut merged_candidates = Vec::with_capacity(availability_cores.len());
607
608 for para_candidates in candidates.into_values() {
609 for candidate in para_candidates {
610 if candidate.candidate().commitments.new_validation_code.is_some() {
611 if with_validation_code {
612 break
613 } else {
614 with_validation_code = true;
615 }
616 }
617
618 merged_candidates.push(candidate);
619 }
620 }
621
622 gum::debug!(
623 target: LOG_TARGET,
624 n_candidates = merged_candidates.len(),
625 n_cores = availability_cores.len(),
626 ?relay_parent,
627 "Selected backed candidates",
628 );
629
630 Ok(merged_candidates)
631}
632
633async fn get_backable_candidates(
636 relay_parent: Hash,
637 para_id: ParaId,
638 ancestors: Ancestors,
639 count: u32,
640 sender: &mut impl overseer::ProvisionerSenderTrait,
641) -> Result<Vec<(CandidateHash, Hash)>, Error> {
642 let (tx, rx) = oneshot::channel();
643 sender
644 .send_message(ProspectiveParachainsMessage::GetBackableCandidates(
645 relay_parent,
646 para_id,
647 count,
648 ancestors,
649 tx,
650 ))
651 .await;
652
653 rx.await.map_err(Error::CanceledBackableCandidates)
654}
655
656fn bitfields_indicate_availability(
663 core_idx: usize,
664 bitfields: &[SignedAvailabilityBitfield],
665 availability: &CoreAvailability,
666) -> bool {
667 let mut availability = availability.clone();
668 let availability_len = availability.len();
669
670 for bitfield in bitfields {
671 let validator_idx = bitfield.validator_index().0 as usize;
672 match availability.get_mut(validator_idx) {
673 None => {
674 gum::warn!(
679 target: LOG_TARGET,
680 validator_idx = %validator_idx,
681 availability_len = %availability_len,
682 "attempted to set a transverse bit at idx {} which is greater than bitfield size {}",
683 validator_idx,
684 availability_len,
685 );
686
687 return false
688 },
689 Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx],
690 }
691 }
692
693 3 * availability.count_ones() >= 2 * availability.len()
694}