//! The inclusion pallet is responsible for inclusion and availability of scheduled parachains.
//!
//! It carries candidates from being backable to being backed, and then from backed to included.
use crate::{
configuration::{self, HostConfiguration},
disputes, dmp, hrmp,
paras::{self, UpgradeStrategy},
scheduler,
shared::{self, AllowedRelayParentsTracker},
util::make_persisted_validation_data_with_parent,
};
use alloc::{
collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
vec,
vec::Vec,
};
use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use codec::{Decode, Encode};
use core::fmt;
use frame_support::{
defensive,
pallet_prelude::*,
traits::{EnqueueMessage, Footprint, QueueFootprint},
BoundedSlice,
};
use frame_system::pallet_prelude::*;
use pallet_message_queue::OnQueueChanged;
use polkadot_primitives::{
effective_minimum_backing_votes, supermajority_threshold,
vstaging::{
skip_ump_signals, BackedCandidate, CandidateDescriptorV2 as CandidateDescriptor,
CandidateReceiptV2 as CandidateReceipt,
CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
},
well_known_keys, CandidateCommitments, CandidateHash, CoreIndex, GroupIndex, HeadData,
Id as ParaId, SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId,
ValidatorIndex, ValidityAttestation,
};
use scale_info::TypeInfo;
use sp_runtime::{traits::One, DispatchError, SaturatedConversion, Saturating};
pub use pallet::*;
#[cfg(test)]
pub(crate) mod tests;
#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;
pub mod migration;
/// Weight information for this pallet.
pub trait WeightInfo {
	/// Weight of enacting a candidate, given the number of upward messages `u` and horizontal
	/// messages `h` it sends, and `c` (0 or 1) indicating whether it commits to a new validation
	/// code (a runtime upgrade).
	fn enact_candidate(u: u32, h: u32, c: u32) -> Weight;
}
/// A zero-cost `WeightInfo` implementation, only suitable for testing.
pub struct TestWeightInfo;
impl WeightInfo for TestWeightInfo {
	fn enact_candidate(_u: u32, _h: u32, _c: u32) -> Weight {
		Weight::zero()
	}
}
impl WeightInfo for () {
	fn enact_candidate(_u: u32, _h: u32, _c: u32) -> Weight {
		Weight::zero()
	}
}
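/// Maximum value that `config.max_upward_message_size` can be set to.
///
/// This is used for sanely bounding the relevant storage items; the `configuration` pallet is
/// expected to check values against this bound before setting them.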
pub const MAX_UPWARD_MESSAGE_SIZE_BOUND: u32 = 128 * 1024;
/// A backed candidate pending availability.
#[derive(Encode, Decode, PartialEq, TypeInfo, Clone)]
#[cfg_attr(test, derive(Debug))]
pub struct CandidatePendingAvailability<H, N> {
	/// The availability core this candidate is assigned to.
	core: CoreIndex,
	/// The candidate hash.
	hash: CandidateHash,
	/// The candidate descriptor.
	descriptor: CandidateDescriptor<H>,
	/// The candidate commitments.
	commitments: CandidateCommitments,
	/// The received availability votes. One bit per validator.
	availability_votes: BitVec<u8, BitOrderLsb0>,
	/// The backers of the candidate pending availability.
	backers: BitVec<u8, BitOrderLsb0>,
	/// The block number of the relay-parent of the receipt.
	relay_parent_number: N,
	/// The block number of the relay-chain block this was backed in.
	backed_in_number: N,
	/// The group index backing this candidate.
	backing_group: GroupIndex,
}
impl<H, N> CandidatePendingAvailability<H, N> {
pub(crate) fn availability_votes(&self) -> &BitVec<u8, BitOrderLsb0> {
&self.availability_votes
}
pub(crate) fn backed_in_number(&self) -> N
where
N: Clone,
{
self.backed_in_number.clone()
}
pub(crate) fn core_occupied(&self) -> CoreIndex {
self.core
}
pub(crate) fn candidate_hash(&self) -> CandidateHash {
self.hash
}
pub(crate) fn candidate_descriptor(&self) -> &CandidateDescriptor<H> {
&self.descriptor
}
pub(crate) fn candidate_commitments(&self) -> &CandidateCommitments {
&self.commitments
}
pub(crate) fn relay_parent_number(&self) -> N
where
N: Clone,
{
self.relay_parent_number.clone()
}
#[cfg(any(feature = "runtime-benchmarks", test))]
pub(crate) fn new(
core: CoreIndex,
hash: CandidateHash,
descriptor: CandidateDescriptor<H>,
commitments: CandidateCommitments,
availability_votes: BitVec<u8, BitOrderLsb0>,
backers: BitVec<u8, BitOrderLsb0>,
relay_parent_number: N,
backed_in_number: N,
backing_group: GroupIndex,
) -> Self {
Self {
core,
hash,
descriptor,
commitments,
availability_votes,
backers,
relay_parent_number,
backed_in_number,
backing_group,
}
}
}
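/// A hook for applying validator rewards: to the backers of included candidates and to the
/// validators that signed availability bitfields.
///
/// A minimal no-op sketch of an implementation, e.g. for a test runtime (the no-op behaviour is
/// an illustrative assumption, not this pallet's semantics):
///
/// ```ignore
/// pub struct NoRewards;
/// impl RewardValidators for NoRewards {
///     fn reward_backing(_: impl IntoIterator<Item = ValidatorIndex>) {}
///     fn reward_bitfields(_: impl IntoIterator<Item = ValidatorIndex>) {}
/// }
/// ```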
pub trait RewardValidators {
fn reward_backing(validators: impl IntoIterator<Item = ValidatorIndex>);
fn reward_bitfields(validators: impl IntoIterator<Item = ValidatorIndex>);
}
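/// Reads the number of messages queued for a given origin, letting callers inspect UMP queue
/// sizes without depending on the message-queue pallet's internals directly.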
pub trait QueueFootprinter {
type Origin;
fn message_count(origin: Self::Origin) -> u64;
}
impl QueueFootprinter for () {
type Origin = UmpQueueId;
fn message_count(_: Self::Origin) -> u64 {
0
}
}
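/// Aggregate message origin for the `MessageQueue` pallet.
///
/// Can be extended to serve further use-cases besides just UMP. Is stored in storage, so any
/// change to existing values will require a migration.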
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum AggregateMessageOrigin {
#[codec(index = 0)]
Ump(UmpQueueId),
}
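/// Identifies a UMP queue inside the `MessageQueue` pallet.
///
/// It is written in verbose form since future variants (e.g. bridged origins) are foreseeable.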
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum UmpQueueId {
#[codec(index = 0)]
Para(ParaId),
}
#[cfg(feature = "runtime-benchmarks")]
impl From<u32> for AggregateMessageOrigin {
fn from(n: u32) -> Self {
Self::Ump(UmpQueueId::Para(n.into()))
}
}
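/// The maximal length of a UMP message, as bounded by the configured `MessageQueue`.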
pub type MaxUmpMessageLenOf<T> =
<<T as Config>::MessageQueue as EnqueueMessage<AggregateMessageOrigin>>::MaxMessageLen;
#[frame_support::pallet]
pub mod pallet {
use super::*;
const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
#[pallet::pallet]
#[pallet::without_storage_info]
#[pallet::storage_version(STORAGE_VERSION)]
pub struct Pallet<T>(_);
#[pallet::config]
pub trait Config:
frame_system::Config
+ shared::Config
+ paras::Config
+ dmp::Config
+ hrmp::Config
+ configuration::Config
+ scheduler::Config
{
		/// The overarching event type.
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
		/// Handler for candidate disputes.
		type DisputesHandler: disputes::DisputesHandler<BlockNumberFor<Self>>;
		/// Hook for rewarding backers and availability-bitfield signers.
		type RewardValidators: RewardValidators;
		/// The system message queue, into which upward messages are enqueued.
		type MessageQueue: EnqueueMessage<AggregateMessageOrigin>;
		/// Weight information for this pallet.
		type WeightInfo: WeightInfo;
}
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
pub enum Event<T: Config> {
		/// A candidate was backed. `[candidate, head_data]`
		CandidateBacked(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
		/// A candidate was included. `[candidate, head_data]`
		CandidateIncluded(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
		/// A candidate timed out. `[candidate, head_data]`
		CandidateTimedOut(CandidateReceipt<T::Hash>, HeadData, CoreIndex),
		/// Some upward messages have been received and will be processed.
		UpwardMessagesReceived { from: ParaId, count: u32 },
}
#[pallet::error]
pub enum Error<T> {
		/// Validator index out of bounds.
		ValidatorIndexOutOfBounds,
		/// Candidate submitted but para not scheduled.
		UnscheduledCandidate,
		/// Head data exceeds the configured maximum.
		HeadDataTooLarge,
		/// The candidate tried to upgrade its validation code prematurely.
		PrematureCodeUpgrade,
		/// The new validation code is too large.
		NewCodeTooLarge,
		/// The candidate's relay-parent was not allowed: either it was not recent enough or it
		/// didn't advance based on the last parachain block.
		DisallowedRelayParent,
		/// Failed to compute the group index for the core: either it's out of bounds or the relay
		/// parent doesn't belong to the current session.
		InvalidAssignment,
		/// Invalid group index in core assignment.
		InvalidGroupIndex,
		/// Insufficient (non-majority) backing.
		InsufficientBacking,
		/// Invalid (bad signature, unknown validator, etc.) backing.
		InvalidBacking,
		/// The validation data hash does not match the expected one.
		ValidationDataHashMismatch,
		/// The downward message queue was not processed correctly.
		IncorrectDownwardMessageHandling,
		/// At least one upward message sent does not pass the acceptance criteria.
		InvalidUpwardMessages,
		/// The candidate didn't follow the rules of HRMP watermark advancement.
		HrmpWatermarkMishandling,
		/// The HRMP messages sent by the candidate are not valid.
		InvalidOutboundHrmp,
		/// The validation code hash of the candidate is not valid.
		InvalidValidationCodeHash,
		/// The `para_head` hash in the candidate descriptor doesn't match the hash of the actual
		/// para head in the commitments.
		ParaHeadMismatch,
}
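	/// Candidates pending availability by `ParaId`. They form a chain starting from the latest
	/// included head of the para.
	///
	/// A different storage prefix is used post-migration to v1, since the v0 storage item would
	/// otherwise have the exact same prefix, which could cause undefined behaviour during the
	/// migration.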
#[pallet::storage]
#[pallet::storage_prefix = "V1"]
pub(crate) type PendingAvailability<T: Config> = StorageMap<
_,
Twox64Concat,
ParaId,
VecDeque<CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>>,
>;
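	// No dispatchables: backed candidates and availability bitfields enter this pallet through
	// the `paras_inherent` inherent call.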
#[pallet::call]
impl<T: Config> Pallet<T> {}
}
const LOG_TARGET: &str = "runtime::inclusion";
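/// An error produced by the candidate acceptance checks, mapped onto `Error<T>` via
/// `strip_into_dispatch_err`.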
#[derive(Debug)]
enum AcceptanceCheckErr {
HeadDataTooLarge,
PrematureCodeUpgrade,
NewCodeTooLarge,
ProcessedDownwardMessages,
UpwardMessages,
HrmpWatermark,
OutboundHrmp,
}
impl From<dmp::ProcessedDownwardMessagesAcceptanceErr> for AcceptanceCheckErr {
fn from(_: dmp::ProcessedDownwardMessagesAcceptanceErr) -> Self {
Self::ProcessedDownwardMessages
}
}
impl From<UmpAcceptanceCheckErr> for AcceptanceCheckErr {
fn from(_: UmpAcceptanceCheckErr) -> Self {
Self::UpwardMessages
}
}
impl<BlockNumber> From<hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>> for AcceptanceCheckErr {
fn from(_: hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>) -> Self {
Self::HrmpWatermark
}
}
impl From<hrmp::OutboundHrmpAcceptanceErr> for AcceptanceCheckErr {
fn from(_: hrmp::OutboundHrmpAcceptanceErr) -> Self {
Self::OutboundHrmp
}
}
/// An error returned by [`Pallet::check_upward_messages`] that indicates a violation of one of
/// the acceptance criteria rules.
#[cfg_attr(test, derive(PartialEq))]
#[allow(dead_code)]
pub(crate) enum UmpAcceptanceCheckErr {
	/// The maximal number of messages that can be submitted in one batch was exceeded.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// The maximal size of a single message was exceeded.
	MessageSize { idx: u32, msg_size: u32, max_size: u32 },
	/// The allowed number of messages in the queue was exceeded.
	CapacityExceeded { count: u64, limit: u64 },
	/// The allowed combined message size in the queue was exceeded.
	TotalSizeExceeded { total_size: u64, limit: u64 },
	/// A para cannot send UMP messages while it is off-boarding.
	IsOffboarding,
}
impl fmt::Debug for UmpAcceptanceCheckErr {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted } => write!(
fmt,
"more upward messages than permitted by config ({} > {})",
sent, permitted,
),
UmpAcceptanceCheckErr::MessageSize { idx, msg_size, max_size } => write!(
fmt,
"upward message idx {} larger than permitted by config ({} > {})",
idx, msg_size, max_size,
),
UmpAcceptanceCheckErr::CapacityExceeded { count, limit } => write!(
fmt,
"the ump queue would have more items than permitted by config ({} > {})",
count, limit,
),
UmpAcceptanceCheckErr::TotalSizeExceeded { total_size, limit } => write!(
fmt,
"the ump queue would have grown past the max size permitted by config ({} > {})",
total_size, limit,
),
UmpAcceptanceCheckErr::IsOffboarding => {
write!(fmt, "upward message rejected because the para is off-boarding")
},
}
}
}
impl<T: Config> Pallet<T> {
	/// Block initialization logic, called by the initializer.
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
		Weight::zero()
	}
	/// Block finalization logic, called by the initializer.
	pub(crate) fn initializer_finalize() {}
	/// Handle an incoming session change.
	pub(crate) fn initializer_on_new_session(
		_notification: &crate::initializer::SessionChangeNotification<BlockNumberFor<T>>,
		outgoing_paras: &[ParaId],
	) {
		// Drop everything pending availability: the validator set may change at the session
		// boundary, invalidating the collected availability votes and backing signatures.
		for _ in PendingAvailability::<T>::drain() {}
		Self::cleanup_outgoing_ump_dispatch_queues(outgoing_paras);
	}
pub(crate) fn cleanup_outgoing_ump_dispatch_queues(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::cleanup_outgoing_ump_dispatch_queue(*outgoing_para);
}
}
pub(crate) fn cleanup_outgoing_ump_dispatch_queue(para: ParaId) {
T::MessageQueue::sweep_queue(AggregateMessageOrigin::Ump(UmpQueueId::Para(para)));
}
pub(crate) fn get_occupied_cores(
) -> impl Iterator<Item = (CoreIndex, CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>)>
{
PendingAvailability::<T>::iter_values().flat_map(|pending_candidates| {
			pending_candidates.into_iter().map(|c| (c.core, c))
})
}
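	/// Apply a batch of signed availability bitfields: record the fresh availability votes per
	/// occupied core, then enact every candidate that has reached the availability threshold and
	/// whose predecessors in the para's dependency chain are enacted as well.
	///
	/// Returns the weight consumed by enactment and the freed `(core, candidate_hash)` pairs.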
pub(crate) fn update_pending_availability_and_get_freed_cores(
validators: &[ValidatorId],
signed_bitfields: SignedAvailabilityBitfields,
) -> (Weight, Vec<(CoreIndex, CandidateHash)>) {
let threshold = availability_threshold(validators.len());
let mut votes_per_core: BTreeMap<CoreIndex, BTreeSet<ValidatorIndex>> = BTreeMap::new();
for (checked_bitfield, validator_index) in
signed_bitfields.into_iter().map(|signed_bitfield| {
let validator_idx = signed_bitfield.validator_index();
let checked_bitfield = signed_bitfield.into_payload();
(checked_bitfield, validator_idx)
}) {
for (bit_idx, _) in checked_bitfield.0.iter().enumerate().filter(|(_, is_av)| **is_av) {
let core_index = CoreIndex(bit_idx as u32);
votes_per_core
.entry(core_index)
					.or_default()
.insert(validator_index);
}
}
let mut freed_cores = vec![];
let mut weight = Weight::zero();
let pending_paraids: Vec<_> = PendingAvailability::<T>::iter_keys().collect();
for paraid in pending_paraids {
PendingAvailability::<T>::mutate(paraid, |candidates| {
if let Some(candidates) = candidates {
let mut last_enacted_index: Option<usize> = None;
for (candidate_index, candidate) in candidates.iter_mut().enumerate() {
if let Some(validator_indices) = votes_per_core.remove(&candidate.core) {
for validator_index in validator_indices.iter() {
if let Some(mut bit) =
candidate.availability_votes.get_mut(validator_index.0 as usize)
{
*bit = true;
}
}
}
						if candidate.availability_votes.count_ones() >= threshold {
							// Candidates of a para form a dependency chain: one may only be
							// enacted if all of its predecessors are enacted in this very pass.
							let can_enact = if candidate_index == 0 {
								last_enacted_index.is_none()
							} else {
								last_enacted_index == Some(candidate_index - 1)
							};
							if can_enact {
								last_enacted_index = Some(candidate_index);
							}
						}
}
}
if let Some(last_enacted_index) = last_enacted_index {
let evicted_candidates = candidates.drain(0..=last_enacted_index);
for candidate in evicted_candidates {
freed_cores.push((candidate.core, candidate.hash));
let receipt = CommittedCandidateReceipt {
descriptor: candidate.descriptor,
commitments: candidate.commitments,
};
let has_runtime_upgrade =
receipt.commitments.new_validation_code.as_ref().map_or(0, |_| 1);
let u = receipt.commitments.upward_messages.len() as u32;
let h = receipt.commitments.horizontal_messages.len() as u32;
let enact_weight = <T as Config>::WeightInfo::enact_candidate(
u,
h,
has_runtime_upgrade,
);
Self::enact_candidate(
candidate.relay_parent_number,
receipt,
candidate.backers,
candidate.availability_votes,
candidate.core,
candidate.backing_group,
);
weight.saturating_accrue(enact_weight);
}
}
}
});
}
(weight, freed_cores)
}
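	/// Process candidates that have been backed. The candidates of each para must be sorted in
	/// dependency order (they form a chain). Each candidate is verified against its relay parent
	/// and the backing votes of its assigned validator group, then appended to the para's
	/// pending-availability queue. Returns the candidate receipts together with their backing
	/// validator attestations.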
pub(crate) fn process_candidates<GV>(
allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
candidates: &BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>>,
group_validators: GV,
core_index_enabled: bool,
) -> Result<
Vec<(CandidateReceipt<T::Hash>, Vec<(ValidatorIndex, ValidityAttestation)>)>,
DispatchError,
>
where
GV: Fn(GroupIndex) -> Option<Vec<ValidatorIndex>>,
{
if candidates.is_empty() {
return Ok(Default::default())
}
let now = frame_system::Pallet::<T>::block_number();
let validators = shared::ActiveValidatorKeys::<T>::get();
let mut candidate_receipt_with_backing_validator_indices =
Vec::with_capacity(candidates.len());
for (para_id, para_candidates) in candidates {
let mut latest_head_data = match Self::para_latest_head_data(para_id) {
None => {
defensive!("Latest included head data for paraid {:?} is None", para_id);
continue
},
Some(latest_head_data) => latest_head_data,
};
for (candidate, core) in para_candidates.iter() {
let candidate_hash = candidate.candidate().hash();
let check_ctx = CandidateCheckContext::<T>::new(None);
let relay_parent_number = check_ctx.verify_backed_candidate(
&allowed_relay_parents,
candidate.candidate(),
latest_head_data.clone(),
)?;
let group_idx = scheduler::Pallet::<T>::group_assigned_to_core(
*core,
relay_parent_number + One::one(),
)
.ok_or_else(|| {
log::warn!(
target: LOG_TARGET,
"Failed to compute group index for candidate {:?}",
candidate_hash
);
Error::<T>::InvalidAssignment
})?;
				let group_vals = group_validators(group_idx).ok_or(Error::<T>::InvalidGroupIndex)?;
let (backers, backer_idx_and_attestation) = Self::check_backing_votes(
candidate,
&validators,
group_vals,
core_index_enabled,
)?;
latest_head_data = candidate.candidate().commitments.head_data.clone();
candidate_receipt_with_backing_validator_indices
.push((candidate.receipt(), backer_idx_and_attestation));
				PendingAvailability::<T>::mutate(&para_id, |pending_availability| {
let new_candidate = CandidatePendingAvailability {
core: *core,
hash: candidate_hash,
descriptor: candidate.candidate().descriptor.clone(),
commitments: candidate.candidate().commitments.clone(),
availability_votes: bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()],
relay_parent_number,
backers: backers.to_bitvec(),
backed_in_number: now,
backing_group: group_idx,
};
if let Some(pending_availability) = pending_availability {
pending_availability.push_back(new_candidate);
} else {
*pending_availability =
Some([new_candidate].into_iter().collect::<VecDeque<_>>())
}
});
Self::deposit_event(Event::<T>::CandidateBacked(
candidate.candidate().to_plain(),
candidate.candidate().commitments.head_data.clone(),
*core,
group_idx,
));
}
}
Ok(candidate_receipt_with_backing_validator_indices)
}
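	/// Get the latest head data of a para: the head of its most recent pending-availability
	/// candidate, if any, otherwise the latest included head from `paras::Heads`.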
pub(crate) fn para_latest_head_data(para_id: &ParaId) -> Option<HeadData> {
match PendingAvailability::<T>::get(para_id).and_then(|pending_candidates| {
pending_candidates.back().map(|x| x.commitments.head_data.clone())
}) {
Some(head_data) => Some(head_data),
None => paras::Heads::<T>::get(para_id),
}
}
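	/// Get the relay-parent number of the para's most recent context: that of its most recent
	/// pending-availability candidate, if any, otherwise `paras::MostRecentContext`.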
pub(crate) fn para_most_recent_context(para_id: &ParaId) -> Option<BlockNumberFor<T>> {
match PendingAvailability::<T>::get(para_id)
.and_then(|pending_candidates| pending_candidates.back().map(|x| x.relay_parent_number))
{
Some(relay_parent_number) => Some(relay_parent_number),
None => paras::MostRecentContext::<T>::get(para_id),
}
}
fn check_backing_votes(
backed_candidate: &BackedCandidate<T::Hash>,
validators: &[ValidatorId],
group_vals: Vec<ValidatorIndex>,
core_index_enabled: bool,
) -> Result<(BitVec<u8, BitOrderLsb0>, Vec<(ValidatorIndex, ValidityAttestation)>), Error<T>> {
let minimum_backing_votes = configuration::ActiveConfig::<T>::get().minimum_backing_votes;
let mut backers = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];
let signing_context = SigningContext {
parent_hash: backed_candidate.descriptor().relay_parent(),
session_index: shared::CurrentSessionIndex::<T>::get(),
};
let (validator_indices, _) =
backed_candidate.validator_indices_and_core_index(core_index_enabled);
let maybe_amount_validated = polkadot_primitives::check_candidate_backing(
backed_candidate.candidate().hash(),
backed_candidate.validity_votes(),
validator_indices,
&signing_context,
group_vals.len(),
|intra_group_vi| {
group_vals
.get(intra_group_vi)
.and_then(|vi| validators.get(vi.0 as usize))
					.cloned()
},
);
match maybe_amount_validated {
Ok(amount_validated) => ensure!(
amount_validated >=
effective_minimum_backing_votes(group_vals.len(), minimum_backing_votes),
Error::<T>::InsufficientBacking,
),
			Err(()) => return Err(Error::<T>::InvalidBacking),
}
let mut backer_idx_and_attestation =
Vec::<(ValidatorIndex, ValidityAttestation)>::with_capacity(
validator_indices.count_ones(),
);
for ((bit_idx, _), attestation) in validator_indices
.iter()
.enumerate()
.filter(|(_, signed)| **signed)
.zip(backed_candidate.validity_votes().iter().cloned())
{
let val_idx = group_vals.get(bit_idx).expect("this query succeeded above; qed");
backer_idx_and_attestation.push((*val_idx, attestation));
backers.set(val_idx.0 as _, true);
}
Ok((backers, backer_idx_and_attestation))
}
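	/// Run the acceptance criteria checks on the given validation outputs, returning whether they
	/// all pass. Used by the `check_validation_outputs` runtime API.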
pub(crate) fn check_validation_outputs_for_runtime_api(
para_id: ParaId,
relay_parent_number: BlockNumberFor<T>,
validation_outputs: polkadot_primitives::CandidateCommitments,
) -> bool {
let prev_context = Self::para_most_recent_context(¶_id);
let check_ctx = CandidateCheckContext::<T>::new(prev_context);
if let Err(err) = check_ctx.check_validation_outputs(
para_id,
relay_parent_number,
&validation_outputs.head_data,
&validation_outputs.new_validation_code,
validation_outputs.processed_downward_messages,
&validation_outputs.upward_messages,
BlockNumberFor::<T>::from(validation_outputs.hrmp_watermark),
&validation_outputs.horizontal_messages,
) {
log::debug!(
target: LOG_TARGET,
"Validation outputs checking for parachain `{}` failed, error: {:?}",
u32::from(para_id), err
);
false
} else {
true
}
}
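	/// Enact a candidate that has become available: reward backers and bitfield voters, schedule
	/// a code upgrade if one was committed, prune processed downward messages, enqueue upward
	/// messages, advance the HRMP watermark, queue outbound HRMP messages and note the new head.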
fn enact_candidate(
relay_parent_number: BlockNumberFor<T>,
receipt: CommittedCandidateReceipt<T::Hash>,
backers: BitVec<u8, BitOrderLsb0>,
availability_votes: BitVec<u8, BitOrderLsb0>,
core_index: CoreIndex,
backing_group: GroupIndex,
) {
let plain = receipt.to_plain();
let commitments = receipt.commitments;
let config = configuration::ActiveConfig::<T>::get();
T::RewardValidators::reward_backing(
backers
.iter()
.enumerate()
.filter(|(_, backed)| **backed)
.map(|(i, _)| ValidatorIndex(i as _)),
);
T::RewardValidators::reward_bitfields(
availability_votes
.iter()
.enumerate()
.filter(|(_, voted)| **voted)
.map(|(i, _)| ValidatorIndex(i as _)),
);
if let Some(new_code) = commitments.new_validation_code {
let now = frame_system::Pallet::<T>::block_number();
paras::Pallet::<T>::schedule_code_upgrade(
receipt.descriptor.para_id(),
new_code,
now,
&config,
UpgradeStrategy::SetGoAheadSignal,
);
}
dmp::Pallet::<T>::prune_dmq(
receipt.descriptor.para_id(),
commitments.processed_downward_messages,
);
Self::receive_upward_messages(
receipt.descriptor.para_id(),
commitments.upward_messages.as_slice(),
);
hrmp::Pallet::<T>::prune_hrmp(
receipt.descriptor.para_id(),
BlockNumberFor::<T>::from(commitments.hrmp_watermark),
);
hrmp::Pallet::<T>::queue_outbound_hrmp(
receipt.descriptor.para_id(),
commitments.horizontal_messages,
);
Self::deposit_event(Event::<T>::CandidateIncluded(
plain,
commitments.head_data.clone(),
core_index,
backing_group,
));
paras::Pallet::<T>::note_new_head(
receipt.descriptor.para_id(),
commitments.head_data,
relay_parent_number,
);
}
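	/// Returns the number of messages and the total size in bytes of the para's queued upward
	/// messages.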
pub(crate) fn relay_dispatch_queue_size(para_id: ParaId) -> (u32, u32) {
let fp = T::MessageQueue::footprint(AggregateMessageOrigin::Ump(UmpQueueId::Para(para_id)));
(fp.storage.count as u32, fp.storage.size as u32)
}
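	/// Check that all upward messages of a candidate pass the acceptance criteria from the active
	/// host configuration: per-candidate message count, per-message size, and the UMP queue's
	/// total count and size limits. UMP signals are filtered out first, and an off-boarding para
	/// may not send any messages at all.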
pub(crate) fn check_upward_messages(
config: &HostConfiguration<BlockNumberFor<T>>,
para: ParaId,
upward_messages: &[UpwardMessage],
) -> Result<(), UmpAcceptanceCheckErr> {
let upward_messages = skip_ump_signals(upward_messages.iter()).collect::<Vec<_>>();
if paras::Pallet::<T>::is_offboarding(para) {
ensure!(upward_messages.is_empty(), UmpAcceptanceCheckErr::IsOffboarding);
}
let additional_msgs = upward_messages.len() as u32;
if additional_msgs > config.max_upward_message_num_per_candidate {
return Err(UmpAcceptanceCheckErr::MoreMessagesThanPermitted {
sent: additional_msgs,
permitted: config.max_upward_message_num_per_candidate,
})
}
let (para_queue_count, mut para_queue_size) = Self::relay_dispatch_queue_size(para);
if para_queue_count.saturating_add(additional_msgs) > config.max_upward_queue_count {
return Err(UmpAcceptanceCheckErr::CapacityExceeded {
count: para_queue_count.saturating_add(additional_msgs).into(),
limit: config.max_upward_queue_count.into(),
})
}
for (idx, msg) in upward_messages.into_iter().enumerate() {
let msg_size = msg.len() as u32;
if msg_size > config.max_upward_message_size {
return Err(UmpAcceptanceCheckErr::MessageSize {
idx: idx as u32,
msg_size,
max_size: config.max_upward_message_size,
})
}
if para_queue_size.saturating_add(msg_size) > config.max_upward_queue_size {
return Err(UmpAcceptanceCheckErr::TotalSizeExceeded {
total_size: para_queue_size.saturating_add(msg_size).into(),
limit: config.max_upward_queue_size.into(),
})
}
para_queue_size.saturating_accrue(msg_size);
}
Ok(())
}
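	/// Enqueue the upward messages of an accepted candidate. This is infallible: oversized
	/// messages were already rejected by `check_upward_messages` and are only defensively
	/// filtered out here.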
pub(crate) fn receive_upward_messages(para: ParaId, upward_messages: &[Vec<u8>]) {
let bounded = skip_ump_signals(upward_messages.iter())
.filter_map(|d| {
BoundedSlice::try_from(&d[..])
.inspect_err(|_| {
defensive!("Accepted candidate contains too long msg, len=", d.len());
})
.ok()
})
.collect();
Self::receive_bounded_upward_messages(para, bounded)
}
pub(crate) fn receive_bounded_upward_messages(
para: ParaId,
messages: Vec<BoundedSlice<'_, u8, MaxUmpMessageLenOf<T>>>,
) {
let count = messages.len() as u32;
if count == 0 {
return
}
T::MessageQueue::enqueue_messages(
messages.into_iter(),
AggregateMessageOrigin::Ump(UmpQueueId::Para(para)),
);
Self::deposit_event(Event::UpwardMessagesReceived { from: para, count });
}
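	/// Free the cores of all candidates that timed out on availability, together with their
	/// descendants in the para's chain, emitting a `CandidateTimedOut` event for each. Returns
	/// the freed core indices.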
pub(crate) fn free_timedout() -> Vec<CoreIndex> {
let timeout_pred = scheduler::Pallet::<T>::availability_timeout_predicate();
let timed_out: Vec<_> = Self::free_failed_cores(
|candidate| timeout_pred(candidate.backed_in_number).timed_out,
None,
)
.collect();
let mut timed_out_cores = Vec::with_capacity(timed_out.len());
for candidate in timed_out.iter() {
timed_out_cores.push(candidate.core);
let receipt = CandidateReceipt {
descriptor: candidate.descriptor.clone(),
commitments_hash: candidate.commitments.hash(),
};
Self::deposit_event(Event::<T>::CandidateTimedOut(
receipt,
candidate.commitments.head_data.clone(),
candidate.core,
));
}
timed_out_cores
}
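	/// Free the cores occupied by disputed candidates and by their descendants in the para's
	/// chain. Returns the freed `(core, candidate_hash)` pairs.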
pub(crate) fn free_disputed(
disputed: &BTreeSet<CandidateHash>,
) -> Vec<(CoreIndex, CandidateHash)> {
Self::free_failed_cores(
|candidate| disputed.contains(&candidate.hash),
Some(disputed.len()),
)
.map(|candidate| (candidate.core, candidate.hash))
.collect()
}
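	/// Shared cleanup helper: for every para, drop the earliest pending candidate matching `pred`
	/// along with everything after it (its descendants), returning the dropped candidates.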
fn free_failed_cores<
P: Fn(&CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>) -> bool,
>(
pred: P,
capacity_hint: Option<usize>,
) -> impl Iterator<Item = CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>> {
let mut earliest_dropped_indices: BTreeMap<ParaId, usize> = BTreeMap::new();
for (para_id, pending_candidates) in PendingAvailability::<T>::iter() {
let mut earliest_dropped_idx = None;
for (index, candidate) in pending_candidates.iter().enumerate() {
if pred(candidate) {
earliest_dropped_idx = Some(index);
break;
}
}
if let Some(earliest_dropped_idx) = earliest_dropped_idx {
earliest_dropped_indices.insert(para_id, earliest_dropped_idx);
}
}
let mut cleaned_up_cores =
if let Some(capacity) = capacity_hint { Vec::with_capacity(capacity) } else { vec![] };
for (para_id, earliest_dropped_idx) in earliest_dropped_indices {
			PendingAvailability::<T>::mutate(&para_id, |record| {
if let Some(record) = record {
let cleaned_up = record.drain(earliest_dropped_idx..);
cleaned_up_cores.extend(cleaned_up);
}
});
}
cleaned_up_cores.into_iter()
}
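	/// Forcibly enact the pending-availability candidates of the given para, in dependency order,
	/// as if they had become available. A no-op if there are none. This should generally not be
	/// used, but is useful during runtime API execution, where state changes are discarded.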
pub(crate) fn force_enact(para: ParaId) {
		PendingAvailability::<T>::mutate(&para, |candidates| {
if let Some(candidates) = candidates {
for candidate in candidates.drain(..) {
let receipt = CommittedCandidateReceipt {
descriptor: candidate.descriptor,
commitments: candidate.commitments,
};
Self::enact_candidate(
candidate.relay_parent_number,
receipt,
candidate.backers,
candidate.availability_votes,
candidate.core,
candidate.backing_group,
);
}
}
});
}
pub(crate) fn first_candidate_pending_availability(
para: ParaId,
) -> Option<CommittedCandidateReceipt<T::Hash>> {
		PendingAvailability::<T>::get(&para).and_then(|p| {
p.get(0).map(|p| CommittedCandidateReceipt {
descriptor: p.descriptor.clone(),
commitments: p.commitments.clone(),
})
})
}
pub(crate) fn candidates_pending_availability(
para: ParaId,
) -> Vec<CommittedCandidateReceipt<T::Hash>> {
		<PendingAvailability<T>>::get(&para)
.map(|candidates| {
candidates
.into_iter()
.map(|candidate| CommittedCandidateReceipt {
descriptor: candidate.descriptor.clone(),
commitments: candidate.commitments.clone(),
})
.collect()
})
.unwrap_or_default()
}
}
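/// The availability threshold: a Byzantine supermajority of validators must have voted a
/// candidate available before it can be enacted.
///
/// A worked sketch, assuming `supermajority_threshold` keeps its usual
/// `n - (n - 1) / 3` definition from `polkadot_primitives`:
///
/// ```ignore
/// // 6 validators: (6 - 1) / 3 = 1 may be faulty, so 6 - 1 = 5 votes are required.
/// assert_eq!(availability_threshold(6), 5);
/// // 10 validators: (10 - 1) / 3 = 3 may be faulty, so 10 - 3 = 7 votes are required.
/// assert_eq!(availability_threshold(10), 7);
/// ```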
const fn availability_threshold(n_validators: usize) -> usize {
supermajority_threshold(n_validators)
}
impl AcceptanceCheckErr {
fn strip_into_dispatch_err<T: Config>(self) -> Error<T> {
use AcceptanceCheckErr::*;
match self {
HeadDataTooLarge => Error::<T>::HeadDataTooLarge,
PrematureCodeUpgrade => Error::<T>::PrematureCodeUpgrade,
NewCodeTooLarge => Error::<T>::NewCodeTooLarge,
ProcessedDownwardMessages => Error::<T>::IncorrectDownwardMessageHandling,
UpwardMessages => Error::<T>::InvalidUpwardMessages,
HrmpWatermark => Error::<T>::HrmpWatermarkMishandling,
OutboundHrmp => Error::<T>::InvalidOutboundHrmp,
}
}
}
impl<T: Config> OnQueueChanged<AggregateMessageOrigin> for Pallet<T> {
fn on_queue_changed(origin: AggregateMessageOrigin, fp: QueueFootprint) {
let para = match origin {
AggregateMessageOrigin::Ump(UmpQueueId::Para(p)) => p,
};
let QueueFootprint { storage: Footprint { count, size }, .. } = fp;
let (count, size) = (count.saturated_into(), size.saturated_into());
#[allow(deprecated)]
well_known_keys::relay_dispatch_queue_size_typed(para).set((count, size));
let config = configuration::ActiveConfig::<T>::get();
let remaining_count = config.max_upward_queue_count.saturating_sub(count);
let remaining_size = config.max_upward_queue_size.saturating_sub(size);
well_known_keys::relay_dispatch_queue_remaining_capacity(para)
.set((remaining_count, remaining_size));
}
}
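/// A collection of data required for checking a candidate: the active host configuration and the
/// previous relay-parent context of the para, if any.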
pub(crate) struct CandidateCheckContext<T: Config> {
config: configuration::HostConfiguration<BlockNumberFor<T>>,
prev_context: Option<BlockNumberFor<T>>,
}
impl<T: Config> CandidateCheckContext<T> {
pub(crate) fn new(prev_context: Option<BlockNumberFor<T>>) -> Self {
Self { config: configuration::ActiveConfig::<T>::get(), prev_context }
}
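	/// Verify a backed candidate: its relay parent must be allowed, the persisted validation data
	/// hash, the validation code hash and the para head must all match, and its commitments must
	/// pass the acceptance checks. Returns the relay parent's block number on success.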
pub(crate) fn verify_backed_candidate(
&self,
allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
backed_candidate_receipt: &CommittedCandidateReceipt<<T as frame_system::Config>::Hash>,
parent_head_data: HeadData,
) -> Result<BlockNumberFor<T>, Error<T>> {
let para_id = backed_candidate_receipt.descriptor.para_id();
let relay_parent = backed_candidate_receipt.descriptor.relay_parent();
let (state_root, relay_parent_number) = {
match allowed_relay_parents.acquire_info(relay_parent, self.prev_context) {
None => return Err(Error::<T>::DisallowedRelayParent),
Some((info, relay_parent_number)) => (info.state_root, relay_parent_number),
}
};
{
let persisted_validation_data = make_persisted_validation_data_with_parent::<T>(
relay_parent_number,
state_root,
parent_head_data,
);
let expected = persisted_validation_data.hash();
ensure!(
expected == backed_candidate_receipt.descriptor.persisted_validation_data_hash(),
Error::<T>::ValidationDataHashMismatch,
);
}
let validation_code_hash = paras::CurrentCodeHash::<T>::get(para_id)
.ok_or_else(|| Error::<T>::UnscheduledCandidate)?;
ensure!(
backed_candidate_receipt.descriptor.validation_code_hash() == validation_code_hash,
Error::<T>::InvalidValidationCodeHash,
);
ensure!(
backed_candidate_receipt.descriptor.para_head() ==
backed_candidate_receipt.commitments.head_data.hash(),
Error::<T>::ParaHeadMismatch,
);
if let Err(err) = self.check_validation_outputs(
para_id,
relay_parent_number,
&backed_candidate_receipt.commitments.head_data,
&backed_candidate_receipt.commitments.new_validation_code,
backed_candidate_receipt.commitments.processed_downward_messages,
&backed_candidate_receipt.commitments.upward_messages,
BlockNumberFor::<T>::from(backed_candidate_receipt.commitments.hrmp_watermark),
&backed_candidate_receipt.commitments.horizontal_messages,
) {
log::debug!(
target: LOG_TARGET,
"Validation outputs checking during inclusion of a candidate {:?} for parachain `{}` failed, error: {:?}",
backed_candidate_receipt.hash(),
u32::from(para_id),
err
);
			return Err(err.strip_into_dispatch_err::<T>())
		}
Ok(relay_parent_number)
}
fn check_validation_outputs(
&self,
para_id: ParaId,
relay_parent_number: BlockNumberFor<T>,
head_data: &HeadData,
new_validation_code: &Option<polkadot_primitives::ValidationCode>,
processed_downward_messages: u32,
upward_messages: &[polkadot_primitives::UpwardMessage],
hrmp_watermark: BlockNumberFor<T>,
horizontal_messages: &[polkadot_primitives::OutboundHrmpMessage<ParaId>],
) -> Result<(), AcceptanceCheckErr> {
ensure!(
head_data.0.len() <= self.config.max_head_data_size as _,
AcceptanceCheckErr::HeadDataTooLarge,
);
if let Some(new_validation_code) = new_validation_code {
ensure!(
paras::Pallet::<T>::can_upgrade_validation_code(para_id),
AcceptanceCheckErr::PrematureCodeUpgrade,
);
ensure!(
new_validation_code.0.len() <= self.config.max_code_size as _,
AcceptanceCheckErr::NewCodeTooLarge,
);
}
dmp::Pallet::<T>::check_processed_downward_messages(
para_id,
relay_parent_number,
processed_downward_messages,
)
.map_err(|e| {
log::debug!(
target: LOG_TARGET,
"Check processed downward messages for parachain `{}` on relay parent number `{:?}` failed, error: {:?}",
u32::from(para_id),
relay_parent_number,
e
);
e
})?;
Pallet::<T>::check_upward_messages(&self.config, para_id, upward_messages).map_err(
|e| {
log::debug!(
target: LOG_TARGET,
"Check upward messages for parachain `{}` failed, error: {:?}",
u32::from(para_id),
e
);
e
},
)?;
hrmp::Pallet::<T>::check_hrmp_watermark(para_id, relay_parent_number, hrmp_watermark)
.map_err(|e| {
log::debug!(
target: LOG_TARGET,
"Check hrmp watermark for parachain `{}` on relay parent number `{:?}` failed, error: {:?}",
u32::from(para_id),
relay_parent_number,
e
);
e
})?;
hrmp::Pallet::<T>::check_outbound_hrmp(&self.config, para_id, horizontal_messages)
.map_err(|e| {
log::debug!(
target: LOG_TARGET,
"Check outbound hrmp for parachain `{}` failed, error: {:?}",
u32::from(para_id),
e
);
e
})?;
Ok(())
}
}
impl<T: Config> QueueFootprinter for Pallet<T> {
type Origin = UmpQueueId;
fn message_count(origin: Self::Origin) -> u64 {
T::MessageQueue::footprint(AggregateMessageOrigin::Ump(origin)).storage.count
}
}