// polkadot_runtime_parachains/paras_inherent/mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Provides glue code over the scheduler and inclusion modules, and accepting
18//! one inherent per block that can include new para candidates and bitfields.
19//!
20//! Unlike other modules in this crate, it does not need to be initialized by the initializer,
21//! as it has no initialization logic and its finalization logic depends only on the details of
22//! this module.
23
24use crate::{
25	configuration,
26	disputes::DisputesHandler,
27	inclusion::{self, CandidateCheckContext},
28	initializer,
29	metrics::METRICS,
30	paras, scheduler,
31	shared::{self, AllowedRelayParentsTracker},
32	ParaId,
33};
34use alloc::{
35	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
36	vec,
37	vec::Vec,
38};
39use bitvec::prelude::BitVec;
40use core::result::Result;
41use frame_support::{
42	defensive,
43	dispatch::{DispatchErrorWithPostInfo, PostDispatchInfo},
44	inherent::{InherentData, InherentIdentifier, MakeFatalError, ProvideInherent},
45	pallet_prelude::*,
46	traits::Randomness,
47};
48
49use frame_system::pallet_prelude::*;
50use pallet_babe::{self, ParentBlockRandomness};
51use polkadot_primitives::{
52	effective_minimum_backing_votes, node_features::FeatureIndex, BackedCandidate,
53	CandidateDescriptorVersion, CandidateHash, CandidateReceiptV2 as CandidateReceipt,
54	CheckedDisputeStatementSet, CheckedMultiDisputeStatementSet, CoreIndex, DisputeStatementSet,
55	HeadData, InherentData as ParachainsInherentData, MultiDisputeStatementSet,
56	ScrapedOnChainVotes, SessionIndex, SignedAvailabilityBitfields, SigningContext,
57	UncheckedSignedAvailabilityBitfield, UncheckedSignedAvailabilityBitfields, ValidatorId,
58	ValidatorIndex, ValidityAttestation, PARACHAINS_INHERENT_IDENTIFIER,
59};
60use rand::{seq::SliceRandom, SeedableRng};
61use scale_info::TypeInfo;
62use sp_runtime::traits::{Header as HeaderT, One};
63
64mod misc;
65mod weights;
66
67use self::weights::checked_multi_dispute_statement_sets_weight;
68pub use self::{
69	misc::{IndexedRetain, IsSortedBy},
70	weights::{
71		backed_candidate_weight, backed_candidates_weight, dispute_statement_set_weight,
72		multi_dispute_statement_sets_weight, paras_inherent_total_weight, signed_bitfield_weight,
73		signed_bitfields_weight, TestWeightInfo, WeightInfo,
74	},
75};
76
77#[cfg(feature = "runtime-benchmarks")]
78mod benchmarking;
79
80#[cfg(test)]
81mod tests;
82
83const LOG_TARGET: &str = "runtime::inclusion-inherent";
84
/// A bitfield concerning concluded disputes for candidates
/// associated to the core index equivalent to the bit position.
///
/// E.g. bit 0 set to `true` means the candidate occupying core 0 was concluded invalid
/// by a dispute in this block.
#[derive(Default, PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug, TypeInfo)]
pub(crate) struct DisputedBitfield(pub(crate) BitVec<u8, bitvec::order::Lsb0>);

impl From<BitVec<u8, bitvec::order::Lsb0>> for DisputedBitfield {
	// Wraps a raw `Lsb0` bitvec; the caller is responsible for sizing it to the
	// number of availability cores (see `create_disputed_bitfield`).
	fn from(inner: BitVec<u8, bitvec::order::Lsb0>) -> Self {
		Self(inner)
	}
}
95
#[cfg(test)]
impl DisputedBitfield {
	/// Test helper: construct an all-`false` bitfield of `n` bits.
	pub fn zeros(n: usize) -> Self {
		let bits: BitVec<u8, bitvec::order::Lsb0> = BitVec::repeat(false, n);
		bits.into()
	}
}
103
104pub use pallet::*;
105
106#[frame_support::pallet]
107pub mod pallet {
108	use super::*;
109
	/// The pallet struct. This pallet exposes no user extrinsics; its only call is the
	/// mandatory `enter` inherent.
	#[pallet::pallet]
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);

	/// Configuration trait: builds on the inclusion, scheduler and initializer pallets, plus
	/// `pallet_babe` (used as a randomness source — see the `compute_entropy` call in
	/// `process_inherent_data`; `ParentBlockRandomness` is imported above).
	#[pallet::config]
	#[pallet::disable_frame_system_supertrait_check]
	pub trait Config:
		inclusion::Config + scheduler::Config + initializer::Config + pallet_babe::Config
	{
		/// Weight information for extrinsics in this pallet.
		type WeightInfo: WeightInfo;
	}
122
	#[pallet::error]
	pub enum Error<T> {
		/// Inclusion inherent called more than once per block.
		TooManyInclusionInherents,
		/// The hash of the submitted parent header doesn't correspond to the saved block hash of
		/// the parent.
		InvalidParentHeader,
		/// Inherent data was filtered during execution. This should have only been done
		/// during creation.
		InherentDataFilteredDuringExecution,
		/// More backed candidates were supplied than there are eligible (free and scheduled)
		/// cores to back them on.
		UnscheduledCandidate,
	}
136
	/// Whether the paras inherent was included within this block.
	///
	/// The `Option<()>` is effectively a `bool`, but it never hits storage in the `None` variant
	/// due to the guarantees of FRAME's storage APIs.
	///
	/// If this is `None` at the end of the block, we panic and render the block invalid.
	// Set in `enter` (which also rejects a second inclusion) and consumed in `on_finalize`.
	#[pallet::storage]
	pub(crate) type Included<T> = StorageValue<_, ()>;

	/// Scraped on chain data for extracting resolved disputes as well as backing votes.
	// Written piecewise via `set_scrapable_on_chain_disputes` / `set_scrapable_on_chain_backings`.
	#[pallet::storage]
	pub type OnChainVotes<T: Config> = StorageValue<_, ScrapedOnChainVotes<T::Hash>>;
149
150	/// Update the disputes statements set part of the on-chain votes.
151	pub(crate) fn set_scrapable_on_chain_disputes<T: Config>(
152		session: SessionIndex,
153		checked_disputes: CheckedMultiDisputeStatementSet,
154	) {
155		crate::paras_inherent::OnChainVotes::<T>::mutate(move |value| {
156			let disputes =
157				checked_disputes.into_iter().map(DisputeStatementSet::from).collect::<Vec<_>>();
158			let backing_validators_per_candidate = match value.take() {
159				Some(v) => v.backing_validators_per_candidate,
160				None => Vec::new(),
161			};
162			*value = Some(ScrapedOnChainVotes::<T::Hash> {
163				backing_validators_per_candidate,
164				disputes,
165				session,
166			});
167		})
168	}
169
170	/// Update the backing votes including part of the on-chain votes.
171	pub(crate) fn set_scrapable_on_chain_backings<T: Config>(
172		session: SessionIndex,
173		backing_validators_per_candidate: Vec<(
174			CandidateReceipt<T::Hash>,
175			Vec<(ValidatorIndex, ValidityAttestation)>,
176		)>,
177	) {
178		crate::paras_inherent::OnChainVotes::<T>::mutate(move |value| {
179			let disputes = match value.take() {
180				Some(v) => v.disputes,
181				None => MultiDisputeStatementSet::default(),
182			};
183			*value = Some(ScrapedOnChainVotes::<T::Hash> {
184				backing_validators_per_candidate,
185				disputes,
186				session,
187			});
188		})
189	}
190
191	#[pallet::hooks]
192	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
193		fn on_initialize(_: BlockNumberFor<T>) -> Weight {
194			T::DbWeight::get().reads_writes(1, 1) // in `on_finalize`.
195		}
196
197		fn on_finalize(_: BlockNumberFor<T>) {
198			if Included::<T>::take().is_none() {
199				panic!("Bitfields and heads must be included every block");
200			}
201		}
202	}
203
204	#[pallet::inherent]
205	impl<T: Config> ProvideInherent for Pallet<T> {
206		type Call = Call<T>;
207		type Error = MakeFatalError<()>;
208		const INHERENT_IDENTIFIER: InherentIdentifier = PARACHAINS_INHERENT_IDENTIFIER;
209
210		fn create_inherent(data: &InherentData) -> Option<Self::Call> {
211			let inherent_data = Self::create_inherent_inner(data)?;
212
213			Some(Call::enter { data: inherent_data })
214		}
215
216		fn is_inherent(call: &Self::Call) -> bool {
217			matches!(call, Call::enter { .. })
218		}
219	}
220
221	#[pallet::call]
222	impl<T: Config> Pallet<T> {
223		/// Enter the paras inherent. This will process bitfields and backed candidates.
224		#[pallet::call_index(0)]
225		#[pallet::weight((
226			paras_inherent_total_weight::<T>(
227				data.backed_candidates.as_slice(),
228				&data.bitfields,
229				&data.disputes,
230			),
231			DispatchClass::Mandatory,
232		))]
233		pub fn enter(
234			origin: OriginFor<T>,
235			data: ParachainsInherentData<HeaderFor<T>>,
236		) -> DispatchResultWithPostInfo {
237			ensure_none(origin)?;
238
239			ensure!(!Included::<T>::exists(), Error::<T>::TooManyInclusionInherents);
240			Included::<T>::set(Some(()));
241			let initial_data = data.clone();
242
243			Self::process_inherent_data(data).and_then(|(processed, post_info)| {
244				ensure!(initial_data == processed, Error::<T>::InherentDataFilteredDuringExecution);
245				Ok(post_info)
246			})
247		}
248	}
249}
250
impl<T: Config> Pallet<T> {
	/// Create the `ParachainsInherentData` that gets passed to [`Self::enter`] in
	/// [`Self::create_inherent`]. This code is pulled out of [`Self::create_inherent`] so it can be
	/// unit tested.
	fn create_inherent_inner(data: &InherentData) -> Option<ParachainsInherentData<HeaderFor<T>>> {
		let parachains_inherent_data = match data.get_data(&Self::INHERENT_IDENTIFIER) {
			Ok(Some(d)) => d,
			// No parachains inherent data was supplied for this block.
			Ok(None) => return None,
			Err(_) => {
				log::warn!(target: LOG_TARGET, "ParachainsInherentData failed to decode");
				return None
			},
		};
		// Run the full sanitization/filtering pass; drop the inherent entirely on failure.
		Self::process_inherent_data(parachains_inherent_data)
			.map(|(processed, _post_info)| processed)
			.map_err(|err| {
				log::warn!(target: LOG_TARGET, "Processing inherent data failed: {:?}", err);
			})
			.ok()
	}
272
	/// Process inherent data.
	///
	/// The given inherent data is processed and state is altered accordingly. If any data could
	/// not be applied (inconsistencies, weight limit, ...) it is removed.
	///
	/// High-level steps:
	/// 1. Verify the supplied parent header and update the allowed relay-parents tracker.
	/// 2. Limit and sanitize dispute statement sets first, then bitfields and backed
	///    candidates, so the (mandatory-class) block weight bound is respected.
	/// 3. Import dispute statements; if the chain is frozen by disputes, return early with
	///    only the disputes (no bitfields, no candidates).
	/// 4. Free cores for concluded-invalid candidates, enact availability bitfields,
	///    free timed-out cores, and back the remaining sanitized candidates.
	///
	/// Returns: Result containing processed inherent data and weight, the processed inherent would
	/// consume.
	fn process_inherent_data(
		data: ParachainsInherentData<HeaderFor<T>>,
	) -> Result<(ParachainsInherentData<HeaderFor<T>>, PostDispatchInfo), DispatchErrorWithPostInfo>
	{
		#[cfg(feature = "runtime-metrics")]
		sp_io::init_tracing();

		let ParachainsInherentData {
			mut bitfields,
			mut backed_candidates,
			parent_header,
			mut disputes,
		} = data;

		log::debug!(
			target: LOG_TARGET,
			"[process_inherent_data] bitfields.len(): {}, backed_candidates.len(): {}, disputes.len() {}",
			bitfields.len(),
			backed_candidates.len(),
			disputes.len()
		);

		let parent_hash = frame_system::Pallet::<T>::parent_hash();

		// The inherent must have been built on top of the same parent this block has.
		ensure!(
			parent_header.hash().as_ref() == parent_hash.as_ref(),
			Error::<T>::InvalidParentHeader,
		);

		let now = frame_system::Pallet::<T>::block_number();
		let config = configuration::ActiveConfig::<T>::get();

		// Before anything else, update the allowed relay-parents.
		{
			let parent_number = now - One::one();
			let parent_storage_root = *parent_header.state_root();

			shared::AllowedRelayParents::<T>::mutate(|tracker| {
				tracker.update(
					parent_hash,
					parent_storage_root,
					scheduler::ClaimQueue::<T>::get()
						.into_iter()
						.map(|(core_index, paras)| {
							(core_index, paras.into_iter().map(|e| e.para_id()).collect())
						})
						.collect(),
					parent_number,
					config.scheduler_params.lookahead,
				);
			});
		}

		let candidates_weight = backed_candidates_weight::<T>(&backed_candidates);
		let bitfields_weight = signed_bitfields_weight::<T>(&bitfields);
		let disputes_weight = multi_dispute_statement_sets_weight::<T>(&disputes);

		// Weight before filtering/sanitization except for enacting the candidates
		let weight_before_filtering = candidates_weight + bitfields_weight + disputes_weight;

		METRICS.on_before_filter(weight_before_filtering.ref_time());
		log::debug!(target: LOG_TARGET, "Size before filter: {}, candidates + bitfields: {}, disputes: {}", weight_before_filtering.proof_size(), candidates_weight.proof_size() + bitfields_weight.proof_size(), disputes_weight.proof_size());
		log::debug!(target: LOG_TARGET, "Time weight before filter: {}, candidates + bitfields: {}, disputes: {}", weight_before_filtering.ref_time(), candidates_weight.ref_time() + bitfields_weight.ref_time(), disputes_weight.ref_time());

		let current_session = shared::CurrentSessionIndex::<T>::get();
		let expected_bits = scheduler::Pallet::<T>::num_availability_cores();
		let validator_public = shared::ActiveValidatorKeys::<T>::get();

		// We are assuming (incorrectly) to have all the weight (for the mandatory class or even
		// full block) available to us. This can lead to slightly overweight blocks, which still
		// works as the dispatch class for `enter` is `Mandatory`. By using the `Mandatory`
		// dispatch class, the upper layers impose no limit on the weight of this inherent, instead
		// we limit ourselves and make sure to stay within reasonable bounds. It might make sense
		// to subtract BlockWeights::base_block to reduce chances of becoming overweight.
		let max_block_weight = {
			let dispatch_class = DispatchClass::Mandatory;
			let max_block_weight_full = <T as frame_system::Config>::BlockWeights::get();
			log::debug!(target: LOG_TARGET, "Max block weight: {}", max_block_weight_full.max_block);
			// Get max block weight for the mandatory class if defined, otherwise total max weight
			// of the block.
			let max_weight = max_block_weight_full
				.per_class
				.get(dispatch_class)
				.max_total
				.unwrap_or(max_block_weight_full.max_block);
			log::debug!(target: LOG_TARGET, "Used max block time weight: {}", max_weight);

			let max_block_size_full = <T as frame_system::Config>::BlockLength::get();
			let max_block_size = max_block_size_full.max.get(dispatch_class);
			log::debug!(target: LOG_TARGET, "Used max block size: {}", max_block_size);

			// Adjust proof size to max block size as we are tracking tx size.
			max_weight.set_proof_size(*max_block_size as u64)
		};
		log::debug!(target: LOG_TARGET, "Used max block weight: {}", max_block_weight);

		// Deterministic-per-parent randomness, used below for fair selection when
		// the inherent has to be trimmed to fit the block.
		let entropy = compute_entropy::<T>(parent_hash);
		let mut rng = rand_chacha::ChaChaRng::from_seed(entropy.into());

		// Filter out duplicates and continue.
		if let Err(()) = T::DisputesHandler::deduplicate_and_sort_dispute_data(&mut disputes) {
			log::debug!(target: LOG_TARGET, "Found duplicate statement sets, retaining the first");
		}

		let post_conclusion_acceptance_period = config.dispute_post_conclusion_acceptance_period;

		let dispute_statement_set_valid = move |set: DisputeStatementSet| {
			T::DisputesHandler::filter_dispute_data(set, post_conclusion_acceptance_period)
		};

		// Limit the disputes first, since the following statements depend on the votes included
		// here.
		let (checked_disputes_sets, checked_disputes_sets_consumed_weight) =
			limit_and_sanitize_disputes::<T, _>(
				disputes,
				dispute_statement_set_valid,
				max_block_weight,
			);

		let mut all_weight_after = {
			// Assure the maximum block weight is adhered, by limiting bitfields and backed
			// candidates. Dispute statement sets were already limited before.
			let non_disputes_weight = apply_weight_limit::<T>(
				&mut backed_candidates,
				&mut bitfields,
				max_block_weight.saturating_sub(checked_disputes_sets_consumed_weight),
				&mut rng,
			);

			let all_weight_after =
				non_disputes_weight.saturating_add(checked_disputes_sets_consumed_weight);

			METRICS.on_after_filter(all_weight_after.ref_time());
			log::debug!(
				target: LOG_TARGET,
				"[process_inherent_data] after filter: bitfields.len(): {}, backed_candidates.len(): {}, checked_disputes_sets.len() {}",
				bitfields.len(),
				backed_candidates.len(),
				checked_disputes_sets.len()
			);
			log::debug!(target: LOG_TARGET, "Size after filter: {}, candidates + bitfields: {}, disputes: {}", all_weight_after.proof_size(), non_disputes_weight.proof_size(), checked_disputes_sets_consumed_weight.proof_size());
			log::debug!(target: LOG_TARGET, "Time weight after filter: {}, candidates + bitfields: {}, disputes: {}", all_weight_after.ref_time(), non_disputes_weight.ref_time(), checked_disputes_sets_consumed_weight.ref_time());

			if all_weight_after.any_gt(max_block_weight) {
				log::warn!(target: LOG_TARGET, "Post weight limiting weight is still too large, time: {}, size: {}", all_weight_after.ref_time(), all_weight_after.proof_size());
			}
			all_weight_after
		};

		// Note that `process_checked_multi_dispute_data` will iterate and import each
		// dispute; so the input here must be reasonably bounded,
		// which is guaranteed by the checks and weight limitation above.
		// We don't care about fresh or not disputes
		// this writes them to storage, so let's query it via those means
		// if this fails for whatever reason, that's ok.
		if let Err(e) =
			T::DisputesHandler::process_checked_multi_dispute_data(&checked_disputes_sets)
		{
			log::warn!(target: LOG_TARGET, "MultiDisputesData failed to update: {:?}", e);
		};
		METRICS.on_disputes_imported(checked_disputes_sets.len() as u64);

		set_scrapable_on_chain_disputes::<T>(current_session, checked_disputes_sets.clone());

		if T::DisputesHandler::is_frozen() {
			// Relay chain freeze, at this point we will not include any parachain blocks.
			METRICS.on_relay_chain_freeze();

			// Only the disputes make it into the block; bitfields and candidates are dropped.
			let disputes = checked_disputes_sets
				.into_iter()
				.map(|checked| checked.into())
				.collect::<Vec<_>>();
			let processed = ParachainsInherentData {
				bitfields: Vec::new(),
				backed_candidates: Vec::new(),
				disputes,
				parent_header,
			};

			// The relay chain we are currently on is invalid. Proceed no further on parachains.
			return Ok((processed, Some(checked_disputes_sets_consumed_weight).into()))
		}

		// Contains the disputes that are concluded in the current session only,
		// since these are the only ones that are relevant for the occupied cores
		// and lightens the load on `free_disputed` significantly.
		// Cores can't be occupied with candidates of the previous sessions, and only
		// things with new votes can have just concluded. We only need to collect
		// cores with disputes that conclude just now, because disputes that
		// concluded longer ago have already had any corresponding cores cleaned up.
		let current_concluded_invalid_disputes = checked_disputes_sets
			.iter()
			.map(AsRef::as_ref)
			.filter(|dss| dss.session == current_session)
			.map(|dss| (dss.session, dss.candidate_hash))
			.filter(|(session, candidate)| {
				<T>::DisputesHandler::concluded_invalid(*session, *candidate)
			})
			.map(|(_session, candidate)| candidate)
			.collect::<BTreeSet<CandidateHash>>();

		// Get the cores freed as a result of concluded invalid candidates.
		let (freed_disputed, concluded_invalid_hashes): (Vec<CoreIndex>, BTreeSet<CandidateHash>) =
			inclusion::Pallet::<T>::free_disputed(&current_concluded_invalid_disputes)
				.into_iter()
				.unzip();

		// Create a bit index from the set of core indices where each index corresponds to
		// a core index that was freed due to a dispute.
		//
		// I.e. 010100 would indicate, the candidates on Core 1 and 3 would be disputed.
		let disputed_bitfield = create_disputed_bitfield(expected_bits, freed_disputed.iter());

		let bitfields = sanitize_bitfields::<T>(
			bitfields,
			disputed_bitfield,
			expected_bits,
			parent_hash,
			current_session,
			&validator_public[..],
		);
		METRICS.on_bitfields_processed(bitfields.len() as u64);

		// Process new availability bitfields, yielding any availability cores whose
		// work has now concluded.
		let (enact_weight, freed_concluded) =
			inclusion::Pallet::<T>::update_pending_availability_and_get_freed_cores(
				&validator_public[..],
				bitfields.clone(),
			);
		all_weight_after.saturating_accrue(enact_weight);
		log::debug!(
			target: LOG_TARGET,
			"Enacting weight: {}, all weight: {}",
			enact_weight.ref_time(),
			all_weight_after.ref_time(),
		);

		// It's possible that after enacting the candidates, the total weight
		// goes over the limit, however, we can't do anything about it at this point.
		// By using the `Mandatory` weight, we ensure the block is still accepted,
		// but no other (user) transactions can be included.
		if all_weight_after.any_gt(max_block_weight) {
			log::warn!(
				target: LOG_TARGET,
				"Overweight para inherent data after enacting the candidates {:?}: {} > {}",
				parent_hash,
				all_weight_after,
				max_block_weight,
			);
		}

		// Inform the disputes module of all included candidates.
		for (_, candidate_hash) in &freed_concluded {
			T::DisputesHandler::note_included(current_session, *candidate_hash, now);
		}

		METRICS.on_candidates_included(freed_concluded.len() as u64);

		// Get the timed out candidates
		let freed_timeout = if scheduler::Pallet::<T>::availability_timeout_check_required() {
			inclusion::Pallet::<T>::free_timedout()
		} else {
			Vec::new()
		};

		if !freed_timeout.is_empty() {
			log::debug!(target: LOG_TARGET, "Evicted timed out cores: {:?}", freed_timeout);
		}

		// Back candidates.
		let (candidate_receipt_with_backing_validator_indices, backed_candidates_with_core) =
			Self::back_candidates(concluded_invalid_hashes, backed_candidates)?;

		set_scrapable_on_chain_backings::<T>(
			current_session,
			candidate_receipt_with_backing_validator_indices,
		);

		// Re-assemble the (possibly filtered) inherent data that was actually applied,
		// so `enter` can compare it against what the block author supplied.
		let disputes = checked_disputes_sets
			.into_iter()
			.map(|checked| checked.into())
			.collect::<Vec<_>>();

		let bitfields = bitfields.into_iter().map(|v| v.into_unchecked()).collect();

		let count = backed_candidates_with_core.len();
		let processed = ParachainsInherentData {
			bitfields,
			// Flatten the per-para candidate chains back into a single ordered list.
			backed_candidates: backed_candidates_with_core.into_iter().fold(
				Vec::with_capacity(count),
				|mut acc, (_id, candidates)| {
					acc.extend(candidates.into_iter().map(|(c, _)| c));
					acc
				},
			),
			disputes,
			parent_header,
		};
		Ok((processed, Some(all_weight_after).into()))
	}
581
	/// Sanitize and process the supplied backed candidates.
	///
	/// Returns the backing votes per candidate receipt (for on-chain vote scraping) together
	/// with the sanitized candidates grouped by `ParaId`, each paired with the core it was
	/// backed on. If a session change is imminent, nothing is backed and the claim queue is
	/// left untouched; both collections are returned empty.
	///
	/// Fails with `Error::UnscheduledCandidate` if more candidates remain after sanitization
	/// than there are eligible cores, and propagates errors from candidate processing.
	fn back_candidates(
		concluded_invalid_hashes: BTreeSet<CandidateHash>,
		backed_candidates: Vec<BackedCandidate<T::Hash>>,
	) -> Result<
		(
			Vec<(CandidateReceipt<T::Hash>, Vec<(ValidatorIndex, ValidityAttestation)>)>,
			BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>>,
		),
		DispatchErrorWithPostInfo,
	> {
		let allowed_relay_parents = shared::AllowedRelayParents::<T>::get();
		let upcoming_new_session = initializer::Pallet::<T>::upcoming_session_change();

		METRICS.on_candidates_processed_total(backed_candidates.len() as u64);

		if !upcoming_new_session {
			let occupied_cores =
				inclusion::Pallet::<T>::get_occupied_cores().map(|(core, _)| core).collect();

			// Cores (grouped per para) a new candidate could legitimately be backed on.
			let mut eligible: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
			let mut total_eligible_cores = 0;

			for (core_idx, para_id) in Self::eligible_paras(&occupied_cores) {
				total_eligible_cores += 1;
				log::trace!(target: LOG_TARGET, "Found eligible para {:?} on core {:?}", para_id, core_idx);
				eligible.entry(para_id).or_default().insert(core_idx);
			}

			let node_features = configuration::ActiveConfig::<T>::get().node_features;

			// V2 receipts are only accepted when the corresponding node feature is enabled.
			let allow_v2_receipts = node_features
				.get(FeatureIndex::CandidateReceiptV2 as usize)
				.map(|b| *b)
				.unwrap_or(false);

			let backed_candidates_with_core = sanitize_backed_candidates::<T>(
				backed_candidates,
				&allowed_relay_parents,
				concluded_invalid_hashes,
				eligible,
				allow_v2_receipts,
			);
			let count = count_backed_candidates(&backed_candidates_with_core);

			ensure!(count <= total_eligible_cores, Error::<T>::UnscheduledCandidate);

			METRICS.on_candidates_sanitized(count as u64);

			// Process backed candidates according to scheduled cores.
			let candidate_receipt_with_backing_validator_indices =
				inclusion::Pallet::<T>::process_candidates(
					&allowed_relay_parents,
					&backed_candidates_with_core,
					scheduler::Pallet::<T>::group_validators,
				)?;

			// We need to advance the claim queue on all cores, except for the ones that did not
			// get freed in this block. The ones that did not get freed also cannot be newly
			// occupied.
			scheduler::Pallet::<T>::advance_claim_queue(&occupied_cores);

			Ok((candidate_receipt_with_backing_validator_indices, backed_candidates_with_core))
		} else {
			log::debug!(
				target: LOG_TARGET,
				"Upcoming session change, not backing any new candidates."
			);
			// If we'll initialize a new session at the end of the block, we don't want to
			// advance the claim queue.

			Ok((vec![], BTreeMap::new()))
		}
	}
655
656	/// Paras that may get backed on cores.
657	///
658	/// 1. The para must be scheduled on core.
659	/// 2. Core needs to be free, otherwise backing is not possible.
660	///
661	/// We get a set of the occupied cores as input.
662	pub(crate) fn eligible_paras<'a>(
663		occupied_cores: &'a BTreeSet<CoreIndex>,
664	) -> impl Iterator<Item = (CoreIndex, ParaId)> + 'a {
665		scheduler::ClaimQueue::<T>::get().into_iter().filter_map(|(core_idx, queue)| {
666			if occupied_cores.contains(&core_idx) {
667				return None
668			}
669			let next_scheduled = queue.front()?;
670			Some((core_idx, next_scheduled.para_id()))
671		})
672	}
673}
674
675/// Derive a bitfield from dispute
676pub(super) fn create_disputed_bitfield<'a, I>(
677	expected_bits: usize,
678	freed_cores: I,
679) -> DisputedBitfield
680where
681	I: 'a + IntoIterator<Item = &'a CoreIndex>,
682{
683	let mut bitvec = BitVec::repeat(false, expected_bits);
684	for core_idx in freed_cores {
685		let core_idx = core_idx.0 as usize;
686		if core_idx < expected_bits {
687			bitvec.set(core_idx, true);
688		}
689	}
690	DisputedBitfield::from(bitvec)
691}
692
693/// Select a random subset, with preference for certain indices.
694///
695/// Adds random items to the set until all candidates
696/// are tried or the remaining weight is depleted.
697///
698/// Returns the weight of all selected items from `selectables`
699/// as well as their indices in ascending order.
700fn random_sel<X, F: Fn(&X) -> Weight>(
701	rng: &mut rand_chacha::ChaChaRng,
702	selectables: &[X],
703	mut preferred_indices: Vec<usize>,
704	weight_fn: F,
705	weight_limit: Weight,
706) -> (Weight, Vec<usize>) {
707	if selectables.is_empty() {
708		return (Weight::zero(), Vec::new())
709	}
710	// all indices that are not part of the preferred set
711	let mut indices = (0..selectables.len())
712		.into_iter()
713		.filter(|idx| !preferred_indices.contains(idx))
714		.collect::<Vec<_>>();
715	let mut picked_indices = Vec::with_capacity(selectables.len().saturating_sub(1));
716
717	let mut weight_acc = Weight::zero();
718
719	preferred_indices.shuffle(rng);
720	for preferred_idx in preferred_indices {
721		// preferred indices originate from outside
722		if let Some(item) = selectables.get(preferred_idx) {
723			let updated = weight_acc.saturating_add(weight_fn(item));
724			if updated.any_gt(weight_limit) {
725				continue
726			}
727			weight_acc = updated;
728			picked_indices.push(preferred_idx);
729		}
730	}
731
732	indices.shuffle(rng);
733	for idx in indices {
734		let item = &selectables[idx];
735		let updated = weight_acc.saturating_add(weight_fn(item));
736
737		if updated.any_gt(weight_limit) {
738			continue
739		}
740		weight_acc = updated;
741
742		picked_indices.push(idx);
743	}
744
745	// sorting indices, so the ordering is retained
746	// unstable sorting is fine, since there are no duplicates in indices
747	// and even if there were, they don't have an identity
748	picked_indices.sort_unstable();
749	(weight_acc, picked_indices)
750}
751
/// Considers an upper threshold that the inherent data must not exceed.
///
/// If there is sufficient space, all bitfields and all candidates
/// will be included.
///
/// Otherwise tries to include all disputes, and then tries to fill the remaining space with
/// bitfields and then candidates.
///
/// The selection process is random, with two exceptions: candidate chains containing a code
/// upgrade are preferred, since such candidates tend to be large and would otherwise stand
/// little chance of inclusion as the number of parachains grows; and for disputes, local and
/// older disputes are preferred (see `limit_and_sanitize_disputes`). All backed candidates
/// are checked beforehand in `fn create_inherent_inner`, which guarantees sanity.
///
/// Assumes disputes are already filtered by the time this is called.
///
/// Returns the total weight consumed by `bitfields` and `candidates`.
pub(crate) fn apply_weight_limit<T: Config + inclusion::Config>(
	candidates: &mut Vec<BackedCandidate<<T>::Hash>>,
	bitfields: &mut UncheckedSignedAvailabilityBitfields,
	max_consumable_weight: Weight,
	rng: &mut rand_chacha::ChaChaRng,
) -> Weight {
	let total_candidates_weight = backed_candidates_weight::<T>(candidates.as_slice());

	let total_bitfields_weight = signed_bitfields_weight::<T>(&bitfields);

	let total = total_bitfields_weight.saturating_add(total_candidates_weight);

	// candidates + bitfields fit into the block
	if max_consumable_weight.all_gte(total) {
		return total
	}

	// Invariant: block author provides candidate in the order in which they form a chain
	// wrt elastic scaling. If the invariant is broken, we'd fail later when filtering candidates
	// which are unchained.

	// Group consecutive candidates of the same para into chains, so that a chain is
	// either picked or dropped as a whole.
	let mut chained_candidates: Vec<Vec<_>> = Vec::new();
	let mut current_para_id = None;

	for candidate in core::mem::take(candidates).into_iter() {
		let candidate_para_id = candidate.descriptor().para_id();
		if Some(candidate_para_id) == current_para_id {
			let chain = chained_candidates
				.last_mut()
				.expect("if the current_para_id is Some, then vec is not empty; qed");
			chain.push(candidate);
		} else {
			current_para_id = Some(candidate_para_id);
			chained_candidates.push(vec![candidate]);
		}
	}

	// Elastic scaling: we prefer chains that have a code upgrade among the candidates,
	// as the candidates containing the upgrade tend to be large and hence stand no chance to
	// be picked late while maintaining the weight bounds.
	//
	// Limitations: For simplicity if total weight of a chain of candidates is larger than
	// the remaining weight, the chain will still not be included while it could still be possible
	// to include part of that chain.
	let preferred_chain_indices = chained_candidates
		.iter()
		.enumerate()
		.filter_map(|(idx, candidates)| {
			// Check if any of the candidate in chain contains a code upgrade.
			if candidates
				.iter()
				.any(|candidate| candidate.candidate().commitments.new_validation_code.is_some())
			{
				Some(idx)
			} else {
				None
			}
		})
		.collect::<Vec<usize>>();

	// There is weight remaining to be consumed by a subset of chained candidates
	// which are going to be picked now.
	if let Some(max_consumable_by_candidates) =
		max_consumable_weight.checked_sub(&total_bitfields_weight)
	{
		let (acc_candidate_weight, chained_indices) =
			random_sel::<Vec<BackedCandidate<<T as frame_system::Config>::Hash>>, _>(
				rng,
				&chained_candidates,
				preferred_chain_indices,
				|candidates| backed_candidates_weight::<T>(&candidates),
				max_consumable_by_candidates,
			);
		log::debug!(target: LOG_TARGET, "Indices Candidates: {:?}, size: {}", chained_indices, candidates.len());
		// `chained_indices` is sorted ascending (see `random_sel`), so binary search is valid.
		chained_candidates
			.indexed_retain(|idx, _backed_candidates| chained_indices.binary_search(&idx).is_ok());
		// pick all bitfields, and
		// fill the remaining space with candidates
		let total_consumed = acc_candidate_weight.saturating_add(total_bitfields_weight);

		*candidates = chained_candidates.into_iter().flatten().collect::<Vec<_>>();

		return total_consumed
	}

	candidates.clear();

	// insufficient space for even the bitfields alone, so only try to fit as many of those
	// into the block and skip the candidates entirely
	let (total_consumed, indices) = random_sel::<UncheckedSignedAvailabilityBitfield, _>(
		rng,
		&bitfields,
		vec![],
		|bitfield| signed_bitfield_weight::<T>(&bitfield),
		max_consumable_weight,
	);
	log::debug!(target: LOG_TARGET, "Indices Bitfields: {:?}, size: {}", indices, bitfields.len());

	bitfields.indexed_retain(|idx, _bitfield| indices.binary_search(&idx).is_ok());

	total_consumed
}
871
872/// Filter bitfields based on freed core indices, validity, and other sanity checks.
873///
874/// Do sanity checks on the bitfields:
875///
876///  1. no more than one bitfield per validator
877///  2. bitfields are ascending by validator index.
878///  3. each bitfield has exactly `expected_bits`
879///  4. signature is valid
880///  5. remove any disputed core indices
881///
882/// If any of those is not passed, the bitfield is dropped.
883pub(crate) fn sanitize_bitfields<T: crate::inclusion::Config>(
884	unchecked_bitfields: UncheckedSignedAvailabilityBitfields,
885	disputed_bitfield: DisputedBitfield,
886	expected_bits: usize,
887	parent_hash: T::Hash,
888	session_index: SessionIndex,
889	validators: &[ValidatorId],
890) -> SignedAvailabilityBitfields {
891	let mut bitfields = Vec::with_capacity(unchecked_bitfields.len());
892
893	let mut last_index: Option<ValidatorIndex> = None;
894
895	if disputed_bitfield.0.len() != expected_bits {
896		// This is a system logic error that should never occur, but we want to handle it gracefully
897		// so we just drop all bitfields
898		log::error!(target: LOG_TARGET, "BUG: disputed_bitfield != expected_bits");
899		return vec![]
900	}
901
902	let all_zeros = BitVec::<u8, bitvec::order::Lsb0>::repeat(false, expected_bits);
903	let signing_context = SigningContext { parent_hash, session_index };
904	for unchecked_bitfield in unchecked_bitfields {
905		// Find and skip invalid bitfields.
906		if unchecked_bitfield.unchecked_payload().0.len() != expected_bits {
907			log::trace!(
908				target: LOG_TARGET,
909				"bad bitfield length: {} != {:?}",
910				unchecked_bitfield.unchecked_payload().0.len(),
911				expected_bits,
912			);
913			continue
914		}
915
916		if unchecked_bitfield.unchecked_payload().0.clone() & disputed_bitfield.0.clone() !=
917			all_zeros
918		{
919			log::trace!(
920				target: LOG_TARGET,
921				"bitfield contains disputed cores: {:?}",
922				unchecked_bitfield.unchecked_payload().0.clone() & disputed_bitfield.0.clone()
923			);
924			continue
925		}
926
927		let validator_index = unchecked_bitfield.unchecked_validator_index();
928
929		if !last_index.map_or(true, |last_index: ValidatorIndex| last_index < validator_index) {
930			log::trace!(
931				target: LOG_TARGET,
932				"bitfield validator index is not greater than last: !({:?} < {})",
933				last_index.as_ref().map(|x| x.0),
934				validator_index.0
935			);
936			continue
937		}
938
939		if unchecked_bitfield.unchecked_validator_index().0 as usize >= validators.len() {
940			log::trace!(
941				target: LOG_TARGET,
942				"bitfield validator index is out of bounds: {} >= {}",
943				validator_index.0,
944				validators.len(),
945			);
946			continue
947		}
948
949		let validator_public = &validators[validator_index.0 as usize];
950
951		// Validate bitfield signature.
952		if let Ok(signed_bitfield) =
953			unchecked_bitfield.try_into_checked(&signing_context, validator_public)
954		{
955			bitfields.push(signed_bitfield);
956			METRICS.on_valid_bitfield_signature();
957		} else {
958			log::warn!(target: LOG_TARGET, "Invalid bitfield signature");
959			METRICS.on_invalid_bitfield_signature();
960		};
961
962		last_index = Some(validator_index);
963	}
964	bitfields
965}
966
967/// Perform required checks for given candidate receipt.
968///
969/// Returns `true` if candidate descriptor is version 1.
970///
971/// Otherwise returns `false` if:
972/// - version 2 descriptors are not allowed
973/// - the core index in descriptor doesn't match the one computed from the commitments
974/// - the `SelectCore` signal does not refer to a core at the top of claim queue
975fn sanitize_backed_candidate_v2<T: crate::inclusion::Config>(
976	candidate: &BackedCandidate<T::Hash>,
977	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
978	allow_v2_receipts: bool,
979) -> bool {
980	let descriptor_version = candidate.descriptor().version();
981
982	if descriptor_version == CandidateDescriptorVersion::Unknown {
983		log::debug!(
984			target: LOG_TARGET,
985			"Candidate with unknown descriptor version. Dropping candidate {:?} for paraid {:?}.",
986			candidate.candidate().hash(),
987			candidate.descriptor().para_id()
988		);
989		return false
990	}
991
992	// It is mandatory to filter these before calling `filter_unchained_candidates` to ensure
993	// any we drop any descendants of the dropped v2 candidates.
994	if descriptor_version == CandidateDescriptorVersion::V2 && !allow_v2_receipts {
995		log::debug!(
996			target: LOG_TARGET,
997			"V2 candidate descriptors not allowed. Dropping candidate {:?} for paraid {:?}.",
998			candidate.candidate().hash(),
999			candidate.descriptor().para_id()
1000		);
1001		return false
1002	}
1003
1004	// Get the claim queue snapshot at the candidate relay parent.
1005	let Some((rp_info, _)) =
1006		allowed_relay_parents.acquire_info(candidate.descriptor().relay_parent(), None)
1007	else {
1008		log::debug!(
1009			target: LOG_TARGET,
1010			"Relay parent {:?} for candidate {:?} is not in the allowed relay parents.",
1011			candidate.descriptor().relay_parent(),
1012			candidate.candidate().hash(),
1013		);
1014		return false
1015	};
1016
1017	if let Err(err) = candidate.candidate().parse_ump_signals(&rp_info.claim_queue) {
1018		log::debug!(
1019			target: LOG_TARGET,
1020			"UMP signal check failed: {:?}. Dropping candidate {:?} for paraid {:?}.",
1021			err,
1022			candidate.candidate().hash(),
1023			candidate.descriptor().para_id()
1024		);
1025		return false
1026	}
1027
1028	if descriptor_version == CandidateDescriptorVersion::V1 {
1029		// Nothing more to check for v1 descriptors.
1030		return true
1031	}
1032
1033	let Some(session_index) = candidate.descriptor().session_index() else {
1034		log::debug!(
1035			target: LOG_TARGET,
1036			"Invalid V2 candidate receipt {:?} for paraid {:?}, missing session index.",
1037			candidate.candidate().hash(),
1038			candidate.descriptor().para_id(),
1039		);
1040		return false
1041	};
1042
1043	// Check if session index is equal to current session index.
1044	if session_index != shared::CurrentSessionIndex::<T>::get() {
1045		log::debug!(
1046			target: LOG_TARGET,
1047			"Dropping V2 candidate receipt {:?} for paraid {:?}, invalid session index {}, current session {}",
1048			candidate.candidate().hash(),
1049			candidate.descriptor().para_id(),
1050			session_index,
1051			shared::CurrentSessionIndex::<T>::get()
1052		);
1053		return false
1054	}
1055
1056	true
1057}
1058
1059/// Performs various filtering on the backed candidates inherent data.
1060/// Must maintain the invariant that the returned candidate collection contains the candidates
1061/// sorted in dependency order for each para. When doing any filtering, we must therefore drop any
1062/// subsequent candidates after the filtered one.
1063///
1064/// Filter out:
1065/// 1. Candidates that have v2 descriptors if the node `CandidateReceiptV2` feature is not enabled.
1066/// 2. any candidates which don't form a chain with the other candidates of the paraid (even if they
1067///    do form a chain but are not in the right order).
1068/// 3. any candidates that have a concluded invalid dispute or who are descendants of a concluded
1069///    invalid candidate.
1070/// 4. any unscheduled candidates, as well as candidates whose paraid has multiple cores assigned
1071///    but have no core index (either injected or in the v2 descriptor).
1072/// 5. all backing votes from disabled validators
1073/// 6. any candidates that end up with less than `effective_minimum_backing_votes` backing votes
1074///
1075/// Returns the scheduled
1076/// backed candidates which passed filtering, mapped by para id and in the right dependency order.
1077fn sanitize_backed_candidates<T: crate::inclusion::Config>(
1078	backed_candidates: Vec<BackedCandidate<T::Hash>>,
1079	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1080	concluded_invalid_with_descendants: BTreeSet<CandidateHash>,
1081	scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>>,
1082	allow_v2_receipts: bool,
1083) -> BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>> {
1084	// Map the candidates to the right paraids, while making sure that the order between candidates
1085	// of the same para is preserved.
1086	let mut candidates_per_para: BTreeMap<ParaId, Vec<_>> = BTreeMap::new();
1087
1088	for candidate in backed_candidates {
1089		if !sanitize_backed_candidate_v2::<T>(&candidate, allowed_relay_parents, allow_v2_receipts)
1090		{
1091			continue
1092		}
1093
1094		candidates_per_para
1095			.entry(candidate.descriptor().para_id())
1096			.or_default()
1097			.push(candidate);
1098	}
1099
1100	// Check that candidates pertaining to the same para form a chain. Drop the ones that
1101	// don't, along with the rest of candidates which follow them in the input vector.
1102	filter_unchained_candidates::<T>(&mut candidates_per_para, allowed_relay_parents);
1103
1104	// Remove any candidates that were concluded invalid or who are descendants of concluded invalid
1105	// candidates (along with their descendants).
1106	retain_candidates::<T, _, _>(&mut candidates_per_para, |_, candidate| {
1107		let keep = !concluded_invalid_with_descendants.contains(&candidate.candidate().hash());
1108
1109		if !keep {
1110			log::debug!(
1111				target: LOG_TARGET,
1112				"Found backed candidate {:?} which was concluded invalid or is a descendant of a concluded invalid candidate, for paraid {:?}.",
1113				candidate.candidate().hash(),
1114				candidate.descriptor().para_id()
1115			);
1116		}
1117		keep
1118	});
1119
1120	// Map candidates to scheduled cores. Filter out any unscheduled candidates along with their
1121	// descendants.
1122	let mut backed_candidates_with_core =
1123		map_candidates_to_cores::<T>(&allowed_relay_parents, scheduled, candidates_per_para);
1124
1125	// Filter out backing statements from disabled validators. If by that we render a candidate with
1126	// less backing votes than required, filter that candidate also. As all the other filtering
1127	// operations above, we drop the descendants of the dropped candidates also.
1128	filter_backed_statements_from_disabled_validators::<T>(
1129		&mut backed_candidates_with_core,
1130		&allowed_relay_parents,
1131	);
1132
1133	backed_candidates_with_core
1134}
1135
1136fn count_backed_candidates<B>(backed_candidates: &BTreeMap<ParaId, Vec<B>>) -> usize {
1137	backed_candidates.values().map(|c| c.len()).sum()
1138}
1139
1140/// Derive entropy from babe provided per block randomness.
1141///
1142/// In the odd case none is available, uses the `parent_hash` and
1143/// a const value, while emitting a warning.
1144fn compute_entropy<T: Config>(parent_hash: T::Hash) -> [u8; 32] {
1145	const CANDIDATE_SEED_SUBJECT: [u8; 32] = *b"candidate-seed-selection-subject";
1146	// NOTE: this is slightly gameable since this randomness was already public
1147	// by the previous block, while for the block author this randomness was
1148	// known 2 epochs ago. it is marginally better than using the parent block
1149	// hash since it's harder to influence the VRF output than the block hash.
1150	let vrf_random = ParentBlockRandomness::<T>::random(&CANDIDATE_SEED_SUBJECT[..]).0;
1151	let mut entropy: [u8; 32] = CANDIDATE_SEED_SUBJECT;
1152	if let Some(vrf_random) = vrf_random {
1153		entropy.as_mut().copy_from_slice(vrf_random.as_ref());
1154	} else {
1155		// in case there is no VRF randomness present, we utilize the relay parent
1156		// as seed, it's better than a static value.
1157		log::warn!(target: LOG_TARGET, "ParentBlockRandomness did not provide entropy");
1158		entropy.as_mut().copy_from_slice(parent_hash.as_ref());
1159	}
1160	entropy
1161}
1162
1163/// Limit disputes in place.
1164///
1165/// Assumes ordering of disputes, retains sorting of the statement.
1166///
1167/// Prime source of overload safety for dispute votes:
1168/// 1. Check accumulated weight does not exceed the maximum block weight.
1169/// 2. If exceeded:
1170///   1. Check validity of all dispute statements sequentially
1171/// 2. If not exceeded:
1172///   1. If weight is exceeded by locals, pick the older ones (lower indices) until the weight limit
1173///      is reached.
1174///
1175/// Returns the consumed weight amount, that is guaranteed to be less than the provided
1176/// `max_consumable_weight`.
1177fn limit_and_sanitize_disputes<
1178	T: Config,
1179	CheckValidityFn: FnMut(DisputeStatementSet) -> Option<CheckedDisputeStatementSet>,
1180>(
1181	disputes: MultiDisputeStatementSet,
1182	mut dispute_statement_set_valid: CheckValidityFn,
1183	max_consumable_weight: Weight,
1184) -> (Vec<CheckedDisputeStatementSet>, Weight) {
1185	// The total weight if all disputes would be included
1186	let disputes_weight = multi_dispute_statement_sets_weight::<T>(&disputes);
1187
1188	if disputes_weight.any_gt(max_consumable_weight) {
1189		log::debug!(target: LOG_TARGET, "Above max consumable weight: {}/{}", disputes_weight, max_consumable_weight);
1190		let mut checked_acc = Vec::<CheckedDisputeStatementSet>::with_capacity(disputes.len());
1191
1192		// Accumulated weight of all disputes picked, that passed the checks.
1193		let mut weight_acc = Weight::zero();
1194
1195		// Select disputes in-order until the remaining weight is attained
1196		disputes.into_iter().for_each(|dss| {
1197			let dispute_weight = dispute_statement_set_weight::<T, &DisputeStatementSet>(&dss);
1198			let updated = weight_acc.saturating_add(dispute_weight);
1199			if max_consumable_weight.all_gte(updated) {
1200				// Always apply the weight. Invalid data cost processing time too:
1201				weight_acc = updated;
1202				if let Some(checked) = dispute_statement_set_valid(dss) {
1203					checked_acc.push(checked);
1204				}
1205			}
1206		});
1207
1208		(checked_acc, weight_acc)
1209	} else {
1210		// Go through all of them, and just apply the filter, they would all fit
1211		let checked = disputes
1212			.into_iter()
1213			.filter_map(|dss| dispute_statement_set_valid(dss))
1214			.collect::<Vec<CheckedDisputeStatementSet>>();
1215		// some might have been filtered out, so re-calc the weight
1216		let checked_disputes_weight = checked_multi_dispute_statement_sets_weight::<T>(&checked);
1217		(checked, checked_disputes_weight)
1218	}
1219}
1220
1221// Helper function for filtering candidates which don't pass the given predicate. When/if the first
1222// candidate which failed the predicate is found, all the other candidates that follow are dropped.
1223fn retain_candidates<
1224	T: inclusion::Config + paras::Config + inclusion::Config,
1225	F: FnMut(ParaId, &mut C) -> bool,
1226	C,
1227>(
1228	candidates_per_para: &mut BTreeMap<ParaId, Vec<C>>,
1229	mut pred: F,
1230) {
1231	for (para_id, candidates) in candidates_per_para.iter_mut() {
1232		let mut latest_valid_idx = None;
1233
1234		for (idx, candidate) in candidates.iter_mut().enumerate() {
1235			if pred(*para_id, candidate) {
1236				// Found a valid candidate.
1237				latest_valid_idx = Some(idx);
1238			} else {
1239				break
1240			}
1241		}
1242
1243		if let Some(latest_valid_idx) = latest_valid_idx {
1244			candidates.truncate(latest_valid_idx + 1);
1245		} else {
1246			candidates.clear();
1247		}
1248	}
1249
1250	candidates_per_para.retain(|_, c| !c.is_empty());
1251}
1252
// Filters statements from disabled validators in `BackedCandidate` and does a few more sanity
// checks. If dropping the disabled validators' votes leaves a candidate with fewer than the
// minimum required backing votes, the whole candidate is dropped — and, via `retain_candidates`,
// so are all of its descendants.
fn filter_backed_statements_from_disabled_validators<
	T: shared::Config + scheduler::Config + inclusion::Config,
>(
	backed_candidates_with_core: &mut BTreeMap<
		ParaId,
		Vec<(BackedCandidate<<T as frame_system::Config>::Hash>, CoreIndex)>,
	>,
	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
) {
	// Set of currently disabled validators, as reported by the `shared` pallet.
	let disabled_validators =
		BTreeSet::<_>::from_iter(shared::Pallet::<T>::disabled_validators().into_iter());

	if disabled_validators.is_empty() {
		// No disabled validators - nothing to do
		return
	}

	let minimum_backing_votes = configuration::ActiveConfig::<T>::get().minimum_backing_votes;

	// Process all backed candidates. `validator_indices` in `BackedCandidates` are indices within
	// the validator group assigned to the parachain. To obtain this group we need:
	// 1. Core index assigned to the parachain which has produced the candidate
	// 2. The relay chain block number of the candidate
	retain_candidates::<T, _, _>(backed_candidates_with_core, |para_id, (bc, core_idx)| {
		// The injected core index itself is not used here; we only keep a copy of it so it can
		// be written back unchanged when the candidate is updated below.
		let (validator_indices, maybe_injected_core_index) = bc.validator_indices_and_core_index();
		// Owned, mutable copy of the per-group backing bitfield.
		let mut validator_indices = BitVec::<_>::from(validator_indices);

		// Get relay parent block number of the candidate. We need this to get the group index
		// assigned to this core at this block number
		let relay_parent_block_number = match allowed_relay_parents
			.acquire_info(bc.descriptor().relay_parent(), None)
		{
			Some((_, block_num)) => block_num,
			None => {
				log::debug!(
					target: LOG_TARGET,
					"Relay parent {:?} for candidate is not in the allowed relay parents. Dropping the candidate.",
					bc.descriptor().relay_parent()
				);
				return false
			},
		};

		// Get the group index for the core. The lookup uses the relay parent's child block
		// number (hence the `+ 1`), mirroring `get_injected_core_index`.
		let group_idx = match scheduler::Pallet::<T>::group_assigned_to_core(
			*core_idx,
			relay_parent_block_number + One::one(),
		) {
			Some(group_idx) => group_idx,
			None => {
				log::debug!(target: LOG_TARGET, "Can't get the group index for core idx {:?}. Dropping the candidate.", core_idx);
				return false
			},
		};

		// And finally get the validator group for this group index
		let validator_group = match scheduler::Pallet::<T>::group_validators(group_idx) {
			Some(validator_group) => validator_group,
			None => {
				log::debug!(target: LOG_TARGET, "Can't get the validators from group {:?}. Dropping the candidate.", group_idx);
				return false
			},
		};

		// Bitmask with the disabled indices within the validator group
		let disabled_indices = BitVec::<u8, bitvec::order::Lsb0>::from_iter(
			validator_group.iter().map(|idx| disabled_validators.contains(idx)),
		);
		// The indices of statements from disabled validators in `BackedCandidate`. We have to drop
		// these.
		let indices_to_drop = disabled_indices.clone() & &validator_indices;

		// Remove the corresponding votes from `validity_votes`. Iterating in reverse keeps the
		// not-yet-processed (lower) indices stable while elements are removed.
		for idx in indices_to_drop.iter_ones().rev() {
			// Map the index in `indices_to_drop` (which is an index into the validator group)
			// to the index in the validity votes vector, which might have less number of votes,
			// than validators assigned to the group.
			//
			// For each index `idx` in `indices_to_drop`, the corresponding index in the
			// validity votes vector is the number of `1` bits in `validator_indices` before `idx`.
			let mapped_idx = validator_indices[..idx].count_ones();
			bc.validity_votes_mut().remove(mapped_idx);
		}

		// Apply the bitmask to drop the disabled validator from `validator_indices`
		validator_indices &= !disabled_indices;
		// Update the backed candidate
		bc.set_validator_indices_and_core_index(validator_indices, maybe_injected_core_index);

		// By filtering votes we might render the candidate invalid and cause a failure in
		// [`process_candidates`]. To avoid this we have to perform a sanity check here. If there
		// are not enough backing votes after filtering we will remove the whole candidate.
		if bc.validity_votes().len() <
			effective_minimum_backing_votes(validator_group.len(), minimum_backing_votes)
		{
			log::debug!(
				target: LOG_TARGET,
				"Dropping candidate {:?} of paraid {:?} because it was left with too few backing votes after votes from disabled validators were filtered.",
				bc.candidate().hash(),
				para_id
			);

			return false
		}

		true
	});
}
1364
1365// Check that candidates pertaining to the same para form a chain. Drop the ones that
1366// don't, along with the rest of candidates which follow them in the input vector.
1367// In the process, duplicated candidates will also be dropped (even if they form a valid cycle;
1368// cycles are not allowed if they entail backing duplicated candidates).
1369fn filter_unchained_candidates<T: inclusion::Config + paras::Config + inclusion::Config>(
1370	candidates: &mut BTreeMap<ParaId, Vec<BackedCandidate<T::Hash>>>,
1371	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1372) {
1373	let mut para_latest_context: BTreeMap<ParaId, (HeadData, BlockNumberFor<T>)> = BTreeMap::new();
1374	for para_id in candidates.keys() {
1375		let Some(latest_head_data) = inclusion::Pallet::<T>::para_latest_head_data(&para_id) else {
1376			defensive!("Latest included head data for paraid {:?} is None", para_id);
1377			continue
1378		};
1379		let Some(latest_relay_parent) = inclusion::Pallet::<T>::para_most_recent_context(&para_id)
1380		else {
1381			defensive!("Latest relay parent for paraid {:?} is None", para_id);
1382			continue
1383		};
1384		para_latest_context.insert(*para_id, (latest_head_data, latest_relay_parent));
1385	}
1386
1387	let mut para_visited_candidates: BTreeMap<ParaId, BTreeSet<CandidateHash>> = BTreeMap::new();
1388
1389	retain_candidates::<T, _, _>(candidates, |para_id, candidate| {
1390		let Some((latest_head_data, latest_relay_parent)) = para_latest_context.get(&para_id)
1391		else {
1392			return false
1393		};
1394		let candidate_hash = candidate.candidate().hash();
1395
1396		let visited_candidates =
1397			para_visited_candidates.entry(para_id).or_insert_with(|| BTreeSet::new());
1398		if visited_candidates.contains(&candidate_hash) {
1399			log::debug!(
1400				target: LOG_TARGET,
1401				"Found duplicate candidates for paraid {:?}. Dropping the candidates with hash {:?}",
1402				para_id,
1403				candidate_hash
1404			);
1405
1406			// If we got a duplicate candidate, stop.
1407			return false
1408		} else {
1409			visited_candidates.insert(candidate_hash);
1410		}
1411
1412		let check_ctx = CandidateCheckContext::<T>::new(Some(*latest_relay_parent));
1413
1414		match check_ctx.verify_backed_candidate(
1415			&allowed_relay_parents,
1416			candidate.candidate(),
1417			latest_head_data.clone(),
1418		) {
1419			Ok(relay_parent_block_number) => {
1420				para_latest_context.insert(
1421					para_id,
1422					(
1423						candidate.candidate().commitments.head_data.clone(),
1424						relay_parent_block_number,
1425					),
1426				);
1427				true
1428			},
1429			Err(err) => {
1430				log::debug!(
1431					target: LOG_TARGET,
1432					"Backed candidate verification for candidate {:?} of paraid {:?} failed with {:?}",
1433					candidate_hash,
1434					para_id,
1435					err
1436				);
1437				false
1438			},
1439		}
1440	});
1441}
1442
1443/// Map candidates to scheduled cores.
1444/// If the para only has one scheduled core and one candidate supplied, map the candidate to the
1445/// single core. If the para has multiple cores scheduled, only map the candidates with core index.
1446/// Filter out the rest.
1447/// Also returns whether or not we dropped any candidates.
1448/// When dropping a candidate of a para, we must drop all subsequent candidates from that para
1449/// (because they form a chain).
1450fn map_candidates_to_cores<T: configuration::Config + scheduler::Config + inclusion::Config>(
1451	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1452	mut scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>>,
1453	candidates: BTreeMap<ParaId, Vec<BackedCandidate<T::Hash>>>,
1454) -> BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>> {
1455	let mut backed_candidates_with_core = BTreeMap::new();
1456
1457	for (para_id, backed_candidates) in candidates.into_iter() {
1458		if backed_candidates.len() == 0 {
1459			defensive!("Backed candidates for paraid {} is empty.", para_id);
1460			continue
1461		}
1462
1463		let Some(scheduled_cores) = scheduled.get_mut(&para_id) else {
1464			log::debug!(
1465				target: LOG_TARGET,
1466				"Paraid: {:?} has no entry in scheduled cores but {} candidates were supplied.",
1467				para_id,
1468				backed_candidates.len()
1469			);
1470			continue
1471		};
1472
1473		// ParaIds without scheduled cores are silently filtered out.
1474		if scheduled_cores.len() == 0 {
1475			log::debug!(
1476				target: LOG_TARGET,
1477				"Paraid: {:?} has no scheduled cores but {} candidates were supplied.",
1478				para_id,
1479				backed_candidates.len()
1480			);
1481			continue
1482		}
1483
1484		// We must preserve the dependency order given in the input.
1485		let mut temp_backed_candidates = Vec::with_capacity(scheduled_cores.len());
1486
1487		for candidate in backed_candidates {
1488			if scheduled_cores.len() == 0 {
1489				// We've got candidates for all of this para's assigned cores. Move on to
1490				// the next para.
1491				log::debug!(
1492					target: LOG_TARGET,
1493					"Found enough candidates for paraid: {:?}.",
1494					candidate.descriptor().para_id()
1495				);
1496				break;
1497			}
1498
1499			if let Some(core_index) = get_core_index::<T>(allowed_relay_parents, &candidate) {
1500				if scheduled_cores.remove(&core_index) {
1501					temp_backed_candidates.push((candidate, core_index));
1502				} else {
1503					// if we got a candidate for a core index which is not scheduled, stop
1504					// the work for this para. the already processed candidate chain in
1505					// temp_backed_candidates is still fine though.
1506					log::debug!(
1507						target: LOG_TARGET,
1508						"Found a backed candidate {:?} with core index {}, which is not scheduled for paraid {:?}.",
1509						candidate.candidate().hash(),
1510						core_index.0,
1511						candidate.descriptor().para_id()
1512					);
1513
1514					break;
1515				}
1516			} else {
1517				// if we got a candidate which does not contain its core index, stop the
1518				// work for this para. the already processed candidate chain in
1519				// temp_backed_candidates is still fine though.
1520
1521				log::debug!(
1522					target: LOG_TARGET,
1523					"Found a backed candidate {:?} without core index information for para {:?}, dropping",
1524					candidate.candidate().hash(),
1525					candidate.descriptor().para_id()
1526				);
1527
1528				break;
1529			}
1530		}
1531
1532		if !temp_backed_candidates.is_empty() {
1533			backed_candidates_with_core
1534				.entry(para_id)
1535				.or_insert_with(|| vec![])
1536				.extend(temp_backed_candidates);
1537		}
1538	}
1539
1540	backed_candidates_with_core
1541}
1542
1543// Must be called only for candidates that have been sanitized already.
1544fn get_core_index<T: configuration::Config + scheduler::Config + inclusion::Config>(
1545	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1546	candidate: &BackedCandidate<T::Hash>,
1547) -> Option<CoreIndex> {
1548	candidate
1549		.candidate()
1550		.descriptor
1551		.core_index()
1552		.or_else(|| get_injected_core_index::<T>(allowed_relay_parents, &candidate))
1553}
1554
1555fn get_injected_core_index<T: configuration::Config + scheduler::Config + inclusion::Config>(
1556	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1557	candidate: &BackedCandidate<T::Hash>,
1558) -> Option<CoreIndex> {
1559	// After stripping the 8 bit extensions, the `validator_indices` field length is expected
1560	// to be equal to backing group size. If these don't match, the `CoreIndex` is badly encoded,
1561	// or not supported.
1562	let (validator_indices, Some(core_idx)) = candidate.validator_indices_and_core_index() else {
1563		return None
1564	};
1565
1566	let relay_parent_block_number =
1567		match allowed_relay_parents.acquire_info(candidate.descriptor().relay_parent(), None) {
1568			Some((_, block_num)) => block_num,
1569			None => {
1570				log::debug!(
1571					target: LOG_TARGET,
1572					"Relay parent {:?} for candidate {:?} is not in the allowed relay parents.",
1573					candidate.descriptor().relay_parent(),
1574					candidate.candidate().hash(),
1575				);
1576				return None
1577			},
1578		};
1579
1580	// Get the backing group of the candidate backed at `core_idx`.
1581	let group_idx = match scheduler::Pallet::<T>::group_assigned_to_core(
1582		core_idx,
1583		relay_parent_block_number + One::one(),
1584	) {
1585		Some(group_idx) => group_idx,
1586		None => {
1587			log::debug!(
1588				target: LOG_TARGET,
1589				"Can't get the group index for core idx {:?}.",
1590				core_idx,
1591			);
1592			return None
1593		},
1594	};
1595
1596	let group_validators = match scheduler::Pallet::<T>::group_validators(group_idx) {
1597		Some(validators) => validators,
1598		None => return None,
1599	};
1600
1601	if group_validators.len() == validator_indices.len() {
1602		Some(core_idx)
1603	} else {
1604		log::debug!(
1605			target: LOG_TARGET,
1606			"Expected validator_indices count different than the real one: {}, {} for candidate {:?}",
1607			group_validators.len(),
1608			validator_indices.len(),
1609			candidate.candidate().hash()
1610		);
1611
1612		None
1613	}
1614}