// polkadot/node/core/provisioner/src/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
//! The provisioner is responsible for assembling a relay chain block
//! from a set of available parachain candidates of its choice.
20#![deny(missing_docs, unused_crate_dependencies)]
21
22use bitvec::vec::BitVec;
23use futures::{
24	channel::oneshot::{self, Canceled},
25	future::BoxFuture,
26	prelude::*,
27	stream::FuturesUnordered,
28	FutureExt,
29};
30use futures_timer::Delay;
31use polkadot_node_subsystem::{
32	messages::{
33		Ancestors, CandidateBackingMessage, ChainApiMessage, ProspectiveParachainsMessage,
34		ProvisionableData, ProvisionerInherentData, ProvisionerMessage,
35	},
36	overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
37	SubsystemError,
38};
39use polkadot_node_subsystem_util::{request_availability_cores, TimeoutExt};
40use polkadot_primitives::{
41	BackedCandidate, CandidateEvent, CandidateHash, CoreIndex, CoreState, Hash, Id as ParaId,
42	SignedAvailabilityBitfield, ValidatorIndex,
43};
44use sc_consensus_slots::time_until_next_slot;
45use schnellru::{ByLength, LruMap};
46use std::{
47	collections::{BTreeMap, HashMap},
48	time::Duration,
49};
50mod disputes;
51mod error;
52mod metrics;
53
54pub use self::metrics::*;
55use error::{Error, FatalResult};
56
57#[cfg(test)]
58mod tests;
59
60/// How long to wait before proposing.
61const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(2000);
62/// Some timeout to ensure task won't hang around in the background forever on issues.
63const SEND_INHERENT_DATA_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(500);
64
65const LOG_TARGET: &str = "parachain::provisioner";
66
/// The provisioner subsystem.
pub struct ProvisionerSubsystem {
	// Prometheus metrics handle used to report provisioning activity.
	metrics: Metrics,
}
71
72impl ProvisionerSubsystem {
73	/// Create a new instance of the `ProvisionerSubsystem`.
74	pub fn new(metrics: Metrics) -> Self {
75		Self { metrics }
76	}
77}
78
/// A per-relay-parent state for the provisioning subsystem.
pub struct PerRelayParent {
	// The activated leaf this state belongs to.
	leaf: ActivatedLeaf,
	// Signed availability bitfields collected via `ProvisionableData::Bitfield`.
	signed_bitfields: Vec<SignedAvailabilityBitfield>,
	// Set once the pre-propose delay for this leaf has elapsed; requests that
	// arrive before then are queued in `awaiting_inherent`.
	is_inherent_ready: bool,
	// Senders for inherent-data requests that arrived before the data was ready.
	awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>,
}
86
87impl PerRelayParent {
88	fn new(leaf: ActivatedLeaf) -> Self {
89		Self {
90			leaf,
91			signed_bitfields: Vec::new(),
92			is_inherent_ready: false,
93			awaiting_inherent: Vec::new(),
94		}
95	}
96}
97
/// Futures resolving with a leaf hash once the pre-propose delay for that leaf elapsed.
type InherentDelays = FuturesUnordered<BoxFuture<'static, Hash>>;
/// Futures resolving with a leaf hash once the (slot + pre-propose) delay elapsed.
type SlotDelays = FuturesUnordered<BoxFuture<'static, Hash>>;
/// Receivers for inherent data built in the background, tagged with the leaf hash.
type InherentReceivers =
	FuturesUnordered<BoxFuture<'static, (Hash, Result<ProvisionerInherentData, Canceled>)>>;
102
#[overseer::subsystem(Provisioner, error=SubsystemError, prefix=self::overseer)]
impl<Context> ProvisionerSubsystem {
	/// Spawn the provisioner main loop, attributing any error it exits with
	/// to the "provisioner" origin.
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = async move {
			run(ctx, self.metrics)
				.await
				.map_err(|e| SubsystemError::with_origin("provisioner", e))
		}
		.boxed();

		SpawnedSubsystem { name: "provisioner-subsystem", future }
	}
}
116
117#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
118async fn run<Context>(mut ctx: Context, metrics: Metrics) -> FatalResult<()> {
119	let mut inherent_delays = InherentDelays::new();
120	let mut inherent_receivers = InherentReceivers::new();
121	let mut slot_delays = SlotDelays::new();
122	let mut per_relay_parent = HashMap::new();
123	let mut inherents = LruMap::new(ByLength::new(16));
124
125	loop {
126		let result = run_iteration(
127			&mut ctx,
128			&mut per_relay_parent,
129			&mut inherent_delays,
130			&mut inherent_receivers,
131			&mut inherents,
132			&mut slot_delays,
133			&metrics,
134		)
135		.await;
136
137		match result {
138			Ok(()) => break,
139			err => crate::error::log_error(err)?,
140		}
141	}
142
143	Ok(())
144}
145
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
/// One run of the provisioner event loop. Concurrently waits for overseer
/// signals/messages, slot-start timers (`slot_delays`), pre-propose timers
/// (`inherent_delays`) and inherent data produced in the background
/// (`inherent_receivers`), dispatching each event. Returns `Ok(())` only on
/// `Conclude`; all other exits are errors for the caller to log/retry.
async fn run_iteration<Context>(
	ctx: &mut Context,
	per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
	inherent_delays: &mut InherentDelays,
	inherent_receivers: &mut InherentReceivers,
	inherents: &mut LruMap<Hash, ProvisionerInherentData>,
	slot_delays: &mut SlotDelays,
	metrics: &Metrics,
) -> Result<(), Error> {
	loop {
		futures::select! {
			from_overseer = ctx.recv().fuse() => {
				// Map the error to ensure that the subsystem exits when the overseer is gone.
				match from_overseer.map_err(Error::OverseerExited)? {
					FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) =>
						handle_active_leaves_update(ctx, update, per_relay_parent, inherent_delays, slot_delays, inherents, metrics).await?,
					FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
					FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
					FromOrchestra::Communication { msg } => {
						handle_communication(ctx, per_relay_parent, msg, metrics).await?;
					},
				}
			},
			hash = slot_delays.select_next_some() => {
				gum::debug!(target: LOG_TARGET, leaf_hash=?hash, "Slot start, preparing debug inherent");

				// The leaf may have been deactivated while the timer was pending.
				let Some(state) = per_relay_parent.get_mut(&hash) else {
					continue
				};

				// Create the inherent data just to record the backed candidates.
				let (inherent_tx, inherent_rx) = oneshot::channel();
				let task = async move {
					match inherent_rx.await {
						Ok(res) => (hash, Ok(res)),
						Err(e) => (hash, Err(e)),
					}
				}
				.boxed();

				inherent_receivers.push(task);

				send_inherent_data_bg(ctx, &state, vec![inherent_tx], metrics.clone()).await?;
			},
			(hash, inherent_data) = inherent_receivers.select_next_some() => {
				// A canceled sender means the background task failed; nothing to record.
				let Ok(inherent_data) = inherent_data else {
					continue
				};

				gum::trace!(
					target: LOG_TARGET,
					relay_parent = ?hash,
					"Debug Inherent Data became ready"
				);
				// Cache under the leaf hash; `handle_active_leaves_update` later
				// compares this against what actually landed on chain.
				inherents.insert(hash, inherent_data);
			}
			hash = inherent_delays.select_next_some() => {
				// Pre-propose delay elapsed: inherent data for this leaf may now be
				// served, so flush any requesters queued in the meantime.
				if let Some(state) = per_relay_parent.get_mut(&hash) {
					state.is_inherent_ready = true;

					gum::trace!(
						target: LOG_TARGET,
						relay_parent = ?hash,
						"Inherent Data became ready"
					);

					let return_senders = std::mem::take(&mut state.awaiting_inherent);
					if !return_senders.is_empty() {
						send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?;
					}
				}
			}
		}
	}
}
222
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
/// Handle an `ActiveLeavesUpdate`: drop state for deactivated leaves, create
/// per-leaf state and timers for the newly activated leaf, and compare the
/// number of candidates backed on chain in this block with the count from our
/// own debug inherent built for the parent, reporting the difference as a
/// metric.
async fn handle_active_leaves_update<Context>(
	ctx: &mut Context,
	update: ActiveLeavesUpdate,
	per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
	inherent_delays: &mut InherentDelays,
	slot_delays: &mut SlotDelays,
	inherents: &mut LruMap<Hash, ProvisionerInherentData>,
	metrics: &Metrics,
) -> Result<(), Error> {
	gum::trace!(target: LOG_TARGET, "Handle ActiveLeavesUpdate");
	for deactivated in &update.deactivated {
		per_relay_parent.remove(deactivated);
	}

	let Some(leaf) = update.activated else { return Ok(()) };

	// Arm the pre-propose timer; once it fires, inherent data for this leaf is
	// considered ready to serve.
	gum::trace!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Adding delay");
	let delay_fut = Delay::new(PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
	per_relay_parent.insert(leaf.hash, PerRelayParent::new(leaf.clone()));
	inherent_delays.push(delay_fut);

	// Also arm a timer at (next slot start + pre-propose timeout) used to build
	// a debug inherent for comparison. NOTE(review): the 6000 ms slot duration
	// is hard-coded here — confirm it matches the chain's actual slot time.
	let slot_delay = time_until_next_slot(Duration::from_millis(6000));
	gum::debug!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Expecting next slot in {}ms", slot_delay.as_millis());

	let slot_delay_task =
		Delay::new(slot_delay + PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
	slot_delays.push(slot_delay_task);

	// Count how many candidates were actually backed in this block. The
	// nested `await`s resolve the request future and then the response channel.
	let Ok(Ok(candidate_events)) =
		polkadot_node_subsystem_util::request_candidate_events(leaf.hash, ctx.sender())
			.await
			.await
	else {
		gum::warn!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Failed to fetch candidate events");

		return Ok(())
	};

	let in_block_count = candidate_events
		.into_iter()
		.filter(|event| matches!(event, CandidateEvent::CandidateBacked(_, _, _, _)))
		.count() as isize;

	// We need the header to learn the parent hash, under which our own debug
	// inherent was cached.
	let (tx, rx) = oneshot::channel();
	ctx.send_message(ChainApiMessage::BlockHeader(leaf.hash, tx)).await;

	let Ok(Some(header)) = rx.await.unwrap_or_else(|err| {
		gum::warn!(target: LOG_TARGET, hash = ?leaf.hash, ?err, "Missing header for block");
		Ok(None)
	}) else {
		return Ok(())
	};

	gum::trace!(target: LOG_TARGET, hash = ?header.parent_hash, "Looking up debug inherent");

	// Now, let's get the candidate count from our own inherent built earlier.
	// The inherent is stored under the parent hash.
	let Some(inherent) = inherents.get(&header.parent_hash) else { return Ok(()) };

	// Positive diff: we provisioned more candidates than ended up on chain;
	// negative: fewer.
	let diff = inherent.backed_candidates.len() as isize - in_block_count;
	gum::debug!(target: LOG_TARGET,
		 ?diff,
		 ?in_block_count,
		 local_count = ?inherent.backed_candidates.len(),
		 leaf_hash=?leaf.hash, "Offchain vs on-chain backing update");

	metrics.observe_backable_vs_in_block(diff);
	Ok(())
}
293
294#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
295async fn handle_communication<Context>(
296	ctx: &mut Context,
297	per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
298	message: ProvisionerMessage,
299	metrics: &Metrics,
300) -> Result<(), Error> {
301	match message {
302		ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => {
303			gum::trace!(target: LOG_TARGET, ?relay_parent, "Inherent data got requested.");
304
305			if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
306				if state.is_inherent_ready {
307					gum::trace!(target: LOG_TARGET, ?relay_parent, "Calling send_inherent_data.");
308					send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone())
309						.await?;
310				} else {
311					gum::trace!(
312						target: LOG_TARGET,
313						?relay_parent,
314						"Queuing inherent data request (inherent data not yet ready)."
315					);
316					state.awaiting_inherent.push(return_sender);
317				}
318			}
319		},
320		ProvisionerMessage::ProvisionableData(relay_parent, data) => {
321			if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
322				let _timer = metrics.time_provisionable_data();
323
324				gum::trace!(target: LOG_TARGET, ?relay_parent, "Received provisionable data: {:?}", &data);
325
326				note_provisionable_data(state, data);
327			}
328		},
329	}
330
331	Ok(())
332}
333
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
/// Assemble inherent data in a spawned background task and answer every
/// sender in `return_senders` with it. Running in the background keeps the
/// main loop responsive; the work is bounded by `SEND_INHERENT_DATA_TIMEOUT`.
async fn send_inherent_data_bg<Context>(
	ctx: &mut Context,
	per_relay_parent: &PerRelayParent,
	return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
	metrics: Metrics,
) -> Result<(), Error> {
	// Clone everything the background task needs, as it must own its data.
	let leaf = per_relay_parent.leaf.clone();
	let signed_bitfields = per_relay_parent.signed_bitfields.clone();
	let mut sender = ctx.sender().clone();

	let bg = async move {
		let _timer = metrics.time_request_inherent_data();

		gum::trace!(
			target: LOG_TARGET,
			relay_parent = ?leaf.hash,
			"Sending inherent data in background."
		);

		let send_result =
			send_inherent_data(&leaf, &signed_bitfields, return_senders, &mut sender, &metrics) // Make sure call is not taking forever:
				.timeout(SEND_INHERENT_DATA_TIMEOUT)
				.map(|v| match v {
					Some(r) => r,
					None => Err(Error::SendInherentDataTimeout),
				});

		match send_result.await {
			Err(err) => {
				// A canceled backing request usually means the relay parent got
				// obsoleted, so log at debug rather than warn.
				if let Error::CanceledBackedCandidates(_) = err {
					gum::debug!(
						target: LOG_TARGET,
						err = ?err,
						"Failed to assemble or send inherent data - block got likely obsoleted already."
					);
				} else {
					gum::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
				}
				metrics.on_inherent_data_request(Err(()));
			},
			Ok(()) => {
				metrics.on_inherent_data_request(Ok(()));
				gum::debug!(
					target: LOG_TARGET,
					signed_bitfield_count = signed_bitfields.len(),
					leaf_hash = ?leaf.hash,
					"inherent data sent successfully"
				);
				metrics.observe_inherent_data_bitfields_count(signed_bitfields.len());
			},
		}
	};

	ctx.spawn("send-inherent-data", bg.boxed())
		.map_err(|_| Error::FailedToSpawnBackgroundTask)?;

	Ok(())
}
393
394fn note_provisionable_data(
395	per_relay_parent: &mut PerRelayParent,
396	provisionable_data: ProvisionableData,
397) {
398	match provisionable_data {
399		ProvisionableData::Bitfield(_, signed_bitfield) =>
400			per_relay_parent.signed_bitfields.push(signed_bitfield),
401		// We choose not to punish these forms of misbehavior for the time being.
402		// Risks from misbehavior are sufficiently mitigated at the protocol level
403		// via reputation changes. Punitive actions here may become desirable
404		// enough to dedicate time to in the future.
405		ProvisionableData::MisbehaviorReport(_, _, _) => {},
406		// We wait and do nothing here, preferring to initiate a dispute after the
407		// parablock candidate is included for the following reasons:
408		//
409		// 1. A dispute for a candidate triggered at any point before the candidate
410		// has been made available, including the backing stage, can't be
411		// guaranteed to conclude. Non-concluding disputes are unacceptable.
412		// 2. Candidates which haven't been made available don't pose a security
413		// risk as they can not be included, approved, or finalized.
414		//
415		// Currently we rely on approval checkers to trigger disputes for bad
416		// parablocks once they are included. But we can do slightly better by
417		// allowing disagreeing backers to record their disagreement and initiate a
418		// dispute once the parablock in question has been included. This potential
419		// change is tracked by: https://github.com/paritytech/polkadot/issues/3232
420		ProvisionableData::Dispute(_, _) => {},
421	}
422}
423
/// Availability votes for one core: indexed by validator (see
/// `bitfields_indicate_availability`), a set bit meaning that validator voted
/// the core's candidate available.
type CoreAvailability = BitVec<u8, bitvec::order::Lsb0>;
425
426/// The provisioner is the subsystem best suited to choosing which specific
427/// backed candidates and availability bitfields should be assembled into the
428/// block. To engage this functionality, a
429/// `ProvisionerMessage::RequestInherentData` is sent; the response is a set of
430/// non-conflicting candidates and the appropriate bitfields. Non-conflicting
431/// means that there are never two distinct parachain candidates included for
432/// the same parachain and that new parachain candidates cannot be included
433/// until the previous one either gets declared available or expired.
434///
435/// The main complication here is going to be around handling
436/// occupied-core-assumptions. We might have candidates that are only
437/// includable when some bitfields are included. And we might have candidates
438/// that are not includable when certain bitfields are included.
439///
440/// When we're choosing bitfields to include, the rule should be simple:
441/// maximize availability. So basically, include all bitfields. And then
442/// choose a coherent set of candidates along with that.
443async fn send_inherent_data(
444	leaf: &ActivatedLeaf,
445	bitfields: &[SignedAvailabilityBitfield],
446	return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
447	from_job: &mut impl overseer::ProvisionerSenderTrait,
448	metrics: &Metrics,
449) -> Result<(), Error> {
450	gum::trace!(
451		target: LOG_TARGET,
452		relay_parent = ?leaf.hash,
453		"Requesting availability cores"
454	);
455	let availability_cores = request_availability_cores(leaf.hash, from_job)
456		.await
457		.await
458		.map_err(|err| Error::CanceledAvailabilityCores(err))??;
459
460	gum::trace!(
461		target: LOG_TARGET,
462		relay_parent = ?leaf.hash,
463		"Selecting disputes"
464	);
465
466	let disputes = disputes::prioritized_selection::select_disputes(from_job, metrics, leaf).await;
467
468	gum::trace!(
469		target: LOG_TARGET,
470		relay_parent = ?leaf.hash,
471		"Selected disputes"
472	);
473
474	let bitfields = select_availability_bitfields(&availability_cores, bitfields, &leaf.hash);
475
476	gum::trace!(
477		target: LOG_TARGET,
478		relay_parent = ?leaf.hash,
479		"Selected bitfields"
480	);
481
482	let candidates = select_candidates(&availability_cores, &bitfields, leaf, from_job).await?;
483
484	gum::trace!(
485		target: LOG_TARGET,
486		relay_parent = ?leaf.hash,
487		"Selected candidates"
488	);
489
490	gum::debug!(
491		target: LOG_TARGET,
492		availability_cores_len = availability_cores.len(),
493		disputes_count = disputes.len(),
494		bitfields_count = bitfields.len(),
495		candidates_count = candidates.len(),
496		leaf_hash = ?leaf.hash,
497		"inherent data prepared",
498	);
499
500	let inherent_data =
501		ProvisionerInherentData { bitfields, backed_candidates: candidates, disputes };
502
503	gum::trace!(
504		target: LOG_TARGET,
505		relay_parent = ?leaf.hash,
506		"Sending back inherent data to requesters."
507	);
508
509	for return_sender in return_senders {
510		return_sender
511			.send(inherent_data.clone())
512			.map_err(|_data| Error::InherentDataReturnChannel)?;
513	}
514
515	Ok(())
516}
517
518/// In general, we want to pick all the bitfields. However, we have the following constraints:
519///
520/// - not more than one per validator
521/// - each 1 bit must correspond to an occupied core
522///
523/// If we have too many, an arbitrary selection policy is fine. For purposes of maximizing
524/// availability, we pick the one with the greatest number of 1 bits.
525///
526/// Note: This does not enforce any sorting precondition on the output; the ordering there will be
527/// unrelated to the sorting of the input.
528fn select_availability_bitfields(
529	cores: &[CoreState],
530	bitfields: &[SignedAvailabilityBitfield],
531	leaf_hash: &Hash,
532) -> Vec<SignedAvailabilityBitfield> {
533	let mut selected: BTreeMap<ValidatorIndex, SignedAvailabilityBitfield> = BTreeMap::new();
534
535	gum::debug!(
536		target: LOG_TARGET,
537		bitfields_count = bitfields.len(),
538		?leaf_hash,
539		"bitfields count before selection"
540	);
541
542	'a: for bitfield in bitfields.iter().cloned() {
543		if bitfield.payload().0.len() != cores.len() {
544			gum::debug!(target: LOG_TARGET, ?leaf_hash, "dropping bitfield due to length mismatch");
545			continue
546		}
547
548		let is_better = selected
549			.get(&bitfield.validator_index())
550			.map_or(true, |b| b.payload().0.count_ones() < bitfield.payload().0.count_ones());
551
552		if !is_better {
553			gum::trace!(
554				target: LOG_TARGET,
555				val_idx = bitfield.validator_index().0,
556				?leaf_hash,
557				"dropping bitfield due to duplication - the better one is kept"
558			);
559			continue
560		}
561
562		for (idx, _) in cores.iter().enumerate().filter(|v| !v.1.is_occupied()) {
563			// Bit is set for an unoccupied core - invalid
564			if *bitfield.payload().0.get(idx).as_deref().unwrap_or(&false) {
565				gum::debug!(
566					target: LOG_TARGET,
567					val_idx = bitfield.validator_index().0,
568					?leaf_hash,
569					"dropping invalid bitfield - bit is set for an unoccupied core"
570				);
571				continue 'a
572			}
573		}
574
575		let _ = selected.insert(bitfield.validator_index(), bitfield);
576	}
577
578	gum::debug!(
579		target: LOG_TARGET,
580		?leaf_hash,
581		"selected {} of all {} bitfields (each bitfield is from a unique validator)",
582		selected.len(),
583		bitfields.len()
584	);
585
586	selected.into_values().collect()
587}
588
/// Requests backable candidates from Prospective Parachains subsystem
/// based on core states.
///
/// Walks all availability cores, computes per-para how many cores will need a
/// fresh candidate in the block under construction and which on-chain
/// candidates form that para's ancestry, then asks prospective-parachains for
/// that many backable candidates per para. Returns `(candidate_hash,
/// relay_parent)` pairs keyed by para.
async fn request_backable_candidates(
	availability_cores: &[CoreState],
	bitfields: &[SignedAvailabilityBitfield],
	relay_parent: &ActivatedLeaf,
	sender: &mut impl overseer::ProvisionerSenderTrait,
) -> Result<HashMap<ParaId, Vec<(CandidateHash, Hash)>>, Error> {
	let block_number_under_construction = relay_parent.number + 1;

	// Record how many cores are scheduled for each paraid. Use a BTreeMap because
	// we'll need to iterate through them.
	let mut scheduled_cores_per_para: BTreeMap<ParaId, usize> = BTreeMap::new();
	// The on-chain ancestors of a para present in availability-cores.
	let mut ancestors: HashMap<ParaId, Ancestors> =
		HashMap::with_capacity(availability_cores.len());

	for (core_idx, core) in availability_cores.iter().enumerate() {
		let core_idx = CoreIndex(core_idx as u32);
		match core {
			// A scheduled core needs one fresh candidate for its para.
			CoreState::Scheduled(scheduled_core) => {
				*scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
			},
			CoreState::Occupied(occupied_core) => {
				// Check whether the selected bitfields free this core.
				let is_available = bitfields_indicate_availability(
					core_idx.0 as usize,
					bitfields,
					&occupied_core.availability,
				);

				if is_available {
					// The occupying candidate becomes part of the para's ancestry.
					ancestors
						.entry(occupied_core.para_id())
						.or_default()
						.insert(occupied_core.candidate_hash);

					if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
						// Request a new backable candidate for the newly scheduled para id.
						*scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
					}
				} else if occupied_core.time_out_at <= block_number_under_construction {
					// Timed out before being available.

					if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out {
						// Candidate's availability timed out, practically same as scheduled.
						*scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
					}
				} else {
					// Not timed out and not available.
					ancestors
						.entry(occupied_core.para_id())
						.or_default()
						.insert(occupied_core.candidate_hash);
				}
			},
			CoreState::Free => continue,
		};
	}

	let mut selected_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>> =
		HashMap::with_capacity(scheduled_cores_per_para.len());

	// One request per para, asking for as many candidates as it has cores to fill.
	for (para_id, core_count) in scheduled_cores_per_para {
		let para_ancestors = ancestors.remove(&para_id).unwrap_or_default();

		let response = get_backable_candidates(
			relay_parent.hash,
			para_id,
			para_ancestors,
			core_count as u32,
			sender,
		)
		.await?;

		if response.is_empty() {
			gum::debug!(
				target: LOG_TARGET,
				leaf_hash = ?relay_parent.hash,
				?para_id,
				"No backable candidate returned by prospective parachains",
			);
			continue
		}

		selected_candidates.insert(para_id, response);
	}

	Ok(selected_candidates)
}
678
679/// Determine which cores are free, and then to the degree possible, pick a candidate appropriate to
680/// each free core.
681async fn select_candidates(
682	availability_cores: &[CoreState],
683	bitfields: &[SignedAvailabilityBitfield],
684	leaf: &ActivatedLeaf,
685	sender: &mut impl overseer::ProvisionerSenderTrait,
686) -> Result<Vec<BackedCandidate>, Error> {
687	let relay_parent = leaf.hash;
688	gum::trace!(
689		target: LOG_TARGET,
690		leaf_hash=?relay_parent,
691		"before GetBackedCandidates"
692	);
693
694	let selected_candidates =
695		request_backable_candidates(availability_cores, bitfields, leaf, sender).await?;
696	gum::debug!(target: LOG_TARGET, ?selected_candidates, "Got backable candidates");
697
698	// now get the backed candidates corresponding to these candidate receipts
699	let (tx, rx) = oneshot::channel();
700	sender.send_unbounded_message(CandidateBackingMessage::GetBackableCandidates(
701		selected_candidates.clone(),
702		tx,
703	));
704	let candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
705	gum::trace!(
706		target: LOG_TARGET,
707		leaf_hash=?relay_parent,
708		"Got {} backed candidates", candidates.len()
709	);
710
711	// keep only one candidate with validation code.
712	let mut with_validation_code = false;
713	// merge the candidates into a common collection, preserving the order
714	let mut merged_candidates = Vec::with_capacity(availability_cores.len());
715
716	for para_candidates in candidates.into_values() {
717		for candidate in para_candidates {
718			if candidate.candidate().commitments.new_validation_code.is_some() {
719				if with_validation_code {
720					break
721				} else {
722					with_validation_code = true;
723				}
724			}
725
726			merged_candidates.push(candidate);
727		}
728	}
729
730	gum::debug!(
731		target: LOG_TARGET,
732		n_candidates = merged_candidates.len(),
733		n_cores = availability_cores.len(),
734		?relay_parent,
735		"Selected backed candidates",
736	);
737
738	Ok(merged_candidates)
739}
740
741/// Requests backable candidates from Prospective Parachains based on
742/// the given ancestors in the fragment chain. The ancestors may not be ordered.
743async fn get_backable_candidates(
744	relay_parent: Hash,
745	para_id: ParaId,
746	ancestors: Ancestors,
747	count: u32,
748	sender: &mut impl overseer::ProvisionerSenderTrait,
749) -> Result<Vec<(CandidateHash, Hash)>, Error> {
750	let (tx, rx) = oneshot::channel();
751	sender
752		.send_message(ProspectiveParachainsMessage::GetBackableCandidates(
753			relay_parent,
754			para_id,
755			count,
756			ancestors,
757			tx,
758		))
759		.await;
760
761	rx.await.map_err(Error::CanceledBackableCandidates)
762}
763
764/// The availability bitfield for a given core is the transpose
765/// of a set of signed availability bitfields. It goes like this:
766///
767/// - construct a transverse slice along `core_idx`
768/// - bitwise-or it with the availability slice
769/// - count the 1 bits, compare to the total length; true on 2/3+
770fn bitfields_indicate_availability(
771	core_idx: usize,
772	bitfields: &[SignedAvailabilityBitfield],
773	availability: &CoreAvailability,
774) -> bool {
775	let mut availability = availability.clone();
776	let availability_len = availability.len();
777
778	for bitfield in bitfields {
779		let validator_idx = bitfield.validator_index().0 as usize;
780		match availability.get_mut(validator_idx) {
781			None => {
782				// in principle, this function might return a `Result<bool, Error>` so that we can
783				// more clearly express this error condition however, in practice, that would just
784				// push off an error-handling routine which would look a whole lot like this one.
785				// simpler to just handle the error internally here.
786				gum::warn!(
787					target: LOG_TARGET,
788					validator_idx = %validator_idx,
789					availability_len = %availability_len,
790					"attempted to set a transverse bit at idx {} which is greater than bitfield size {}",
791					validator_idx,
792					availability_len,
793				);
794
795				return false
796			},
797			Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx],
798		}
799	}
800
801	3 * availability.count_ones() >= 2 * availability.len()
802}