use crate::{
configuration,
disputes::DisputesHandler,
inclusion::{self, CandidateCheckContext},
initializer,
metrics::METRICS,
paras, scheduler,
shared::{self, AllowedRelayParentsTracker},
ParaId,
};
use alloc::{
collections::{btree_map::BTreeMap, btree_set::BTreeSet},
vec,
vec::Vec,
};
use bitvec::prelude::BitVec;
use core::result::Result;
use frame_support::{
defensive,
dispatch::{DispatchErrorWithPostInfo, PostDispatchInfo},
inherent::{InherentData, InherentIdentifier, MakeFatalError, ProvideInherent},
pallet_prelude::*,
traits::Randomness,
};
use frame_system::pallet_prelude::*;
use pallet_babe::{self, ParentBlockRandomness};
use polkadot_primitives::{
effective_minimum_backing_votes,
node_features::FeatureIndex,
vstaging::{
BackedCandidate, CandidateDescriptorVersion, CandidateReceiptV2 as CandidateReceipt,
InherentData as ParachainsInherentData, ScrapedOnChainVotes,
},
CandidateHash, CheckedDisputeStatementSet, CheckedMultiDisputeStatementSet, CoreIndex,
DisputeStatementSet, HeadData, MultiDisputeStatementSet, SessionIndex,
SignedAvailabilityBitfields, SigningContext, UncheckedSignedAvailabilityBitfield,
UncheckedSignedAvailabilityBitfields, ValidatorId, ValidatorIndex, ValidityAttestation,
PARACHAINS_INHERENT_IDENTIFIER,
};
use rand::{seq::SliceRandom, SeedableRng};
use scale_info::TypeInfo;
use sp_runtime::traits::{Header as HeaderT, One};
mod misc;
mod weights;
use self::weights::checked_multi_dispute_statement_sets_weight;
pub use self::{
misc::{IndexedRetain, IsSortedBy},
weights::{
backed_candidate_weight, backed_candidates_weight, dispute_statement_set_weight,
multi_dispute_statement_sets_weight, paras_inherent_total_weight, signed_bitfield_weight,
signed_bitfields_weight, TestWeightInfo, WeightInfo,
},
};
#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;
#[cfg(test)]
mod tests;
/// Log target passed as `target:` to every `log::*!` invocation in this module.
const LOG_TARGET: &str = "runtime::inclusion-inherent";
/// Newtype around a `BitVec<u8, Lsb0>` with one bit per availability core.
///
/// In this module it is produced by `create_disputed_bitfield`, which sets the bit at the index
/// of every core that was freed because of a dispute, and consumed by `sanitize_bitfields`, which
/// drops any signed bitfield that has a bit in common with it.
#[derive(Default, PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug, TypeInfo)]
pub(crate) struct DisputedBitfield(pub(crate) BitVec<u8, bitvec::order::Lsb0>);
impl From<BitVec<u8, bitvec::order::Lsb0>> for DisputedBitfield {
fn from(inner: BitVec<u8, bitvec::order::Lsb0>) -> Self {
Self(inner)
}
}
#[cfg(test)]
impl DisputedBitfield {
    /// Test-only constructor: a bitfield of `n` bits, all of them unset.
    pub fn zeros(n: usize) -> Self {
        let cleared = BitVec::<u8, bitvec::order::Lsb0>::repeat(false, n);
        cleared.into()
    }
}
pub use pallet::*;
// FRAME pallet definition for the parachains inherent: configuration trait, errors, storage
// items, block hooks, the inherent provider and the single `enter` dispatchable.
#[frame_support::pallet]
pub mod pallet {
use super::*;
// The pallet type itself; expanded by `#[pallet::pallet]`.
#[pallet::pallet]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);
// Configuration of this pallet. It builds on the `inclusion`, `scheduler`, `initializer` and
// `pallet_babe` configurations.
#[pallet::config]
#[pallet::disable_frame_system_supertrait_check]
pub trait Config:
inclusion::Config + scheduler::Config + initializer::Config + pallet_babe::Config
{
/// Weight information for the extrinsic in this pallet.
type WeightInfo: WeightInfo;
}
#[pallet::error]
pub enum Error<T> {
/// `enter` was called while the `Included` marker was already set, i.e. more than once in
/// the same block.
TooManyInclusionInherents,
/// The hash of the submitted parent header does not match the parent hash recorded by
/// `frame_system`.
InvalidParentHeader,
/// Processing the inherent data inside `enter` produced data that differs from what was
/// submitted.
InherentDataFilteredDuringExecution,
/// More candidates were left after sanitization than there are eligible cores.
UnscheduledCandidate,
}
/// Marker set by `enter` and taken in `on_finalize`.
///
/// `on_finalize` panics if the marker is absent, so a block without the inherent cannot be
/// finalized.
#[pallet::storage]
pub(crate) type Included<T> = StorageValue<_, ()>;
/// Backing votes and dispute statements recorded by this pallet while processing the inherent
/// (see `set_scrapable_on_chain_disputes` and `set_scrapable_on_chain_backings`).
#[pallet::storage]
pub type OnChainVotes<T: Config> = StorageValue<_, ScrapedOnChainVotes<T::Hash>>;
/// Replace the `disputes` and `session` parts of `OnChainVotes`.
///
/// Any previously stored `backing_validators_per_candidate` is carried over; if nothing was
/// stored yet an empty list is used.
pub(crate) fn set_scrapable_on_chain_disputes<T: Config>(
session: SessionIndex,
checked_disputes: CheckedMultiDisputeStatementSet,
) {
crate::paras_inherent::OnChainVotes::<T>::mutate(move |value| {
// Convert the checked sets back into plain `DisputeStatementSet`s for storage.
let disputes =
checked_disputes.into_iter().map(DisputeStatementSet::from).collect::<Vec<_>>();
let backing_validators_per_candidate = match value.take() {
Some(v) => v.backing_validators_per_candidate,
None => Vec::new(),
};
*value = Some(ScrapedOnChainVotes::<T::Hash> {
backing_validators_per_candidate,
disputes,
session,
});
})
}
/// Replace the `backing_validators_per_candidate` and `session` parts of `OnChainVotes`.
///
/// Any previously stored `disputes` are carried over; if nothing was stored yet the default
/// (empty) set is used.
pub(crate) fn set_scrapable_on_chain_backings<T: Config>(
session: SessionIndex,
backing_validators_per_candidate: Vec<(
CandidateReceipt<T::Hash>,
Vec<(ValidatorIndex, ValidityAttestation)>,
)>,
) {
crate::paras_inherent::OnChainVotes::<T>::mutate(move |value| {
let disputes = match value.take() {
Some(v) => v.disputes,
None => MultiDisputeStatementSet::default(),
};
*value = Some(ScrapedOnChainVotes::<T::Hash> {
backing_validators_per_candidate,
disputes,
session,
});
})
}
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
// Reports one storage read and one write.
// NOTE(review): presumably this pre-pays for the `Included::take()` in `on_finalize` — confirm.
fn on_initialize(_: BlockNumberFor<T>) -> Weight {
T::DbWeight::get().reads_writes(1, 1) }
// Clears the `Included` marker and panics when `enter` did not run in this block.
fn on_finalize(_: BlockNumberFor<T>) {
if Included::<T>::take().is_none() {
panic!("Bitfields and heads must be included every block");
}
}
}
#[pallet::inherent]
impl<T: Config> ProvideInherent for Pallet<T> {
type Call = Call<T>;
type Error = MakeFatalError<()>;
const INHERENT_IDENTIFIER: InherentIdentifier = PARACHAINS_INHERENT_IDENTIFIER;
// Builds the `enter` call from the node-provided inherent data; yields `None` when the data
// is missing, fails to decode or fails processing (see `create_inherent_inner`).
fn create_inherent(data: &InherentData) -> Option<Self::Call> {
let inherent_data = Self::create_inherent_inner(data)?;
Some(Call::enter { data: inherent_data })
}
// `enter` is the only call of this pallet that counts as an inherent.
fn is_inherent(call: &Self::Call) -> bool {
matches!(call, Call::enter { .. })
}
}
#[pallet::call]
impl<T: Config> Pallet<T> {
/// Enter the paras inherent: process the supplied bitfields, backed candidates and
/// disputes.
///
/// Requires the `None` origin and may run at most once per block
/// (`TooManyInclusionInherents`). The call fails with
/// `InherentDataFilteredDuringExecution` if processing returns data that differs from
/// `data`.
#[pallet::call_index(0)]
#[pallet::weight((
paras_inherent_total_weight::<T>(
data.backed_candidates.as_slice(),
&data.bitfields,
&data.disputes,
),
DispatchClass::Mandatory,
))]
pub fn enter(
origin: OriginFor<T>,
data: ParachainsInherentData<HeaderFor<T>>,
) -> DispatchResultWithPostInfo {
ensure_none(origin)?;
ensure!(!Included::<T>::exists(), Error::<T>::TooManyInclusionInherents);
Included::<T>::set(Some(()));
// Keep a copy of the submitted data to compare against the processed result.
let initial_data = data.clone();
Self::process_inherent_data(data).and_then(|(processed, post_info)| {
ensure!(initial_data == processed, Error::<T>::InherentDataFilteredDuringExecution);
Ok(post_info)
})
}
}
}
impl<T: Config> Pallet<T> {
/// Decode the parachains inherent data from `data` and run it through
/// `process_inherent_data`.
///
/// Returns `None` when no data is registered under `INHERENT_IDENTIFIER`, when decoding fails
/// or when processing returns an error; the latter two cases are logged as warnings.
fn create_inherent_inner(data: &InherentData) -> Option<ParachainsInherentData<HeaderFor<T>>> {
    let raw: ParachainsInherentData<HeaderFor<T>> = data
        .get_data(&Self::INHERENT_IDENTIFIER)
        .map_err(|_| {
            log::warn!(target: LOG_TARGET, "ParachainsInherentData failed to decode");
        })
        .ok()
        .flatten()?;

    Self::process_inherent_data(raw)
        .map(|(processed, _)| processed)
        .map_err(|err| {
            log::warn!(target: LOG_TARGET, "Processing inherent data failed: {:?}", err);
        })
        .ok()
}
fn process_inherent_data(
data: ParachainsInherentData<HeaderFor<T>>,
) -> Result<(ParachainsInherentData<HeaderFor<T>>, PostDispatchInfo), DispatchErrorWithPostInfo>
{
#[cfg(feature = "runtime-metrics")]
sp_io::init_tracing();
let ParachainsInherentData {
mut bitfields,
mut backed_candidates,
parent_header,
mut disputes,
} = data;
log::debug!(
target: LOG_TARGET,
"[process_inherent_data] bitfields.len(): {}, backed_candidates.len(): {}, disputes.len() {}",
bitfields.len(),
backed_candidates.len(),
disputes.len()
);
let parent_hash = frame_system::Pallet::<T>::parent_hash();
ensure!(
parent_header.hash().as_ref() == parent_hash.as_ref(),
Error::<T>::InvalidParentHeader,
);
let now = frame_system::Pallet::<T>::block_number();
let config = configuration::ActiveConfig::<T>::get();
{
let parent_number = now - One::one();
let parent_storage_root = *parent_header.state_root();
shared::AllowedRelayParents::<T>::mutate(|tracker| {
tracker.update(
parent_hash,
parent_storage_root,
scheduler::ClaimQueue::<T>::get()
.into_iter()
.map(|(core_index, paras)| {
(core_index, paras.into_iter().map(|e| e.para_id()).collect())
})
.collect(),
parent_number,
config.async_backing_params.allowed_ancestry_len,
);
});
}
let candidates_weight = backed_candidates_weight::<T>(&backed_candidates);
let bitfields_weight = signed_bitfields_weight::<T>(&bitfields);
let disputes_weight = multi_dispute_statement_sets_weight::<T>(&disputes);
let weight_before_filtering = candidates_weight + bitfields_weight + disputes_weight;
METRICS.on_before_filter(weight_before_filtering.ref_time());
log::debug!(target: LOG_TARGET, "Size before filter: {}, candidates + bitfields: {}, disputes: {}", weight_before_filtering.proof_size(), candidates_weight.proof_size() + bitfields_weight.proof_size(), disputes_weight.proof_size());
log::debug!(target: LOG_TARGET, "Time weight before filter: {}, candidates + bitfields: {}, disputes: {}", weight_before_filtering.ref_time(), candidates_weight.ref_time() + bitfields_weight.ref_time(), disputes_weight.ref_time());
let current_session = shared::CurrentSessionIndex::<T>::get();
let expected_bits = scheduler::Pallet::<T>::num_availability_cores();
let validator_public = shared::ActiveValidatorKeys::<T>::get();
let max_block_weight = {
let dispatch_class = DispatchClass::Mandatory;
let max_block_weight_full = <T as frame_system::Config>::BlockWeights::get();
log::debug!(target: LOG_TARGET, "Max block weight: {}", max_block_weight_full.max_block);
let max_weight = max_block_weight_full
.per_class
.get(dispatch_class)
.max_total
.unwrap_or(max_block_weight_full.max_block);
log::debug!(target: LOG_TARGET, "Used max block time weight: {}", max_weight);
let max_block_size_full = <T as frame_system::Config>::BlockLength::get();
let max_block_size = max_block_size_full.max.get(dispatch_class);
log::debug!(target: LOG_TARGET, "Used max block size: {}", max_block_size);
max_weight.set_proof_size(*max_block_size as u64)
};
log::debug!(target: LOG_TARGET, "Used max block weight: {}", max_block_weight);
let entropy = compute_entropy::<T>(parent_hash);
let mut rng = rand_chacha::ChaChaRng::from_seed(entropy.into());
if let Err(()) = T::DisputesHandler::deduplicate_and_sort_dispute_data(&mut disputes) {
log::debug!(target: LOG_TARGET, "Found duplicate statement sets, retaining the first");
}
let post_conclusion_acceptance_period = config.dispute_post_conclusion_acceptance_period;
let dispute_statement_set_valid = move |set: DisputeStatementSet| {
T::DisputesHandler::filter_dispute_data(set, post_conclusion_acceptance_period)
};
let (checked_disputes_sets, checked_disputes_sets_consumed_weight) =
limit_and_sanitize_disputes::<T, _>(
disputes,
dispute_statement_set_valid,
max_block_weight,
);
let mut all_weight_after = {
let non_disputes_weight = apply_weight_limit::<T>(
&mut backed_candidates,
&mut bitfields,
max_block_weight.saturating_sub(checked_disputes_sets_consumed_weight),
&mut rng,
);
let all_weight_after =
non_disputes_weight.saturating_add(checked_disputes_sets_consumed_weight);
METRICS.on_after_filter(all_weight_after.ref_time());
log::debug!(
target: LOG_TARGET,
"[process_inherent_data] after filter: bitfields.len(): {}, backed_candidates.len(): {}, checked_disputes_sets.len() {}",
bitfields.len(),
backed_candidates.len(),
checked_disputes_sets.len()
);
log::debug!(target: LOG_TARGET, "Size after filter: {}, candidates + bitfields: {}, disputes: {}", all_weight_after.proof_size(), non_disputes_weight.proof_size(), checked_disputes_sets_consumed_weight.proof_size());
log::debug!(target: LOG_TARGET, "Time weight after filter: {}, candidates + bitfields: {}, disputes: {}", all_weight_after.ref_time(), non_disputes_weight.ref_time(), checked_disputes_sets_consumed_weight.ref_time());
if all_weight_after.any_gt(max_block_weight) {
log::warn!(target: LOG_TARGET, "Post weight limiting weight is still too large, time: {}, size: {}", all_weight_after.ref_time(), all_weight_after.proof_size());
}
all_weight_after
};
if let Err(e) =
T::DisputesHandler::process_checked_multi_dispute_data(&checked_disputes_sets)
{
log::warn!(target: LOG_TARGET, "MultiDisputesData failed to update: {:?}", e);
};
METRICS.on_disputes_imported(checked_disputes_sets.len() as u64);
set_scrapable_on_chain_disputes::<T>(current_session, checked_disputes_sets.clone());
if T::DisputesHandler::is_frozen() {
METRICS.on_relay_chain_freeze();
let disputes = checked_disputes_sets
.into_iter()
.map(|checked| checked.into())
.collect::<Vec<_>>();
let processed = ParachainsInherentData {
bitfields: Vec::new(),
backed_candidates: Vec::new(),
disputes,
parent_header,
};
return Ok((processed, Some(checked_disputes_sets_consumed_weight).into()))
}
let current_concluded_invalid_disputes = checked_disputes_sets
.iter()
.map(AsRef::as_ref)
.filter(|dss| dss.session == current_session)
.map(|dss| (dss.session, dss.candidate_hash))
.filter(|(session, candidate)| {
<T>::DisputesHandler::concluded_invalid(*session, *candidate)
})
.map(|(_session, candidate)| candidate)
.collect::<BTreeSet<CandidateHash>>();
let (freed_disputed, concluded_invalid_hashes): (Vec<CoreIndex>, BTreeSet<CandidateHash>) =
inclusion::Pallet::<T>::free_disputed(¤t_concluded_invalid_disputes)
.into_iter()
.unzip();
let disputed_bitfield = create_disputed_bitfield(expected_bits, freed_disputed.iter());
let bitfields = sanitize_bitfields::<T>(
bitfields,
disputed_bitfield,
expected_bits,
parent_hash,
current_session,
&validator_public[..],
);
METRICS.on_bitfields_processed(bitfields.len() as u64);
let (enact_weight, freed_concluded) =
inclusion::Pallet::<T>::update_pending_availability_and_get_freed_cores(
&validator_public[..],
bitfields.clone(),
);
all_weight_after.saturating_accrue(enact_weight);
log::debug!(
target: LOG_TARGET,
"Enacting weight: {}, all weight: {}",
enact_weight.ref_time(),
all_weight_after.ref_time(),
);
if all_weight_after.any_gt(max_block_weight) {
log::warn!(
target: LOG_TARGET,
"Overweight para inherent data after enacting the candidates {:?}: {} > {}",
parent_hash,
all_weight_after,
max_block_weight,
);
}
for (_, candidate_hash) in &freed_concluded {
T::DisputesHandler::note_included(current_session, *candidate_hash, now);
}
METRICS.on_candidates_included(freed_concluded.len() as u64);
let freed_timeout = if scheduler::Pallet::<T>::availability_timeout_check_required() {
inclusion::Pallet::<T>::free_timedout()
} else {
Vec::new()
};
if !freed_timeout.is_empty() {
log::debug!(target: LOG_TARGET, "Evicted timed out cores: {:?}", freed_timeout);
}
let (candidate_receipt_with_backing_validator_indices, backed_candidates_with_core) =
Self::back_candidates(concluded_invalid_hashes, backed_candidates)?;
set_scrapable_on_chain_backings::<T>(
current_session,
candidate_receipt_with_backing_validator_indices,
);
let disputes = checked_disputes_sets
.into_iter()
.map(|checked| checked.into())
.collect::<Vec<_>>();
let bitfields = bitfields.into_iter().map(|v| v.into_unchecked()).collect();
let count = backed_candidates_with_core.len();
let processed = ParachainsInherentData {
bitfields,
backed_candidates: backed_candidates_with_core.into_iter().fold(
Vec::with_capacity(count),
|mut acc, (_id, candidates)| {
acc.extend(candidates.into_iter().map(|(c, _)| c));
acc
},
),
disputes,
parent_header,
};
Ok((processed, Some(all_weight_after).into()))
}
fn back_candidates(
concluded_invalid_hashes: BTreeSet<CandidateHash>,
backed_candidates: Vec<BackedCandidate<T::Hash>>,
) -> Result<
(
Vec<(CandidateReceipt<T::Hash>, Vec<(ValidatorIndex, ValidityAttestation)>)>,
BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>>,
),
DispatchErrorWithPostInfo,
> {
let allowed_relay_parents = shared::AllowedRelayParents::<T>::get();
let upcoming_new_session = initializer::Pallet::<T>::upcoming_session_change();
METRICS.on_candidates_processed_total(backed_candidates.len() as u64);
if !upcoming_new_session {
let occupied_cores =
inclusion::Pallet::<T>::get_occupied_cores().map(|(core, _)| core).collect();
let mut eligible: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
let mut total_eligible_cores = 0;
for (core_idx, para_id) in Self::eligible_paras(&occupied_cores) {
total_eligible_cores += 1;
log::trace!(target: LOG_TARGET, "Found eligible para {:?} on core {:?}", para_id, core_idx);
eligible.entry(para_id).or_default().insert(core_idx);
}
let node_features = configuration::ActiveConfig::<T>::get().node_features;
let core_index_enabled = node_features
.get(FeatureIndex::ElasticScalingMVP as usize)
.map(|b| *b)
.unwrap_or(false);
let allow_v2_receipts = node_features
.get(FeatureIndex::CandidateReceiptV2 as usize)
.map(|b| *b)
.unwrap_or(false);
let backed_candidates_with_core = sanitize_backed_candidates::<T>(
backed_candidates,
&allowed_relay_parents,
concluded_invalid_hashes,
eligible,
core_index_enabled,
allow_v2_receipts,
);
let count = count_backed_candidates(&backed_candidates_with_core);
ensure!(count <= total_eligible_cores, Error::<T>::UnscheduledCandidate);
METRICS.on_candidates_sanitized(count as u64);
let candidate_receipt_with_backing_validator_indices =
inclusion::Pallet::<T>::process_candidates(
&allowed_relay_parents,
&backed_candidates_with_core,
scheduler::Pallet::<T>::group_validators,
core_index_enabled,
)?;
scheduler::Pallet::<T>::advance_claim_queue(&occupied_cores);
Ok((candidate_receipt_with_backing_validator_indices, backed_candidates_with_core))
} else {
log::debug!(
target: LOG_TARGET,
"Upcoming session change, not backing any new candidates."
);
Ok((vec![], BTreeMap::new()))
}
}
/// Iterate over the `(core, para)` pairs that may be backed in this block.
///
/// A core qualifies when it is not in `occupied_cores` and its claim queue is non-empty; the
/// para reported for it is the one at the front of that queue.
pub(crate) fn eligible_paras<'a>(
    occupied_cores: &'a BTreeSet<CoreIndex>,
) -> impl Iterator<Item = (CoreIndex, ParaId)> + 'a {
    scheduler::ClaimQueue::<T>::get()
        .into_iter()
        .filter(move |(core_idx, _)| !occupied_cores.contains(core_idx))
        .filter_map(|(core_idx, queue)| {
            queue.front().map(|next_scheduled| (core_idx, next_scheduled.para_id()))
        })
}
}
pub(super) fn create_disputed_bitfield<'a, I>(
expected_bits: usize,
freed_cores: I,
) -> DisputedBitfield
where
I: 'a + IntoIterator<Item = &'a CoreIndex>,
{
let mut bitvec = BitVec::repeat(false, expected_bits);
for core_idx in freed_cores {
let core_idx = core_idx.0 as usize;
if core_idx < expected_bits {
bitvec.set(core_idx, true);
}
}
DisputedBitfield::from(bitvec)
}
/// Randomly pick elements of `selectables` whose combined weight stays within `weight_limit`.
///
/// The indices in `preferred_indices` are tried first (in shuffled order), followed by all
/// remaining indices (also shuffled). An element is picked whenever adding its weight, as
/// reported by `weight_fn`, does not push any weight component above the limit; otherwise it
/// is skipped and the next one is tried. Preferred indices that are out of bounds are ignored.
///
/// Returns the accumulated weight and the picked indices in ascending order.
fn random_sel<X, F: Fn(&X) -> Weight>(
    rng: &mut rand_chacha::ChaChaRng,
    selectables: &[X],
    mut preferred_indices: Vec<usize>,
    weight_fn: F,
    weight_limit: Weight,
) -> (Weight, Vec<usize>) {
    if selectables.is_empty() {
        return (Weight::zero(), Vec::new())
    }

    let mut remaining: Vec<usize> = (0..selectables.len())
        .filter(|candidate_idx| !preferred_indices.contains(candidate_idx))
        .collect();
    let mut picked = Vec::with_capacity(selectables.len().saturating_sub(1));
    let mut consumed = Weight::zero();

    // The RNG is consumed in this exact order: preferred first, then the rest.
    preferred_indices.shuffle(rng);
    remaining.shuffle(rng);

    for idx in preferred_indices.into_iter().chain(remaining) {
        let Some(item) = selectables.get(idx) else { continue };
        let with_item = consumed.saturating_add(weight_fn(item));
        if !with_item.any_gt(weight_limit) {
            consumed = with_item;
            picked.push(idx);
        }
    }

    // Callers look the indices up with a binary search, so hand them back sorted.
    picked.sort_unstable();
    (consumed, picked)
}
/// Trim `candidates` and `bitfields` in place so that their weight fits `max_consumable_weight`.
///
/// * If everything fits, nothing is removed.
/// * Otherwise, if the bitfields alone fit, all bitfields are kept and a random selection of
///   candidate chains (runs of consecutive candidates of the same para) fills the remaining
///   weight. Chains containing a candidate with a validation code upgrade are tried first.
/// * Otherwise all candidates are dropped and a random selection of bitfields is kept.
///
/// Returns the weight of the data that was kept.
pub(crate) fn apply_weight_limit<T: Config + inclusion::Config>(
    candidates: &mut Vec<BackedCandidate<<T>::Hash>>,
    bitfields: &mut UncheckedSignedAvailabilityBitfields,
    max_consumable_weight: Weight,
    rng: &mut rand_chacha::ChaChaRng,
) -> Weight {
    let all_candidates_weight = backed_candidates_weight::<T>(candidates.as_slice());
    let all_bitfields_weight = signed_bitfields_weight::<T>(&bitfields);
    let everything = all_bitfields_weight.saturating_add(all_candidates_weight);

    // Fast path: nothing needs to be dropped.
    if max_consumable_weight.all_gte(everything) {
        return everything
    }

    // Split the candidates into chains; a new chain starts whenever the para id changes.
    let mut chains: Vec<Vec<_>> = Vec::new();
    let mut last_para_id = None;
    for candidate in core::mem::take(candidates) {
        let para_id = candidate.descriptor().para_id();
        if last_para_id != Some(para_id) {
            last_para_id = Some(para_id);
            chains.push(Vec::new());
        }
        chains
            .last_mut()
            .expect("if the current_para_id is Some, then vec is not empty; qed")
            .push(candidate);
    }

    // Chains carrying a code upgrade get priority in the random selection.
    let upgrade_chain_indices: Vec<usize> = chains
        .iter()
        .enumerate()
        .filter(|(_, chain)| {
            chain
                .iter()
                .any(|candidate| candidate.candidate().commitments.new_validation_code.is_some())
        })
        .map(|(idx, _)| idx)
        .collect();

    match max_consumable_weight.checked_sub(&all_bitfields_weight) {
        // All bitfields fit: spend the remainder on candidate chains.
        Some(max_consumable_by_candidates) => {
            let (kept_candidates_weight, kept_chain_indices) =
                random_sel::<Vec<BackedCandidate<<T as frame_system::Config>::Hash>>, _>(
                    rng,
                    &chains,
                    upgrade_chain_indices,
                    |chain| backed_candidates_weight::<T>(&chain),
                    max_consumable_by_candidates,
                );
            log::debug!(target: LOG_TARGET, "Indices Candidates: {:?}, size: {}", kept_chain_indices, candidates.len());
            chains.indexed_retain(|idx, _chain| kept_chain_indices.binary_search(&idx).is_ok());
            *candidates = chains.into_iter().flatten().collect::<Vec<_>>();

            kept_candidates_weight.saturating_add(all_bitfields_weight)
        },
        // Not even the bitfields fit: no candidates at all, and only some of the bitfields.
        None => {
            candidates.clear();

            let (kept_weight, kept_indices) =
                random_sel::<UncheckedSignedAvailabilityBitfield, _>(
                    rng,
                    &bitfields,
                    vec![],
                    |bitfield| signed_bitfield_weight::<T>(&bitfield),
                    max_consumable_weight,
                );
            log::debug!(target: LOG_TARGET, "Indices Bitfields: {:?}, size: {}", kept_indices, bitfields.len());
            bitfields.indexed_retain(|idx, _bitfield| kept_indices.binary_search(&idx).is_ok());

            kept_weight
        },
    }
}
/// Filter the unchecked bitfields down to the ones that are acceptable and correctly signed.
///
/// A bitfield is dropped when
///
/// 1. its length differs from `expected_bits`,
/// 2. it has a bit set for a core that is set in `disputed_bitfield`,
/// 3. its validator index is not strictly greater than the one of the previously examined
///    bitfield that passed the checks above,
/// 4. its validator index is out of bounds for `validators`, or
/// 5. its signature does not verify against `parent_hash` / `session_index` and the
///    validator's key.
///
/// If `disputed_bitfield` itself does not have `expected_bits` bits, an error is logged and no
/// bitfield is returned.
pub(crate) fn sanitize_bitfields<T: crate::inclusion::Config>(
    unchecked_bitfields: UncheckedSignedAvailabilityBitfields,
    disputed_bitfield: DisputedBitfield,
    expected_bits: usize,
    parent_hash: T::Hash,
    session_index: SessionIndex,
    validators: &[ValidatorId],
) -> SignedAvailabilityBitfields {
    if disputed_bitfield.0.len() != expected_bits {
        log::error!(target: LOG_TARGET, "BUG: disputed_bitfield != expected_bits");
        return Vec::new()
    }

    let no_bits_set = BitVec::<u8, bitvec::order::Lsb0>::repeat(false, expected_bits);
    let signing_context = SigningContext { parent_hash, session_index };

    let mut accepted = Vec::with_capacity(unchecked_bitfields.len());
    let mut previous_index: Option<ValidatorIndex> = None;

    for unchecked_bitfield in unchecked_bitfields {
        let payload_len = unchecked_bitfield.unchecked_payload().0.len();
        if payload_len != expected_bits {
            log::trace!(
                target: LOG_TARGET,
                "bad bitfield length: {} != {:?}",
                payload_len,
                expected_bits,
            );
            continue
        }

        // Any overlap with the disputed cores disqualifies the whole bitfield.
        let disputed_overlap =
            unchecked_bitfield.unchecked_payload().0.clone() & disputed_bitfield.0.clone();
        if disputed_overlap != no_bits_set {
            log::trace!(
                target: LOG_TARGET,
                "bitfield contains disputed cores: {:?}",
                disputed_overlap
            );
            continue
        }

        // Validator indices must be strictly increasing.
        let validator_index = unchecked_bitfield.unchecked_validator_index();
        if matches!(previous_index, Some(previous) if previous >= validator_index) {
            log::trace!(
                target: LOG_TARGET,
                "bitfield validator index is not greater than last: !({:?} < {})",
                previous_index.map(|previous| previous.0),
                validator_index.0
            );
            continue
        }

        let Some(validator_public) = validators.get(validator_index.0 as usize) else {
            log::trace!(
                target: LOG_TARGET,
                "bitfield validator index is out of bounds: {} >= {}",
                validator_index.0,
                validators.len(),
            );
            continue
        };

        match unchecked_bitfield.try_into_checked(&signing_context, validator_public) {
            Ok(signed_bitfield) => {
                accepted.push(signed_bitfield);
                METRICS.on_valid_bitfield_signature();
            },
            Err(_) => {
                log::warn!(target: LOG_TARGET, "Invalid bitfield signature");
                METRICS.on_invalid_bitfield_signature();
            },
        }

        // The index is remembered even when the signature check failed.
        previous_index = Some(validator_index);
    }

    accepted
}
/// Decide whether a backed candidate passes the checks that apply to V2 descriptors.
///
/// Candidates with a V1 descriptor always pass. Any other candidate passes only if
///
/// * `allow_v2_receipts` is set,
/// * its descriptor carries a session index equal to the current session index,
/// * its relay parent is known to `allowed_relay_parents`, and
/// * `check_core_index` succeeds against the claim queue recorded for that relay parent.
///
/// Every rejection is logged at debug level.
fn sanitize_backed_candidate_v2<T: crate::inclusion::Config>(
    candidate: &BackedCandidate<T::Hash>,
    allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
    allow_v2_receipts: bool,
) -> bool {
    let descriptor = candidate.descriptor();

    if descriptor.version() == CandidateDescriptorVersion::V1 {
        return true
    }

    if !allow_v2_receipts {
        log::debug!(
            target: LOG_TARGET,
            "V2 candidate descriptors not allowed. Dropping candidate {:?} for paraid {:?}.",
            candidate.candidate().hash(),
            descriptor.para_id()
        );
        return false
    }

    let session_index = match descriptor.session_index() {
        Some(index) => index,
        None => {
            log::debug!(
                target: LOG_TARGET,
                "Invalid V2 candidate receipt {:?} for paraid {:?}, missing session index.",
                candidate.candidate().hash(),
                descriptor.para_id(),
            );
            return false
        },
    };

    let current_session = shared::CurrentSessionIndex::<T>::get();
    if session_index != current_session {
        log::debug!(
            target: LOG_TARGET,
            "Dropping V2 candidate receipt {:?} for paraid {:?}, invalid session index {}, current session {}",
            candidate.candidate().hash(),
            descriptor.para_id(),
            session_index,
            current_session
        );
        return false
    }

    let rp_info = match allowed_relay_parents.acquire_info(descriptor.relay_parent(), None) {
        Some((info, _)) => info,
        None => {
            log::debug!(
                target: LOG_TARGET,
                "Relay parent {:?} for candidate {:?} is not in the allowed relay parents.",
                descriptor.relay_parent(),
                candidate.candidate().hash(),
            );
            return false
        },
    };

    match candidate.candidate().check_core_index(&rp_info.claim_queue) {
        Ok(_) => true,
        Err(err) => {
            log::debug!(
                target: LOG_TARGET,
                "Dropping candidate {:?} for paraid {:?}, {:?}",
                candidate.candidate().hash(),
                descriptor.para_id(),
                err,
            );
            false
        },
    }
}
/// Run the full sanitization pipeline over the backed candidates.
///
/// The stages are, in order:
///
/// 1. drop candidates rejected by `sanitize_backed_candidate_v2` and group the rest per para,
///    keeping their relative order;
/// 2. `filter_unchained_candidates`;
/// 3. per para, cut the list at the first candidate whose hash is in
///    `concluded_invalid_with_descendants`;
/// 4. `map_candidates_to_cores` using the `scheduled` cores;
/// 5. `filter_backed_statements_from_disabled_validators`.
///
/// Returns the surviving candidates per para, each paired with its core.
fn sanitize_backed_candidates<T: crate::inclusion::Config>(
    backed_candidates: Vec<BackedCandidate<T::Hash>>,
    allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
    concluded_invalid_with_descendants: BTreeSet<CandidateHash>,
    scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>>,
    core_index_enabled: bool,
    allow_v2_receipts: bool,
) -> BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>> {
    let mut per_para: BTreeMap<ParaId, Vec<_>> = BTreeMap::new();
    backed_candidates
        .into_iter()
        .filter(|candidate| {
            sanitize_backed_candidate_v2::<T>(candidate, allowed_relay_parents, allow_v2_receipts)
        })
        .for_each(|candidate| {
            per_para.entry(candidate.descriptor().para_id()).or_default().push(candidate)
        });

    filter_unchained_candidates::<T>(&mut per_para, allowed_relay_parents);

    retain_candidates::<T, _, _>(&mut per_para, |_, candidate| {
        if concluded_invalid_with_descendants.contains(&candidate.candidate().hash()) {
            log::debug!(
                target: LOG_TARGET,
                "Found backed candidate {:?} which was concluded invalid or is a descendant of a concluded invalid candidate, for paraid {:?}.",
                candidate.candidate().hash(),
                candidate.descriptor().para_id()
            );
            false
        } else {
            true
        }
    });

    let mut with_cores = map_candidates_to_cores::<T>(
        allowed_relay_parents,
        scheduled,
        core_index_enabled,
        per_para,
    );

    filter_backed_statements_from_disabled_validators::<T>(
        &mut with_cores,
        allowed_relay_parents,
        core_index_enabled,
    );

    with_cores
}
/// Total number of candidates across all paras in the map.
fn count_backed_candidates<B>(backed_candidates: &BTreeMap<ParaId, Vec<B>>) -> usize {
    backed_candidates.values().fold(0, |total, per_para| total + per_para.len())
}
/// Derive the 32 bytes used to seed the random selection of candidates and bitfields.
///
/// The bytes come from `ParentBlockRandomness` when it yields a value; otherwise a warning is
/// logged and `parent_hash` is used instead.
fn compute_entropy<T: Config>(parent_hash: T::Hash) -> [u8; 32] {
    const CANDIDATE_SEED_SUBJECT: [u8; 32] = *b"candidate-seed-selection-subject";

    let mut entropy = CANDIDATE_SEED_SUBJECT;
    match ParentBlockRandomness::<T>::random(&CANDIDATE_SEED_SUBJECT[..]).0 {
        Some(vrf_random) => entropy.copy_from_slice(vrf_random.as_ref()),
        None => {
            log::warn!(target: LOG_TARGET, "ParentBlockRandomness did not provide entropy");
            entropy.copy_from_slice(parent_hash.as_ref());
        },
    }
    entropy
}
/// Validate the dispute statement sets and keep their weight within `max_consumable_weight`.
///
/// When the unchecked sets fit the limit as a whole, every set is passed through
/// `dispute_statement_set_valid` and the weight of the surviving checked sets is returned.
///
/// Otherwise the sets are visited in order. A set that would push the accumulated weight over
/// the limit is skipped; any other set has its weight accounted for, whether or not it then
/// passes `dispute_statement_set_valid`.
///
/// Returns the checked sets together with the weight accounted for them.
fn limit_and_sanitize_disputes<
    T: Config,
    CheckValidityFn: FnMut(DisputeStatementSet) -> Option<CheckedDisputeStatementSet>,
>(
    disputes: MultiDisputeStatementSet,
    mut dispute_statement_set_valid: CheckValidityFn,
    max_consumable_weight: Weight,
) -> (Vec<CheckedDisputeStatementSet>, Weight) {
    let disputes_weight = multi_dispute_statement_sets_weight::<T>(&disputes);

    if !disputes_weight.any_gt(max_consumable_weight) {
        // Everything fits: only validity filtering is needed.
        let checked: Vec<CheckedDisputeStatementSet> = disputes
            .into_iter()
            .filter_map(|dss| dispute_statement_set_valid(dss))
            .collect();
        let checked_weight = checked_multi_dispute_statement_sets_weight::<T>(&checked);
        return (checked, checked_weight)
    }

    log::debug!(target: LOG_TARGET, "Above max consumable weight: {}/{}", disputes_weight, max_consumable_weight);

    let mut accepted = Vec::<CheckedDisputeStatementSet>::with_capacity(disputes.len());
    let mut consumed = Weight::zero();
    for dss in disputes {
        let set_weight = dispute_statement_set_weight::<T, &DisputeStatementSet>(&dss);
        let with_set = consumed.saturating_add(set_weight);
        if !max_consumable_weight.all_gte(with_set) {
            continue
        }
        // The weight is charged before the validity check, so rejected sets count as well.
        consumed = with_set;
        if let Some(checked) = dispute_statement_set_valid(dss) {
            accepted.push(checked);
        }
    }
    (accepted, consumed)
}
/// Keep, for every para, only the leading run of candidates accepted by `pred`.
///
/// `pred` is invoked on the candidates of a para in order and stops being called for that para
/// as soon as it returns `false`; that candidate and everything after it are removed. Paras
/// that end up with no candidates are removed from the map.
fn retain_candidates<
    T: inclusion::Config + paras::Config + inclusion::Config,
    F: FnMut(ParaId, &mut C) -> bool,
    C,
>(
    candidates_per_para: &mut BTreeMap<ParaId, Vec<C>>,
    mut pred: F,
) {
    for (para_id, candidates) in candidates_per_para.iter_mut() {
        let total = candidates.len();
        // Index of the first rejected candidate, or `total` when all were accepted.
        let keep = candidates
            .iter_mut()
            .position(|candidate| !pred(*para_id, candidate))
            .unwrap_or(total);
        candidates.truncate(keep);
    }

    candidates_per_para.retain(|_, remaining| !remaining.is_empty());
}
/// Strip backing votes cast by disabled validators and drop candidates left under-backed.
///
/// Nothing happens when no validator is disabled. Otherwise, for each candidate (per para, in
/// order, stopping at the first one that is dropped):
///
/// * the candidate is dropped if its relay parent is not in `allowed_relay_parents`, or if the
///   backing group for its core or that group's validators cannot be determined;
/// * the bits and validity votes of disabled validators are removed from the candidate;
/// * the candidate is dropped if fewer votes remain than
///   `effective_minimum_backing_votes` requires for the group.
fn filter_backed_statements_from_disabled_validators<
    T: shared::Config + scheduler::Config + inclusion::Config,
>(
    backed_candidates_with_core: &mut BTreeMap<
        ParaId,
        Vec<(BackedCandidate<<T as frame_system::Config>::Hash>, CoreIndex)>,
    >,
    allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
    core_index_enabled: bool,
) {
    let disabled_validators =
        shared::Pallet::<T>::disabled_validators().into_iter().collect::<BTreeSet<_>>();

    if disabled_validators.is_empty() {
        return
    }

    let minimum_backing_votes = configuration::ActiveConfig::<T>::get().minimum_backing_votes;

    retain_candidates::<T, _, _>(backed_candidates_with_core, |para_id, (bc, core_idx)| {
        let (validator_indices, maybe_core_index) =
            bc.validator_indices_and_core_index(core_index_enabled);
        let mut validator_indices = BitVec::<_>::from(validator_indices);

        let Some((_, relay_parent_block_number)) =
            allowed_relay_parents.acquire_info(bc.descriptor().relay_parent(), None)
        else {
            log::debug!(
                target: LOG_TARGET,
                "Relay parent {:?} for candidate is not in the allowed relay parents. Dropping the candidate.",
                bc.descriptor().relay_parent()
            );
            return false
        };

        // The group lookup uses the block right after the relay parent.
        let Some(group_idx) = scheduler::Pallet::<T>::group_assigned_to_core(
            *core_idx,
            relay_parent_block_number + One::one(),
        ) else {
            log::debug!(target: LOG_TARGET, "Can't get the group index for core idx {:?}. Dropping the candidate.", core_idx);
            return false
        };

        let Some(validator_group) = scheduler::Pallet::<T>::group_validators(group_idx) else {
            log::debug!(target: LOG_TARGET, "Can't get the validators from group {:?}. Dropping the candidate.", group_idx);
            return false
        };

        // One bit per group member, set when that member is disabled.
        let disabled_indices = BitVec::<u8, bitvec::order::Lsb0>::from_iter(
            validator_group.iter().map(|idx| disabled_validators.contains(idx)),
        );
        // Bits that are both disabled and present in the candidate's backing bitfield.
        let indices_to_drop = disabled_indices.clone() & &validator_indices;

        // Clear the disabled validators from the candidate's bitfield.
        validator_indices &= !disabled_indices;
        bc.set_validator_indices_and_core_index(validator_indices, maybe_core_index);

        // Remove the matching votes back to front so earlier positions stay valid.
        for idx in indices_to_drop.iter_ones().rev() {
            bc.validity_votes_mut().remove(idx);
        }

        let enough_votes = bc.validity_votes().len() >=
            effective_minimum_backing_votes(validator_group.len(), minimum_backing_votes);
        if !enough_votes {
            log::debug!(
                target: LOG_TARGET,
                "Dropping candidate {:?} of paraid {:?} because it was left with too few backing votes after votes from disabled validators were filtered.",
                bc.candidate().hash(),
                para_id
            );
        }
        enough_votes
    });
}
fn filter_unchained_candidates<T: inclusion::Config + paras::Config + inclusion::Config>(
candidates: &mut BTreeMap<ParaId, Vec<BackedCandidate<T::Hash>>>,
allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
) {
let mut para_latest_context: BTreeMap<ParaId, (HeadData, BlockNumberFor<T>)> = BTreeMap::new();
for para_id in candidates.keys() {
let Some(latest_head_data) = inclusion::Pallet::<T>::para_latest_head_data(¶_id) else {
defensive!("Latest included head data for paraid {:?} is None", para_id);
continue
};
let Some(latest_relay_parent) = inclusion::Pallet::<T>::para_most_recent_context(¶_id)
else {
defensive!("Latest relay parent for paraid {:?} is None", para_id);
continue
};
para_latest_context.insert(*para_id, (latest_head_data, latest_relay_parent));
}
let mut para_visited_candidates: BTreeMap<ParaId, BTreeSet<CandidateHash>> = BTreeMap::new();
retain_candidates::<T, _, _>(candidates, |para_id, candidate| {
let Some((latest_head_data, latest_relay_parent)) = para_latest_context.get(¶_id)
else {
return false
};
let candidate_hash = candidate.candidate().hash();
let visited_candidates =
para_visited_candidates.entry(para_id).or_insert_with(|| BTreeSet::new());
if visited_candidates.contains(&candidate_hash) {
log::debug!(
target: LOG_TARGET,
"Found duplicate candidates for paraid {:?}. Dropping the candidates with hash {:?}",
para_id,
candidate_hash
);
return false
} else {
visited_candidates.insert(candidate_hash);
}
let check_ctx = CandidateCheckContext::<T>::new(Some(*latest_relay_parent));
match check_ctx.verify_backed_candidate(
&allowed_relay_parents,
candidate.candidate(),
latest_head_data.clone(),
) {
Ok(relay_parent_block_number) => {
para_latest_context.insert(
para_id,
(
candidate.candidate().commitments.head_data.clone(),
relay_parent_block_number,
),
);
true
},
Err(err) => {
log::debug!(
target: LOG_TARGET,
"Backed candidate verification for candidate {:?} of paraid {:?} failed with {:?}",
candidate_hash,
para_id,
err
);
false
},
}
});
}
/// Pairs each para's backed candidates with one of the cores listed for that para in `scheduled`.
///
/// Candidates of a para are processed in input order. For each candidate the core index is
/// resolved through `get_core_index`:
/// * if a core index is found and it is still in the para's scheduled set, the core is removed
///   from the set and the `(candidate, core)` pair is recorded;
/// * if a core index is found but it is not in the scheduled set, processing of this para stops;
/// * if no core index is found and exactly one scheduled core remains, the candidate takes that
///   core and processing of this para stops;
/// * if no core index is found and several scheduled cores remain, processing of this para
///   stops.
///
/// Pairs recorded before processing stops are kept. Paras with an empty candidate list, with no
/// entry in `scheduled`, or with an empty scheduled set contribute nothing to the result.
///
/// Returns a map from para id to its `(candidate, core index)` pairs; paras with no recorded
/// pair are absent from the map.
fn map_candidates_to_cores<T: configuration::Config + scheduler::Config + inclusion::Config>(
    allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
    mut scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>>,
    core_index_enabled: bool,
    candidates: BTreeMap<ParaId, Vec<BackedCandidate<T::Hash>>>,
) -> BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>> {
    let mut backed_candidates_with_core = BTreeMap::new();

    for (para_id, backed_candidates) in candidates {
        if backed_candidates.is_empty() {
            defensive!("Backed candidates for paraid {} is empty.", para_id);
            continue
        }

        let Some(scheduled_cores) = scheduled.get_mut(&para_id) else {
            log::debug!(
                target: LOG_TARGET,
                "Paraid: {:?} has no entry in scheduled cores but {} candidates were supplied.",
                para_id,
                backed_candidates.len()
            );
            continue
        };

        if scheduled_cores.is_empty() {
            log::debug!(
                target: LOG_TARGET,
                "Paraid: {:?} has no scheduled cores but {} candidates were supplied.",
                para_id,
                backed_candidates.len()
            );
            continue
        }

        // At most one candidate per scheduled core can be accepted; pairs are pushed in input
        // order.
        let mut temp_backed_candidates = Vec::with_capacity(scheduled_cores.len());

        for candidate in backed_candidates {
            if scheduled_cores.is_empty() {
                // Every scheduled core of this para has been taken.
                log::debug!(
                    target: LOG_TARGET,
                    "Found enough candidates for paraid: {:?}.",
                    candidate.descriptor().para_id()
                );
                break;
            }

            match get_core_index::<T>(core_index_enabled, allowed_relay_parents, &candidate) {
                Some(core_index) => {
                    // `BTreeSet::remove` returns whether the core was scheduled, and consumes it
                    // so it cannot be assigned twice.
                    if scheduled_cores.remove(&core_index) {
                        temp_backed_candidates.push((candidate, core_index));
                    } else {
                        log::debug!(
                            target: LOG_TARGET,
                            "Found a backed candidate {:?} with core index {}, which is not scheduled for paraid {:?}.",
                            candidate.candidate().hash(),
                            core_index.0,
                            candidate.descriptor().para_id()
                        );
                        break;
                    }
                },
                None => {
                    // Without a core index the assignment is only unambiguous when a single
                    // scheduled core is left.
                    if scheduled_cores.len() == 1 {
                        temp_backed_candidates
                            .push((candidate, scheduled_cores.pop_first().expect("Length is 1")));
                        break;
                    }

                    log::debug!(
                        target: LOG_TARGET,
                        "Found a backed candidate {:?} without core index information, but paraid {:?} has multiple scheduled cores.",
                        candidate.candidate().hash(),
                        candidate.descriptor().para_id()
                    );
                    break;
                },
            }
        }

        if !temp_backed_candidates.is_empty() {
            backed_candidates_with_core
                .entry(para_id)
                .or_insert_with(Vec::new)
                .extend(temp_backed_candidates);
        }
    }

    backed_candidates_with_core
}
/// Resolves the core index of `candidate`.
///
/// The core index reported by the candidate descriptor's `core_index()` takes precedence. Only
/// when the descriptor yields `None` is `get_injected_core_index` consulted.
///
/// Returns `None` when neither source provides a core index.
fn get_core_index<T: configuration::Config + scheduler::Config + inclusion::Config>(
    core_index_enabled: bool,
    allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
    candidate: &BackedCandidate<T::Hash>,
) -> Option<CoreIndex> {
    if let Some(descriptor_core_index) = candidate.candidate().descriptor.core_index() {
        return Some(descriptor_core_index)
    }

    // Fall back to the core index carried alongside the validator indices.
    get_injected_core_index::<T>(core_index_enabled, allowed_relay_parents, candidate)
}
/// Returns the core index obtained from `candidate.validator_indices_and_core_index`, provided it
/// is consistent with the backing group assigned to that core.
///
/// The core index is accepted only if all of the following hold:
/// * `validator_indices_and_core_index(core_index_enabled)` yields a core index,
/// * the candidate's relay parent is known to `allowed_relay_parents`,
/// * the scheduler reports a group assigned to that core at the block following the relay
///   parent,
/// * the scheduler knows the validators of that group, and
/// * the number of validators in the group equals the length of the candidate's validator
///   indices.
///
/// In every other case `None` is returned.
fn get_injected_core_index<T: configuration::Config + scheduler::Config + inclusion::Config>(
    core_index_enabled: bool,
    allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
    candidate: &BackedCandidate<T::Hash>,
) -> Option<CoreIndex> {
    let (validator_indices, maybe_core_idx) =
        candidate.validator_indices_and_core_index(core_index_enabled);
    let core_idx = maybe_core_idx?;

    let Some((_, relay_parent_block_number)) =
        allowed_relay_parents.acquire_info(candidate.descriptor().relay_parent(), None)
    else {
        log::debug!(
            target: LOG_TARGET,
            "Relay parent {:?} for candidate {:?} is not in the allowed relay parents.",
            candidate.descriptor().relay_parent(),
            candidate.candidate().hash(),
        );
        return None
    };

    // The group lookup is made for the block right after the relay parent.
    let lookup_block_number = relay_parent_block_number + One::one();
    let Some(group_idx) =
        scheduler::Pallet::<T>::group_assigned_to_core(core_idx, lookup_block_number)
    else {
        log::debug!(
            target: LOG_TARGET,
            "Can't get the group index for core idx {:?}.",
            core_idx,
        );
        return None
    };

    let group_validators = scheduler::Pallet::<T>::group_validators(group_idx)?;

    // A length mismatch means the core index cannot be trusted for this backing group.
    if group_validators.len() != validator_indices.len() {
        log::debug!(
            target: LOG_TARGET,
            "Expected validator_indices count different than the real one: {}, {} for candidate {:?}",
            group_validators.len(),
            validator_indices.len(),
            candidate.candidate().hash()
        );
        return None
    }

    Some(core_idx)
}