1#![deny(missing_docs, unused_crate_dependencies)]
21
22use bitvec::vec::BitVec;
23use futures::{
24 channel::oneshot::{self, Canceled},
25 future::BoxFuture,
26 prelude::*,
27 stream::FuturesUnordered,
28 FutureExt,
29};
30use futures_timer::Delay;
31use polkadot_node_subsystem::{
32 messages::{
33 Ancestors, CandidateBackingMessage, ChainApiMessage, ProspectiveParachainsMessage,
34 ProvisionableData, ProvisionerInherentData, ProvisionerMessage,
35 },
36 overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
37 SubsystemError,
38};
39use polkadot_node_subsystem_util::{request_availability_cores, TimeoutExt};
40use polkadot_primitives::{
41 BackedCandidate, CandidateEvent, CandidateHash, CoreIndex, CoreState, Hash, Id as ParaId,
42 SignedAvailabilityBitfield, ValidatorIndex,
43};
44use sc_consensus_slots::time_until_next_slot;
45use schnellru::{ByLength, LruMap};
46use std::{
47 collections::{BTreeMap, HashMap},
48 time::Duration,
49};
50mod disputes;
51mod error;
52mod metrics;
53
54pub use self::metrics::*;
55use error::{Error, FatalResult};
56
57#[cfg(test)]
58mod tests;
59
60const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(2000);
62const SEND_INHERENT_DATA_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(500);
64
65const LOG_TARGET: &str = "parachain::provisioner";
66
67pub struct ProvisionerSubsystem {
69 metrics: Metrics,
70}
71
72impl ProvisionerSubsystem {
73 pub fn new(metrics: Metrics) -> Self {
75 Self { metrics }
76 }
77}
78
79pub struct PerRelayParent {
81 leaf: ActivatedLeaf,
82 signed_bitfields: Vec<SignedAvailabilityBitfield>,
83 is_inherent_ready: bool,
84 awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>,
85}
86
87impl PerRelayParent {
88 fn new(leaf: ActivatedLeaf) -> Self {
89 Self {
90 leaf,
91 signed_bitfields: Vec::new(),
92 is_inherent_ready: false,
93 awaiting_inherent: Vec::new(),
94 }
95 }
96}
97
98type InherentDelays = FuturesUnordered<BoxFuture<'static, Hash>>;
99type SlotDelays = FuturesUnordered<BoxFuture<'static, Hash>>;
100type InherentReceivers =
101 FuturesUnordered<BoxFuture<'static, (Hash, Result<ProvisionerInherentData, Canceled>)>>;
102
103#[overseer::subsystem(Provisioner, error=SubsystemError, prefix=self::overseer)]
104impl<Context> ProvisionerSubsystem {
105 fn start(self, ctx: Context) -> SpawnedSubsystem {
106 let future = async move {
107 run(ctx, self.metrics)
108 .await
109 .map_err(|e| SubsystemError::with_origin("provisioner", e))
110 }
111 .boxed();
112
113 SpawnedSubsystem { name: "provisioner-subsystem", future }
114 }
115}
116
117#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
118async fn run<Context>(mut ctx: Context, metrics: Metrics) -> FatalResult<()> {
119 let mut inherent_delays = InherentDelays::new();
120 let mut inherent_receivers = InherentReceivers::new();
121 let mut slot_delays = SlotDelays::new();
122 let mut per_relay_parent = HashMap::new();
123 let mut inherents = LruMap::new(ByLength::new(16));
124
125 loop {
126 let result = run_iteration(
127 &mut ctx,
128 &mut per_relay_parent,
129 &mut inherent_delays,
130 &mut inherent_receivers,
131 &mut inherents,
132 &mut slot_delays,
133 &metrics,
134 )
135 .await;
136
137 match result {
138 Ok(()) => break,
139 err => crate::error::log_error(err)?,
140 }
141 }
142
143 Ok(())
144}
145
146#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
147async fn run_iteration<Context>(
148 ctx: &mut Context,
149 per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
150 inherent_delays: &mut InherentDelays,
151 inherent_receivers: &mut InherentReceivers,
152 inherents: &mut LruMap<Hash, ProvisionerInherentData>,
153 slot_delays: &mut SlotDelays,
154 metrics: &Metrics,
155) -> Result<(), Error> {
156 loop {
157 futures::select! {
158 from_overseer = ctx.recv().fuse() => {
159 match from_overseer.map_err(Error::OverseerExited)? {
161 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) =>
162 handle_active_leaves_update(ctx, update, per_relay_parent, inherent_delays, slot_delays, inherents, metrics).await?,
163 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
164 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
165 FromOrchestra::Communication { msg } => {
166 handle_communication(ctx, per_relay_parent, msg, metrics).await?;
167 },
168 }
169 },
170 hash = slot_delays.select_next_some() => {
171 gum::debug!(target: LOG_TARGET, leaf_hash=?hash, "Slot start, preparing debug inherent");
172
173 let Some(state) = per_relay_parent.get_mut(&hash) else {
174 continue
175 };
176
177 let (inherent_tx, inherent_rx) = oneshot::channel();
179 let task = async move {
180 match inherent_rx.await {
181 Ok(res) => (hash, Ok(res)),
182 Err(e) => (hash, Err(e)),
183 }
184 }
185 .boxed();
186
187 inherent_receivers.push(task);
188
189 send_inherent_data_bg(ctx, &state, vec![inherent_tx], metrics.clone()).await?;
190 },
191 (hash, inherent_data) = inherent_receivers.select_next_some() => {
192 let Ok(inherent_data) = inherent_data else {
193 continue
194 };
195
196 gum::trace!(
197 target: LOG_TARGET,
198 relay_parent = ?hash,
199 "Debug Inherent Data became ready"
200 );
201 inherents.insert(hash, inherent_data);
202 }
203 hash = inherent_delays.select_next_some() => {
204 if let Some(state) = per_relay_parent.get_mut(&hash) {
205 state.is_inherent_ready = true;
206
207 gum::trace!(
208 target: LOG_TARGET,
209 relay_parent = ?hash,
210 "Inherent Data became ready"
211 );
212
213 let return_senders = std::mem::take(&mut state.awaiting_inherent);
214 if !return_senders.is_empty() {
215 send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?;
216 }
217 }
218 }
219 }
220 }
221}
222
223#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
224async fn handle_active_leaves_update<Context>(
225 ctx: &mut Context,
226 update: ActiveLeavesUpdate,
227 per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
228 inherent_delays: &mut InherentDelays,
229 slot_delays: &mut SlotDelays,
230 inherents: &mut LruMap<Hash, ProvisionerInherentData>,
231 metrics: &Metrics,
232) -> Result<(), Error> {
233 gum::trace!(target: LOG_TARGET, "Handle ActiveLeavesUpdate");
234 for deactivated in &update.deactivated {
235 per_relay_parent.remove(deactivated);
236 }
237
238 let Some(leaf) = update.activated else { return Ok(()) };
239
240 gum::trace!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Adding delay");
241 let delay_fut = Delay::new(PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
242 per_relay_parent.insert(leaf.hash, PerRelayParent::new(leaf.clone()));
243 inherent_delays.push(delay_fut);
244
245 let slot_delay = time_until_next_slot(Duration::from_millis(6000));
246 gum::debug!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Expecting next slot in {}ms", slot_delay.as_millis());
247
248 let slot_delay_task =
249 Delay::new(slot_delay + PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
250 slot_delays.push(slot_delay_task);
251
252 let Ok(Ok(candidate_events)) =
253 polkadot_node_subsystem_util::request_candidate_events(leaf.hash, ctx.sender())
254 .await
255 .await
256 else {
257 gum::warn!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Failed to fetch candidate events");
258
259 return Ok(())
260 };
261
262 let in_block_count = candidate_events
263 .into_iter()
264 .filter(|event| matches!(event, CandidateEvent::CandidateBacked(_, _, _, _)))
265 .count() as isize;
266
267 let (tx, rx) = oneshot::channel();
268 ctx.send_message(ChainApiMessage::BlockHeader(leaf.hash, tx)).await;
269
270 let Ok(Some(header)) = rx.await.unwrap_or_else(|err| {
271 gum::warn!(target: LOG_TARGET, hash = ?leaf.hash, ?err, "Missing header for block");
272 Ok(None)
273 }) else {
274 return Ok(())
275 };
276
277 gum::trace!(target: LOG_TARGET, hash = ?header.parent_hash, "Looking up debug inherent");
278
279 let Some(inherent) = inherents.get(&header.parent_hash) else { return Ok(()) };
282
283 let diff = inherent.backed_candidates.len() as isize - in_block_count;
284 gum::debug!(target: LOG_TARGET,
285 ?diff,
286 ?in_block_count,
287 local_count = ?inherent.backed_candidates.len(),
288 leaf_hash=?leaf.hash, "Offchain vs on-chain backing update");
289
290 metrics.observe_backable_vs_in_block(diff);
291 Ok(())
292}
293
294#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
295async fn handle_communication<Context>(
296 ctx: &mut Context,
297 per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
298 message: ProvisionerMessage,
299 metrics: &Metrics,
300) -> Result<(), Error> {
301 match message {
302 ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => {
303 gum::trace!(target: LOG_TARGET, ?relay_parent, "Inherent data got requested.");
304
305 if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
306 if state.is_inherent_ready {
307 gum::trace!(target: LOG_TARGET, ?relay_parent, "Calling send_inherent_data.");
308 send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone())
309 .await?;
310 } else {
311 gum::trace!(
312 target: LOG_TARGET,
313 ?relay_parent,
314 "Queuing inherent data request (inherent data not yet ready)."
315 );
316 state.awaiting_inherent.push(return_sender);
317 }
318 }
319 },
320 ProvisionerMessage::ProvisionableData(relay_parent, data) => {
321 if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
322 let _timer = metrics.time_provisionable_data();
323
324 gum::trace!(target: LOG_TARGET, ?relay_parent, "Received provisionable data: {:?}", &data);
325
326 note_provisionable_data(state, data);
327 }
328 },
329 }
330
331 Ok(())
332}
333
334#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
335async fn send_inherent_data_bg<Context>(
336 ctx: &mut Context,
337 per_relay_parent: &PerRelayParent,
338 return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
339 metrics: Metrics,
340) -> Result<(), Error> {
341 let leaf = per_relay_parent.leaf.clone();
342 let signed_bitfields = per_relay_parent.signed_bitfields.clone();
343 let mut sender = ctx.sender().clone();
344
345 let bg = async move {
346 let _timer = metrics.time_request_inherent_data();
347
348 gum::trace!(
349 target: LOG_TARGET,
350 relay_parent = ?leaf.hash,
351 "Sending inherent data in background."
352 );
353
354 let send_result =
355 send_inherent_data(&leaf, &signed_bitfields, return_senders, &mut sender, &metrics) .timeout(SEND_INHERENT_DATA_TIMEOUT)
357 .map(|v| match v {
358 Some(r) => r,
359 None => Err(Error::SendInherentDataTimeout),
360 });
361
362 match send_result.await {
363 Err(err) => {
364 if let Error::CanceledBackedCandidates(_) = err {
365 gum::debug!(
366 target: LOG_TARGET,
367 err = ?err,
368 "Failed to assemble or send inherent data - block got likely obsoleted already."
369 );
370 } else {
371 gum::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
372 }
373 metrics.on_inherent_data_request(Err(()));
374 },
375 Ok(()) => {
376 metrics.on_inherent_data_request(Ok(()));
377 gum::debug!(
378 target: LOG_TARGET,
379 signed_bitfield_count = signed_bitfields.len(),
380 leaf_hash = ?leaf.hash,
381 "inherent data sent successfully"
382 );
383 metrics.observe_inherent_data_bitfields_count(signed_bitfields.len());
384 },
385 }
386 };
387
388 ctx.spawn("send-inherent-data", bg.boxed())
389 .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
390
391 Ok(())
392}
393
394fn note_provisionable_data(
395 per_relay_parent: &mut PerRelayParent,
396 provisionable_data: ProvisionableData,
397) {
398 match provisionable_data {
399 ProvisionableData::Bitfield(_, signed_bitfield) =>
400 per_relay_parent.signed_bitfields.push(signed_bitfield),
401 ProvisionableData::MisbehaviorReport(_, _, _) => {},
406 ProvisionableData::Dispute(_, _) => {},
421 }
422}
423
424type CoreAvailability = BitVec<u8, bitvec::order::Lsb0>;
425
426async fn send_inherent_data(
444 leaf: &ActivatedLeaf,
445 bitfields: &[SignedAvailabilityBitfield],
446 return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
447 from_job: &mut impl overseer::ProvisionerSenderTrait,
448 metrics: &Metrics,
449) -> Result<(), Error> {
450 gum::trace!(
451 target: LOG_TARGET,
452 relay_parent = ?leaf.hash,
453 "Requesting availability cores"
454 );
455 let availability_cores = request_availability_cores(leaf.hash, from_job)
456 .await
457 .await
458 .map_err(|err| Error::CanceledAvailabilityCores(err))??;
459
460 gum::trace!(
461 target: LOG_TARGET,
462 relay_parent = ?leaf.hash,
463 "Selecting disputes"
464 );
465
466 let disputes = disputes::prioritized_selection::select_disputes(from_job, metrics, leaf).await;
467
468 gum::trace!(
469 target: LOG_TARGET,
470 relay_parent = ?leaf.hash,
471 "Selected disputes"
472 );
473
474 let bitfields = select_availability_bitfields(&availability_cores, bitfields, &leaf.hash);
475
476 gum::trace!(
477 target: LOG_TARGET,
478 relay_parent = ?leaf.hash,
479 "Selected bitfields"
480 );
481
482 let candidates = select_candidates(&availability_cores, &bitfields, leaf, from_job).await?;
483
484 gum::trace!(
485 target: LOG_TARGET,
486 relay_parent = ?leaf.hash,
487 "Selected candidates"
488 );
489
490 gum::debug!(
491 target: LOG_TARGET,
492 availability_cores_len = availability_cores.len(),
493 disputes_count = disputes.len(),
494 bitfields_count = bitfields.len(),
495 candidates_count = candidates.len(),
496 leaf_hash = ?leaf.hash,
497 "inherent data prepared",
498 );
499
500 let inherent_data =
501 ProvisionerInherentData { bitfields, backed_candidates: candidates, disputes };
502
503 gum::trace!(
504 target: LOG_TARGET,
505 relay_parent = ?leaf.hash,
506 "Sending back inherent data to requesters."
507 );
508
509 for return_sender in return_senders {
510 return_sender
511 .send(inherent_data.clone())
512 .map_err(|_data| Error::InherentDataReturnChannel)?;
513 }
514
515 Ok(())
516}
517
518fn select_availability_bitfields(
529 cores: &[CoreState],
530 bitfields: &[SignedAvailabilityBitfield],
531 leaf_hash: &Hash,
532) -> Vec<SignedAvailabilityBitfield> {
533 let mut selected: BTreeMap<ValidatorIndex, SignedAvailabilityBitfield> = BTreeMap::new();
534
535 gum::debug!(
536 target: LOG_TARGET,
537 bitfields_count = bitfields.len(),
538 ?leaf_hash,
539 "bitfields count before selection"
540 );
541
542 'a: for bitfield in bitfields.iter().cloned() {
543 if bitfield.payload().0.len() != cores.len() {
544 gum::debug!(target: LOG_TARGET, ?leaf_hash, "dropping bitfield due to length mismatch");
545 continue
546 }
547
548 let is_better = selected
549 .get(&bitfield.validator_index())
550 .map_or(true, |b| b.payload().0.count_ones() < bitfield.payload().0.count_ones());
551
552 if !is_better {
553 gum::trace!(
554 target: LOG_TARGET,
555 val_idx = bitfield.validator_index().0,
556 ?leaf_hash,
557 "dropping bitfield due to duplication - the better one is kept"
558 );
559 continue
560 }
561
562 for (idx, _) in cores.iter().enumerate().filter(|v| !v.1.is_occupied()) {
563 if *bitfield.payload().0.get(idx).as_deref().unwrap_or(&false) {
565 gum::debug!(
566 target: LOG_TARGET,
567 val_idx = bitfield.validator_index().0,
568 ?leaf_hash,
569 "dropping invalid bitfield - bit is set for an unoccupied core"
570 );
571 continue 'a
572 }
573 }
574
575 let _ = selected.insert(bitfield.validator_index(), bitfield);
576 }
577
578 gum::debug!(
579 target: LOG_TARGET,
580 ?leaf_hash,
581 "selected {} of all {} bitfields (each bitfield is from a unique validator)",
582 selected.len(),
583 bitfields.len()
584 );
585
586 selected.into_values().collect()
587}
588
589async fn request_backable_candidates(
592 availability_cores: &[CoreState],
593 bitfields: &[SignedAvailabilityBitfield],
594 relay_parent: &ActivatedLeaf,
595 sender: &mut impl overseer::ProvisionerSenderTrait,
596) -> Result<HashMap<ParaId, Vec<(CandidateHash, Hash)>>, Error> {
597 let block_number_under_construction = relay_parent.number + 1;
598
599 let mut scheduled_cores_per_para: BTreeMap<ParaId, usize> = BTreeMap::new();
602 let mut ancestors: HashMap<ParaId, Ancestors> =
604 HashMap::with_capacity(availability_cores.len());
605
606 for (core_idx, core) in availability_cores.iter().enumerate() {
607 let core_idx = CoreIndex(core_idx as u32);
608 match core {
609 CoreState::Scheduled(scheduled_core) => {
610 *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
611 },
612 CoreState::Occupied(occupied_core) => {
613 let is_available = bitfields_indicate_availability(
614 core_idx.0 as usize,
615 bitfields,
616 &occupied_core.availability,
617 );
618
619 if is_available {
620 ancestors
621 .entry(occupied_core.para_id())
622 .or_default()
623 .insert(occupied_core.candidate_hash);
624
625 if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
626 *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
628 }
629 } else if occupied_core.time_out_at <= block_number_under_construction {
630 if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out {
633 *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
635 }
636 } else {
637 ancestors
639 .entry(occupied_core.para_id())
640 .or_default()
641 .insert(occupied_core.candidate_hash);
642 }
643 },
644 CoreState::Free => continue,
645 };
646 }
647
648 let mut selected_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>> =
649 HashMap::with_capacity(scheduled_cores_per_para.len());
650
651 for (para_id, core_count) in scheduled_cores_per_para {
652 let para_ancestors = ancestors.remove(¶_id).unwrap_or_default();
653
654 let response = get_backable_candidates(
655 relay_parent.hash,
656 para_id,
657 para_ancestors,
658 core_count as u32,
659 sender,
660 )
661 .await?;
662
663 if response.is_empty() {
664 gum::debug!(
665 target: LOG_TARGET,
666 leaf_hash = ?relay_parent.hash,
667 ?para_id,
668 "No backable candidate returned by prospective parachains",
669 );
670 continue
671 }
672
673 selected_candidates.insert(para_id, response);
674 }
675
676 Ok(selected_candidates)
677}
678
679async fn select_candidates(
682 availability_cores: &[CoreState],
683 bitfields: &[SignedAvailabilityBitfield],
684 leaf: &ActivatedLeaf,
685 sender: &mut impl overseer::ProvisionerSenderTrait,
686) -> Result<Vec<BackedCandidate>, Error> {
687 let relay_parent = leaf.hash;
688 gum::trace!(
689 target: LOG_TARGET,
690 leaf_hash=?relay_parent,
691 "before GetBackedCandidates"
692 );
693
694 let selected_candidates =
695 request_backable_candidates(availability_cores, bitfields, leaf, sender).await?;
696 gum::debug!(target: LOG_TARGET, ?selected_candidates, "Got backable candidates");
697
698 let (tx, rx) = oneshot::channel();
700 sender.send_unbounded_message(CandidateBackingMessage::GetBackableCandidates(
701 selected_candidates.clone(),
702 tx,
703 ));
704 let candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
705 gum::trace!(
706 target: LOG_TARGET,
707 leaf_hash=?relay_parent,
708 "Got {} backed candidates", candidates.len()
709 );
710
711 let mut with_validation_code = false;
713 let mut merged_candidates = Vec::with_capacity(availability_cores.len());
715
716 for para_candidates in candidates.into_values() {
717 for candidate in para_candidates {
718 if candidate.candidate().commitments.new_validation_code.is_some() {
719 if with_validation_code {
720 break
721 } else {
722 with_validation_code = true;
723 }
724 }
725
726 merged_candidates.push(candidate);
727 }
728 }
729
730 gum::debug!(
731 target: LOG_TARGET,
732 n_candidates = merged_candidates.len(),
733 n_cores = availability_cores.len(),
734 ?relay_parent,
735 "Selected backed candidates",
736 );
737
738 Ok(merged_candidates)
739}
740
741async fn get_backable_candidates(
744 relay_parent: Hash,
745 para_id: ParaId,
746 ancestors: Ancestors,
747 count: u32,
748 sender: &mut impl overseer::ProvisionerSenderTrait,
749) -> Result<Vec<(CandidateHash, Hash)>, Error> {
750 let (tx, rx) = oneshot::channel();
751 sender
752 .send_message(ProspectiveParachainsMessage::GetBackableCandidates(
753 relay_parent,
754 para_id,
755 count,
756 ancestors,
757 tx,
758 ))
759 .await;
760
761 rx.await.map_err(Error::CanceledBackableCandidates)
762}
763
764fn bitfields_indicate_availability(
771 core_idx: usize,
772 bitfields: &[SignedAvailabilityBitfield],
773 availability: &CoreAvailability,
774) -> bool {
775 let mut availability = availability.clone();
776 let availability_len = availability.len();
777
778 for bitfield in bitfields {
779 let validator_idx = bitfield.validator_index().0 as usize;
780 match availability.get_mut(validator_idx) {
781 None => {
782 gum::warn!(
787 target: LOG_TARGET,
788 validator_idx = %validator_idx,
789 availability_len = %availability_len,
790 "attempted to set a transverse bit at idx {} which is greater than bitfield size {}",
791 validator_idx,
792 availability_len,
793 );
794
795 return false
796 },
797 Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx],
798 }
799 }
800
801 3 * availability.count_ones() >= 2 * availability.len()
802}