use crate::{
configuration::{self, HostConfiguration},
disputes, dmp, hrmp, paras,
scheduler::{self, common::CoreAssignment},
shared::{self, AllowedRelayParentsTracker},
};
use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use frame_support::{
defensive,
pallet_prelude::*,
traits::{Defensive, EnqueueMessage},
BoundedSlice,
};
use frame_system::pallet_prelude::*;
use pallet_message_queue::OnQueueChanged;
use parity_scale_codec::{Decode, Encode};
use primitives::{
supermajority_threshold, well_known_keys, AvailabilityBitfield, BackedCandidate,
CandidateCommitments, CandidateDescriptor, CandidateHash, CandidateReceipt,
CommittedCandidateReceipt, CoreIndex, GroupIndex, Hash, HeadData, Id as ParaId,
SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId, ValidatorIndex,
ValidityAttestation,
};
use scale_info::TypeInfo;
use sp_runtime::{traits::One, DispatchError, SaturatedConversion, Saturating};
#[cfg(feature = "std")]
use sp_std::fmt;
use sp_std::{collections::btree_set::BTreeSet, prelude::*};
pub use pallet::*;
#[cfg(test)]
pub(crate) mod tests;
#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;
/// Weight functions needed by this pallet.
pub trait WeightInfo {
/// Weight charged for receiving `i` upward messages; this file calls it with the number of
/// messages that were just enqueued.
fn receive_upward_messages(i: u32) -> Weight;
}
/// A [`WeightInfo`] implementation that reports `Weight::MAX` for every call.
///
/// NOTE(review): the name suggests this is intended for tests only — confirm before using it
/// in a real runtime.
pub struct TestWeightInfo;
impl WeightInfo for TestWeightInfo {
fn receive_upward_messages(_: u32) -> Weight {
// The message count is ignored; the maximum weight is always reported.
Weight::MAX
}
}
/// No-op [`WeightInfo`]: every operation is reported as costing zero weight.
impl WeightInfo for () {
fn receive_upward_messages(_: u32) -> Weight {
Weight::zero()
}
}
/// Upper bound, in bytes (128 KiB), for the size of a single upward message.
///
/// NOTE(review): not referenced in this file; presumably consumed by the configuration pallet
/// to bound `max_upward_message_size` — confirm against its users.
pub const MAX_UPWARD_MESSAGE_SIZE_BOUND: u32 = 128 * 1024;
/// The latest availability bitfield stored for a validator.
///
/// * `bitfield` — the bitfield payload that was submitted.
/// * `submitted_at` — the block number at which the record was written (the current block
///   number when bitfields are processed).
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct AvailabilityBitfieldRecord<N> {
bitfield: AvailabilityBitfield, submitted_at: N, }
/// A backed candidate that is waiting to become available.
#[derive(Encode, Decode, PartialEq, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct CandidatePendingAvailability<H, N> {
// The core the candidate occupies.
core: CoreIndex,
// The hash of the candidate.
hash: CandidateHash,
// The candidate's descriptor.
descriptor: CandidateDescriptor<H>,
// Availability votes received so far, one bit per validator index.
availability_votes: BitVec<u8, BitOrderLsb0>,
// The validators that backed the candidate, one bit per validator index.
backers: BitVec<u8, BitOrderLsb0>,
// The block number of the candidate's relay parent.
relay_parent_number: N,
// The block number at which the candidate was backed.
backed_in_number: N,
// The group that backed the candidate.
backing_group: GroupIndex,
}
/// Read-only accessors (plus a test/benchmark-only constructor) for
/// [`CandidatePendingAvailability`].
impl<H, N> CandidatePendingAvailability<H, N> {
/// The availability votes recorded for the candidate so far.
pub(crate) fn availability_votes(&self) -> &BitVec<u8, BitOrderLsb0> {
&self.availability_votes
}
/// The block number at which the candidate was backed.
pub(crate) fn backed_in_number(&self) -> &N {
&self.backed_in_number
}
/// The core the candidate occupies.
pub(crate) fn core_occupied(&self) -> CoreIndex {
self.core
}
/// The hash of the candidate.
pub(crate) fn candidate_hash(&self) -> CandidateHash {
self.hash
}
/// The descriptor of the candidate.
pub(crate) fn candidate_descriptor(&self) -> &CandidateDescriptor<H> {
&self.descriptor
}
/// The block number of the candidate's relay parent, returned as a clone.
pub(crate) fn relay_parent_number(&self) -> N
where
N: Clone,
{
self.relay_parent_number.clone()
}
/// Builds a record directly from its fields.
///
/// Only compiled for tests and runtime benchmarks.
#[cfg(any(feature = "runtime-benchmarks", test))]
pub(crate) fn new(
core: CoreIndex,
hash: CandidateHash,
descriptor: CandidateDescriptor<H>,
availability_votes: BitVec<u8, BitOrderLsb0>,
backers: BitVec<u8, BitOrderLsb0>,
relay_parent_number: N,
backed_in_number: N,
backing_group: GroupIndex,
) -> Self {
Self {
core,
hash,
descriptor,
availability_votes,
backers,
relay_parent_number,
backed_in_number,
backing_group,
}
}
}
/// Hooks used to reward validators when a candidate is enacted.
pub trait RewardValidators {
/// Called with the indices of the validators that backed the enacted candidate.
fn reward_backing(validators: impl IntoIterator<Item = ValidatorIndex>);
/// Called with the indices of the validators whose availability votes were recorded for
/// the enacted candidate.
fn reward_bitfields(validators: impl IntoIterator<Item = ValidatorIndex>);
}
/// The result of `Pallet::process_candidates`.
#[derive(Encode, Decode, PartialEq, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub(crate) struct ProcessedCandidates<H = Hash> {
// The cores that received a candidate, paired with the para assigned to each.
pub(crate) core_indices: Vec<(CoreIndex, ParaId)>,
// For every accepted candidate: its plain receipt plus the (validator index, attestation)
// pairs of its backers.
pub(crate) candidate_receipt_with_backing_validator_indices:
Vec<(CandidateReceipt<H>, Vec<(ValidatorIndex, ValidityAttestation)>)>,
}
/// Implemented by hand so that no `H: Default` bound is required.
impl<H> Default for ProcessedCandidates<H> {
/// Returns a result with no occupied cores and no candidate receipts.
fn default() -> Self {
Self {
core_indices: Vec::new(),
candidate_receipt_with_backing_validator_indices: Vec::new(),
}
}
}
/// Number of backing votes a candidate needs, given a backing group of `n_validators`
/// members: two votes, or the whole group when it has fewer than two members.
pub fn minimum_backing_votes(n_validators: usize) -> usize {
    const BACKING_VOTES_CAP: usize = 2;

    if n_validators < BACKING_VOTES_CAP {
        n_validators
    } else {
        BACKING_VOTES_CAP
    }
}
/// Reports how many messages are currently held in a message queue.
pub trait QueueFootprinter {
/// Identifier of the queue being queried.
type Origin;
/// Number of messages currently in the queue identified by `origin`.
fn message_count(origin: Self::Origin) -> u64;
}
/// No-op [`QueueFootprinter`]: every queue is reported as empty.
impl QueueFootprinter for () {
type Origin = UmpQueueId;
fn message_count(_: Self::Origin) -> u64 {
0
}
}
/// Origin of a message handed to the message queue used by this pallet.
///
/// The explicit codec index pins the SCALE encoding of the variant.
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum AggregateMessageOrigin {
/// The message came in through an upward-message (UMP) queue.
#[codec(index = 0)]
Ump(UmpQueueId),
}
/// Identifies one upward-message queue.
///
/// The explicit codec index pins the SCALE encoding of the variant.
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum UmpQueueId {
/// The queue holding the upward messages of the given para.
#[codec(index = 0)]
Para(ParaId),
}
/// Benchmark-only conversion: treats `n` as a para id and yields that para's UMP queue origin.
#[cfg(feature = "runtime-benchmarks")]
impl From<u32> for AggregateMessageOrigin {
fn from(n: u32) -> Self {
Self::Ump(UmpQueueId::Para(n.into()))
}
}
/// The maximum message length accepted by the pallet's configured `MessageQueue`, used as the
/// bound of the `BoundedSlice`s that are enqueued.
pub type MaxUmpMessageLenOf<T> =
<<T as Config>::MessageQueue as EnqueueMessage<AggregateMessageOrigin>>::MaxMessageLen;
/// FRAME pallet declaration: configuration trait, events, errors and storage items used by
/// the `impl Pallet<T>` methods further down in this file. The pallet exposes no extrinsics.
#[frame_support::pallet]
pub mod pallet {
use super::*;
#[pallet::pallet]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);
/// Configuration of the pallet; it requires the other parachains pallets it calls into.
#[pallet::config]
pub trait Config:
frame_system::Config
+ shared::Config
+ paras::Config
+ dmp::Config
+ hrmp::Config
+ configuration::Config
+ scheduler::Config
{
/// The overarching runtime event type.
type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
/// Handler for disputes.
// NOTE(review): not called from this file; presumably used by sibling modules — confirm.
type DisputesHandler: disputes::DisputesHandler<BlockNumberFor<Self>>;
/// Reward hooks invoked when a candidate is enacted.
type RewardValidators: RewardValidators;
/// The queue that accepted upward messages are enqueued into.
type MessageQueue: EnqueueMessage<AggregateMessageOrigin>;
/// Weight information for this pallet.
type WeightInfo: WeightInfo;
}
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
pub enum Event<T: Config> {
/// A candidate was backed. `[candidate, head_data, core, backing group]`
CandidateBacked(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
/// A candidate was included (enacted). `[candidate, head_data, core, backing group]`
CandidateIncluded(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
/// A pending candidate was cleaned up before becoming available.
/// `[candidate, head_data, core]`
CandidateTimedOut(CandidateReceipt<T::Hash>, HeadData, CoreIndex),
/// `count` upward messages from para `from` were enqueued.
UpwardMessagesReceived { from: ParaId, count: u32 },
}
#[pallet::error]
pub enum Error<T> {
// NOTE(review): the variants without a doc comment below are not raised anywhere in this
// file; presumably they are used by other modules of the crate — confirm before changing.
UnsortedOrDuplicateValidatorIndices,
UnsortedOrDuplicateDisputeStatementSet,
UnsortedOrDuplicateBackedCandidates,
UnexpectedRelayParent,
WrongBitfieldSize,
BitfieldAllZeros,
BitfieldDuplicateOrUnordered,
ValidatorIndexOutOfBounds,
InvalidBitfieldSignature,
/// A candidate was submitted for which no matching scheduled core exists, more candidates
/// than scheduled cores were submitted, or the para has no current validation code.
UnscheduledCandidate,
/// The para already has a candidate (or commitments) pending availability.
CandidateScheduledBeforeParaFree,
/// The scheduled core assignments are not strictly ascending by core index.
ScheduledOutOfOrder,
/// The candidate's head data exceeds the configured maximum size.
HeadDataTooLarge,
/// The candidate carries a code upgrade although the para may not upgrade right now.
PrematureCodeUpgrade,
/// The candidate's new validation code exceeds the configured maximum size.
NewCodeTooLarge,
/// The candidate's relay parent is not among the allowed relay parents.
DisallowedRelayParent,
/// No backing group could be determined for the core.
InvalidAssignment,
/// The validators of the backing group could not be looked up.
InvalidGroupIndex,
/// The candidate has fewer valid backing votes than required.
InsufficientBacking,
/// The backing of the candidate failed verification.
InvalidBacking,
/// The collator signature on the candidate descriptor is invalid.
NotCollatorSigned,
/// The descriptor's persisted-validation-data hash does not match the expected one.
ValidationDataHashMismatch,
/// The downward-message check of the candidate's outputs failed.
IncorrectDownwardMessageHandling,
/// The upward-message check of the candidate's outputs failed.
InvalidUpwardMessages,
/// The HRMP watermark check of the candidate's outputs failed.
HrmpWatermarkMishandling,
/// The outbound HRMP check of the candidate's outputs failed.
InvalidOutboundHrmp,
/// The descriptor's validation code hash differs from the para's current code hash.
InvalidValidationCodeHash,
/// The descriptor's para head hash differs from the hash of the committed head data.
ParaHeadMismatch,
BitfieldReferencesFreedCore,
}
/// The latest availability bitfield record for each validator index.
#[pallet::storage]
pub(crate) type AvailabilityBitfields<T: Config> =
StorageMap<_, Twox64Concat, ValidatorIndex, AvailabilityBitfieldRecord<BlockNumberFor<T>>>;
/// The candidate pending availability for each para, if any.
#[pallet::storage]
pub(crate) type PendingAvailability<T: Config> = StorageMap<
_,
Twox64Concat,
ParaId,
CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>,
>;
/// The commitments of the candidate pending availability for each para, if any.
#[pallet::storage]
pub(crate) type PendingAvailabilityCommitments<T: Config> =
StorageMap<_, Twox64Concat, ParaId, CandidateCommitments>;
// The pallet exposes no dispatchable calls.
#[pallet::call]
impl<T: Config> Pallet<T> {}
}
/// Log target used by every `log::` call in this file.
const LOG_TARGET: &str = "runtime::inclusion";
/// Reasons why the outputs (commitments) of a candidate can be rejected.
///
/// `derive_more::From` supplies the `From` conversions that let `?` wrap the sub-check errors
/// into the matching variant.
#[derive(derive_more::From)]
#[cfg_attr(feature = "std", derive(Debug))]
enum AcceptanceCheckErr<BlockNumber> {
/// The head data is larger than the configured maximum.
HeadDataTooLarge,
/// A code upgrade was signalled while the para is not allowed to upgrade.
PrematureCodeUpgrade,
/// The new validation code is larger than the configured maximum.
NewCodeTooLarge,
/// The processed-downward-messages check failed.
ProcessedDownwardMessages(dmp::ProcessedDownwardMessagesAcceptanceErr),
/// The upward-messages check failed.
UpwardMessages(UmpAcceptanceCheckErr),
/// The HRMP watermark check failed.
HrmpWatermark(hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>),
/// The outbound HRMP messages check failed.
OutboundHrmp(hrmp::OutboundHrmpAcceptanceErr),
}
/// Reasons why a candidate's upward messages can be rejected by
/// `Pallet::check_upward_messages`.
#[cfg_attr(test, derive(PartialEq))]
pub enum UmpAcceptanceCheckErr {
/// The candidate sends more messages than the per-candidate limit permits.
MoreMessagesThanPermitted { sent: u32, permitted: u32 },
/// The message at index `idx` is larger than the per-message size limit.
MessageSize { idx: u32, msg_size: u32, max_size: u32 },
/// Accepting the messages would push the queue's message count past its limit.
CapacityExceeded { count: u64, limit: u64 },
/// Accepting the messages would push the queue's total size past its limit.
TotalSizeExceeded { total_size: u64, limit: u64 },
/// The para is off-boarding and may not send any upward message.
IsOffboarding,
}
#[cfg(feature = "std")]
impl fmt::Debug for UmpAcceptanceCheckErr {
    /// Renders a human-readable sentence describing the rejection, including the offending
    /// value and the limit it was compared against where applicable.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::MoreMessagesThanPermitted { sent, permitted } => f.write_fmt(format_args!(
                "more upward messages than permitted by config ({} > {})",
                sent, permitted,
            )),
            Self::MessageSize { idx, msg_size, max_size } => f.write_fmt(format_args!(
                "upward message idx {} larger than permitted by config ({} > {})",
                idx, msg_size, max_size,
            )),
            Self::CapacityExceeded { count, limit } => f.write_fmt(format_args!(
                "the ump queue would have more items than permitted by config ({} > {})",
                count, limit,
            )),
            Self::TotalSizeExceeded { total_size, limit } => f.write_fmt(format_args!(
                "the ump queue would have grown past the max size permitted by config ({} > {})",
                total_size, limit,
            )),
            Self::IsOffboarding =>
                f.write_str("upward message rejected because the para is off-boarding"),
        }
    }
}
impl<T: Config> Pallet<T> {
/// Block-initialization hook. Does nothing and reports zero weight.
pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
Weight::zero()
}
/// Block-finalization hook. Intentionally a no-op.
pub(crate) fn initializer_finalize() {}
/// Session-change hook: wipes all pending-availability state and sweeps the UMP queues of
/// the paras in `outgoing_paras`.
pub(crate) fn initializer_on_new_session(
    _notification: &crate::initializer::SessionChangeNotification<BlockNumberFor<T>>,
    outgoing_paras: &[ParaId],
) {
    // Each drain iterator is consumed in full so that every entry is removed.
    <PendingAvailabilityCommitments<T>>::drain().for_each(drop);
    <PendingAvailability<T>>::drain().for_each(drop);
    <AvailabilityBitfields<T>>::drain().for_each(drop);

    Self::cleanup_outgoing_ump_dispatch_queues(outgoing_paras);
}
/// Sweeps the UMP queue of every para listed in `outgoing`.
pub(crate) fn cleanup_outgoing_ump_dispatch_queues(outgoing: &[ParaId]) {
    outgoing.iter().copied().for_each(Self::cleanup_outgoing_ump_dispatch_queue);
}
/// Asks the message queue to sweep the UMP queue that belongs to `para`.
pub(crate) fn cleanup_outgoing_ump_dispatch_queue(para: ParaId) {
T::MessageQueue::sweep_queue(AggregateMessageOrigin::Ump(UmpQueueId::Para(para)));
}
pub(crate) fn update_pending_availability_and_get_freed_cores<F>(
expected_bits: usize,
validators: &[ValidatorId],
signed_bitfields: SignedAvailabilityBitfields,
core_lookup: F,
) -> Vec<(CoreIndex, CandidateHash)>
where
F: Fn(CoreIndex) -> Option<ParaId>,
{
let mut assigned_paras_record = (0..expected_bits)
.map(|bit_index| core_lookup(CoreIndex::from(bit_index as u32)))
.map(|opt_para_id| {
opt_para_id.map(|para_id| (para_id, PendingAvailability::<T>::get(¶_id)))
})
.collect::<Vec<_>>();
let now = <frame_system::Pallet<T>>::block_number();
for (checked_bitfield, validator_index) in
signed_bitfields.into_iter().map(|signed_bitfield| {
let validator_idx = signed_bitfield.validator_index();
let checked_bitfield = signed_bitfield.into_payload();
(checked_bitfield, validator_idx)
}) {
for (bit_idx, _) in checked_bitfield.0.iter().enumerate().filter(|(_, is_av)| **is_av) {
let pending_availability = if let Some((_, pending_availability)) =
assigned_paras_record[bit_idx].as_mut()
{
pending_availability
} else {
continue
};
let validator_index = validator_index.0 as usize;
if let Some(mut bit) =
pending_availability.as_mut().and_then(|candidate_pending_availability| {
candidate_pending_availability.availability_votes.get_mut(validator_index)
}) {
*bit = true;
}
}
let record =
AvailabilityBitfieldRecord { bitfield: checked_bitfield, submitted_at: now };
<AvailabilityBitfields<T>>::insert(&validator_index, record);
}
let threshold = availability_threshold(validators.len());
let mut freed_cores = Vec::with_capacity(expected_bits);
for (para_id, pending_availability) in assigned_paras_record
.into_iter()
.flatten()
.filter_map(|(id, p)| p.map(|p| (id, p)))
{
if pending_availability.availability_votes.count_ones() >= threshold {
<PendingAvailability<T>>::remove(¶_id);
let commitments = match PendingAvailabilityCommitments::<T>::take(¶_id) {
Some(commitments) => commitments,
None => {
log::warn!(
target: LOG_TARGET,
"Inclusion::process_bitfields: PendingAvailability and PendingAvailabilityCommitments
are out of sync, did someone mess with the storage?",
);
continue
},
};
let receipt = CommittedCandidateReceipt {
descriptor: pending_availability.descriptor,
commitments,
};
let _weight = Self::enact_candidate(
pending_availability.relay_parent_number,
receipt,
pending_availability.backers,
pending_availability.availability_votes,
pending_availability.core,
pending_availability.backing_group,
);
freed_cores.push((pending_availability.core, pending_availability.hash));
} else {
<PendingAvailability<T>>::insert(¶_id, &pending_availability);
}
}
freed_cores
}
/// Checks a list of backed candidates against the scheduled core assignments and, when every
/// check passes, stores each candidate as pending availability.
///
/// * `allowed_relay_parents` — the relay parents candidates may be built on.
/// * `candidates` — the backed candidates; they must appear in the same order as their cores
///   do in `scheduled`.
/// * `scheduled` — the core assignments, strictly ascending by core index.
/// * `group_validators` — resolves a backing group to its validator indices.
///
/// All checks run before any storage is written. Returns the occupied cores and, per
/// candidate, its receipt with the backers' attestations. If the persisted validation data
/// of a candidate cannot be created, an empty `ProcessedCandidates` is returned and nothing
/// is written.
///
/// # Errors
///
/// Returns the `Error` variant of the first failed check, e.g. `UnscheduledCandidate`,
/// `ScheduledOutOfOrder`, `CandidateScheduledBeforeParaFree`, `InvalidAssignment`,
/// `InvalidGroupIndex`, `InsufficientBacking`, `InvalidBacking`, or anything returned by
/// `CandidateCheckContext::verify_backed_candidate`.
pub(crate) fn process_candidates<GV>(
    allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
    candidates: Vec<BackedCandidate<T::Hash>>,
    scheduled: Vec<CoreAssignment<BlockNumberFor<T>>>,
    group_validators: GV,
) -> Result<ProcessedCandidates<T::Hash>, DispatchError>
where
    GV: Fn(GroupIndex) -> Option<Vec<ValidatorIndex>>,
{
    let now = <frame_system::Pallet<T>>::block_number();

    ensure!(candidates.len() <= scheduled.len(), Error::<T>::UnscheduledCandidate);
    if scheduled.is_empty() {
        return Ok(ProcessedCandidates::default())
    }

    let validators = shared::Pallet::<T>::active_validator_keys();

    // Candidate receipts together with their backers.
    let mut candidate_receipt_with_backing_validator_indices =
        Vec::with_capacity(candidates.len());

    // Do all checks before writing storage.
    let core_indices_and_backers = {
        let mut skip = 0;
        let mut core_indices_and_backers = Vec::with_capacity(candidates.len());
        let mut last_core = None;

        let mut check_assignment_in_order =
            |assignment: &CoreAssignment<BlockNumberFor<T>>| -> DispatchResult {
                ensure!(
                    last_core.map_or(true, |core| assignment.core > core),
                    Error::<T>::ScheduledOutOfOrder,
                );
                last_core = Some(assignment.core);
                Ok(())
            };

        // The outer loop walks the candidates; the inner loop walks `scheduled`, resuming
        // right after the position where the previous candidate was matched. A candidate
        // that is not found before `scheduled` ends is unscheduled or out of order.
        'next_backed_candidate: for (candidate_idx, backed_candidate) in
            candidates.iter().enumerate()
        {
            let relay_parent_hash = backed_candidate.descriptor().relay_parent;
            let para_id = backed_candidate.descriptor().para_id;
            let prev_context = <paras::Pallet<T>>::para_most_recent_context(para_id);
            let check_ctx = CandidateCheckContext::<T>::new(prev_context);
            let signing_context = SigningContext {
                parent_hash: relay_parent_hash,
                session_index: shared::Pallet::<T>::session_index(),
            };

            let relay_parent_number = match check_ctx.verify_backed_candidate(
                allowed_relay_parents,
                candidate_idx,
                backed_candidate,
            )? {
                Err(FailedToCreatePVD) => {
                    log::debug!(
                        target: LOG_TARGET,
                        "Failed to create PVD for candidate {}",
                        candidate_idx,
                    );
                    // Return early without touching storage rather than erroring out.
                    return Ok(ProcessedCandidates::default())
                },
                Ok(rpn) => rpn,
            };

            let mut backers = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];

            for (i, core_assignment) in scheduled[skip..].iter().enumerate() {
                check_assignment_in_order(core_assignment)?;

                if para_id == core_assignment.paras_entry.para_id() {
                    ensure!(
                        <PendingAvailability<T>>::get(&para_id).is_none() &&
                            <PendingAvailabilityCommitments<T>>::get(&para_id).is_none(),
                        Error::<T>::CandidateScheduledBeforeParaFree,
                    );

                    // Account for the entries already skipped, then skip this one too.
                    skip = i + skip + 1;

                    // The backing group is the one assigned to the core at the block right
                    // after the candidate's relay parent.
                    let group_idx = <scheduler::Pallet<T>>::group_assigned_to_core(
                        core_assignment.core,
                        relay_parent_number + One::one(),
                    )
                    .ok_or_else(|| {
                        log::warn!(
                            target: LOG_TARGET,
                            "Failed to compute group index for candidate {}",
                            candidate_idx
                        );
                        Error::<T>::InvalidAssignment
                    })?;
                    let group_vals = group_validators(group_idx)
                        .ok_or_else(|| Error::<T>::InvalidGroupIndex)?;

                    // Check the backing signatures and that there are enough of them.
                    {
                        let maybe_amount_validated = primitives::check_candidate_backing(
                            backed_candidate,
                            &signing_context,
                            group_vals.len(),
                            |intra_group_vi| {
                                group_vals
                                    .get(intra_group_vi)
                                    .and_then(|vi| validators.get(vi.0 as usize))
                                    .cloned()
                            },
                        );

                        match maybe_amount_validated {
                            Ok(amount_validated) => ensure!(
                                amount_validated >= minimum_backing_votes(group_vals.len()),
                                Error::<T>::InsufficientBacking,
                            ),
                            Err(()) => {
                                Err(Error::<T>::InvalidBacking)?;
                            },
                        }

                        let mut backer_idx_and_attestation =
                            Vec::<(ValidatorIndex, ValidityAttestation)>::with_capacity(
                                backed_candidate.validator_indices.count_ones(),
                            );
                        let candidate_receipt = backed_candidate.receipt();

                        // Pair every set bit of the group-relative bitfield with its
                        // attestation and translate it into a global validator index.
                        for ((bit_idx, _), attestation) in backed_candidate
                            .validator_indices
                            .iter()
                            .enumerate()
                            .filter(|(_, signed)| **signed)
                            .zip(backed_candidate.validity_votes.iter().cloned())
                        {
                            let val_idx = group_vals
                                .get(bit_idx)
                                .expect("this query succeeded above; qed");
                            backer_idx_and_attestation.push((*val_idx, attestation));
                            backers.set(val_idx.0 as _, true);
                        }
                        candidate_receipt_with_backing_validator_indices
                            .push((candidate_receipt, backer_idx_and_attestation));
                    }

                    core_indices_and_backers.push((
                        (core_assignment.core, core_assignment.paras_entry.para_id()),
                        backers,
                        group_idx,
                        relay_parent_number,
                    ));
                    continue 'next_backed_candidate
                }
            }

            // `scheduled` was exhausted without finding a core for this candidate.
            ensure!(false, Error::<T>::UnscheduledCandidate);
        }

        // Check the ordering of whatever is left of `scheduled`.
        for assignment in scheduled[skip..].iter() {
            check_assignment_in_order(assignment)?;
        }

        core_indices_and_backers
    };

    // Every check passed: write the candidates to storage.
    let core_indices = core_indices_and_backers.iter().map(|(c, ..)| *c).collect();
    for (candidate, (core, backers, group, relay_parent_number)) in
        candidates.into_iter().zip(core_indices_and_backers)
    {
        let para_id = candidate.descriptor().para_id;

        // Start with no availability votes.
        let availability_votes: BitVec<u8, BitOrderLsb0> =
            bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];

        Self::deposit_event(Event::<T>::CandidateBacked(
            candidate.candidate.to_plain(),
            candidate.candidate.commitments.head_data.clone(),
            core.0,
            group,
        ));

        let candidate_hash = candidate.candidate.hash();
        let (descriptor, commitments) =
            (candidate.candidate.descriptor, candidate.candidate.commitments);

        <PendingAvailability<T>>::insert(
            &para_id,
            CandidatePendingAvailability {
                core: core.0,
                hash: candidate_hash,
                descriptor,
                availability_votes,
                relay_parent_number,
                backers: backers.to_bitvec(),
                backed_in_number: now,
                backing_group: group,
            },
        );
        <PendingAvailabilityCommitments<T>>::insert(&para_id, commitments);
    }

    Ok(ProcessedCandidates::<T::Hash> {
        core_indices,
        candidate_receipt_with_backing_validator_indices,
    })
}
/// Runtime-API entry point: runs the acceptance checks on `validation_outputs` for `para_id`
/// and reports whether they all passed. A failure is logged at debug level.
pub(crate) fn check_validation_outputs_for_runtime_api(
    para_id: ParaId,
    relay_parent_number: BlockNumberFor<T>,
    validation_outputs: primitives::CandidateCommitments,
) -> bool {
    let prev_context = <paras::Pallet<T>>::para_most_recent_context(para_id);
    let outcome = CandidateCheckContext::<T>::new(prev_context).check_validation_outputs(
        para_id,
        relay_parent_number,
        &validation_outputs.head_data,
        &validation_outputs.new_validation_code,
        validation_outputs.processed_downward_messages,
        &validation_outputs.upward_messages,
        BlockNumberFor::<T>::from(validation_outputs.hrmp_watermark),
        &validation_outputs.horizontal_messages,
    );

    match outcome {
        Ok(()) => true,
        Err(_) => {
            log::debug!(
                target: LOG_TARGET,
                "Validation outputs checking for parachain `{}` failed",
                u32::from(para_id),
            );
            false
        },
    }
}
/// Enacts a candidate that has become available (or is force-enacted).
///
/// Rewards the backers and the availability voters, schedules a code upgrade if the
/// candidate carries new validation code, prunes the downward message queue, enqueues the
/// upward messages, prunes and queues HRMP messages, deposits `CandidateIncluded` and finally
/// notes the new para head.
///
/// Returns the accumulated weight of these operations.
fn enact_candidate(
    relay_parent_number: BlockNumberFor<T>,
    receipt: CommittedCandidateReceipt<T::Hash>,
    backers: BitVec<u8, BitOrderLsb0>,
    availability_votes: BitVec<u8, BitOrderLsb0>,
    core_index: CoreIndex,
    backing_group: GroupIndex,
) -> Weight {
    let plain = receipt.to_plain();
    let para_id = receipt.descriptor.para_id;
    let commitments = receipt.commitments;
    let config = <configuration::Pallet<T>>::config();

    T::RewardValidators::reward_backing(
        backers
            .iter()
            .enumerate()
            .filter(|(_, backed)| **backed)
            .map(|(i, _)| ValidatorIndex(i as _)),
    );
    T::RewardValidators::reward_bitfields(
        availability_votes
            .iter()
            .enumerate()
            .filter(|(_, voted)| **voted)
            .map(|(i, _)| ValidatorIndex(i as _)),
    );

    // The initial weight accounts for the configuration read above.
    let mut weight = T::DbWeight::get().reads_writes(1, 0);

    if let Some(new_code) = commitments.new_validation_code {
        // Block number of the candidate's inclusion.
        let now = <frame_system::Pallet<T>>::block_number();
        // `saturating_add` returns a new value and leaves `weight` untouched, which dropped
        // the upgrade's weight; `saturating_accrue` adds it in place.
        weight.saturating_accrue(<paras::Pallet<T>>::schedule_code_upgrade(
            para_id,
            new_code,
            now,
            &config,
        ));
    }

    // Enact the messaging side of the candidate.
    weight.saturating_accrue(<dmp::Pallet<T>>::prune_dmq(
        para_id,
        commitments.processed_downward_messages,
    ));
    weight.saturating_accrue(Self::receive_upward_messages(
        para_id,
        commitments.upward_messages.as_slice(),
    ));
    weight.saturating_accrue(<hrmp::Pallet<T>>::prune_hrmp(
        para_id,
        BlockNumberFor::<T>::from(commitments.hrmp_watermark),
    ));
    weight.saturating_accrue(<hrmp::Pallet<T>>::queue_outbound_hrmp(
        para_id,
        commitments.horizontal_messages,
    ));

    Self::deposit_event(Event::<T>::CandidateIncluded(
        plain,
        commitments.head_data.clone(),
        core_index,
        backing_group,
    ));

    weight.saturating_add(<paras::Pallet<T>>::note_new_head(
        para_id,
        commitments.head_data,
        relay_parent_number,
    ))
}
/// Returns the `(message count, total size)` footprint of the UMP queue of `para_id`.
///
/// Values that do not fit into a `u32` saturate at `u32::MAX`. A plain `as u32` cast would
/// wrap around and make an over-full queue look nearly empty to the limit checks that
/// consume this value.
pub(crate) fn relay_dispatch_queue_size(para_id: ParaId) -> (u32, u32) {
    let fp = T::MessageQueue::footprint(AggregateMessageOrigin::Ump(UmpQueueId::Para(para_id)));
    (fp.count.saturated_into(), fp.size.saturated_into())
}
/// Checks whether `upward_messages` sent by `para` can be accepted under `config`.
///
/// # Errors
///
/// * `IsOffboarding` — the para is off-boarding and sent at least one message.
/// * `MoreMessagesThanPermitted` — more messages than allowed per candidate.
/// * `CapacityExceeded` — the queue would hold more messages than allowed.
/// * `MessageSize` — a single message is larger than allowed.
/// * `TotalSizeExceeded` — the queue would grow past its allowed total size.
pub(crate) fn check_upward_messages(
    config: &HostConfiguration<BlockNumberFor<T>>,
    para: ParaId,
    upward_messages: &[UpwardMessage],
) -> Result<(), UmpAcceptanceCheckErr> {
    // A para that is off-boarding may not send anything.
    if <paras::Pallet<T>>::is_offboarding(para) && !upward_messages.is_empty() {
        return Err(UmpAcceptanceCheckErr::IsOffboarding)
    }

    let sent = upward_messages.len() as u32;
    let permitted = config.max_upward_message_num_per_candidate;
    if sent > permitted {
        return Err(UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted })
    }

    let (queued_count, queued_size) = Self::relay_dispatch_queue_size(para);
    let count_after = queued_count.saturating_add(sent);
    if count_after > config.max_upward_queue_count {
        return Err(UmpAcceptanceCheckErr::CapacityExceeded {
            count: count_after.into(),
            limit: config.max_upward_queue_count.into(),
        })
    }

    // Walk the messages while tracking how large the queue would be after each one.
    let mut running_size = queued_size;
    for (position, message) in upward_messages.iter().enumerate() {
        let msg_size = message.len() as u32;
        if msg_size > config.max_upward_message_size {
            return Err(UmpAcceptanceCheckErr::MessageSize {
                idx: position as u32,
                msg_size,
                max_size: config.max_upward_message_size,
            })
        }

        let size_after = running_size.saturating_add(msg_size);
        if size_after > config.max_upward_queue_size {
            return Err(UmpAcceptanceCheckErr::TotalSizeExceeded {
                total_size: size_after.into(),
                limit: config.max_upward_queue_size.into(),
            })
        }
        running_size = size_after;
    }

    Ok(())
}
/// Enqueues the upward messages of `para` and returns the weight consumed.
///
/// A message that exceeds the queue's maximum message length triggers a defensive failure
/// and is dropped; the remaining messages are still enqueued.
pub(crate) fn receive_upward_messages(para: ParaId, upward_messages: &[Vec<u8>]) -> Weight {
    let bounded = upward_messages
        .iter()
        .filter_map(|msg| match BoundedSlice::try_from(&msg[..]) {
            Ok(slice) => Some(slice),
            Err(_) => {
                defensive!("Accepted candidate contains too long msg, len=", msg.len());
                None
            },
        })
        .collect();

    Self::receive_bounded_upward_messages(para, bounded)
}
/// Enqueues already-bounded upward messages of `para`, deposits `UpwardMessagesReceived` and
/// returns the weight reported by `WeightInfo`. With no messages nothing happens and zero
/// weight is returned.
pub(crate) fn receive_bounded_upward_messages(
    para: ParaId,
    messages: Vec<BoundedSlice<'_, u8, MaxUmpMessageLenOf<T>>>,
) -> Weight {
    let count = messages.len() as u32;
    if count > 0 {
        let origin = AggregateMessageOrigin::Ump(UmpQueueId::Para(para));
        T::MessageQueue::enqueue_messages(messages.into_iter(), origin);

        let consumed = <T as Config>::WeightInfo::receive_upward_messages(count);
        Self::deposit_event(Event::UpwardMessagesReceived { from: para, count });
        consumed
    } else {
        Weight::zero()
    }
}
/// Removes every candidate pending availability for which `pred(core, backed_in_number)`
/// returns `true`, depositing `CandidateTimedOut` for each removed candidate that also had
/// stored commitments.
///
/// Returns the cores of the matching candidates.
pub(crate) fn collect_pending(
    pred: impl Fn(CoreIndex, BlockNumberFor<T>) -> bool,
) -> Vec<CoreIndex> {
    let mut cleaned_up_ids = Vec::new();
    let mut cleaned_up_cores = Vec::new();

    // Collect first and remove afterwards, so the map is not modified while iterating it.
    for (para_id, pending_record) in <PendingAvailability<T>>::iter() {
        if pred(pending_record.core, pending_record.backed_in_number) {
            cleaned_up_ids.push(para_id);
            cleaned_up_cores.push(pending_record.core);
        }
    }

    for para_id in cleaned_up_ids {
        let pending = <PendingAvailability<T>>::take(&para_id);
        let commitments = <PendingAvailabilityCommitments<T>>::take(&para_id);

        if let (Some(pending), Some(commitments)) = (pending, commitments) {
            let candidate = CandidateReceipt {
                descriptor: pending.descriptor,
                commitments_hash: commitments.hash(),
            };
            Self::deposit_event(Event::<T>::CandidateTimedOut(
                candidate,
                commitments.head_data,
                pending.core,
            ));
        }
    }

    cleaned_up_cores
}
pub(crate) fn collect_disputed(disputed: &BTreeSet<CandidateHash>) -> Vec<CoreIndex> {
let mut cleaned_up_ids = Vec::new();
let mut cleaned_up_cores = Vec::new();
for (para_id, pending_record) in <PendingAvailability<T>>::iter() {
if disputed.contains(&pending_record.hash) {
cleaned_up_ids.push(para_id);
cleaned_up_cores.push(pending_record.core);
}
}
for para_id in cleaned_up_ids {
let _ = <PendingAvailability<T>>::take(¶_id);
let _ = <PendingAvailabilityCommitments<T>>::take(¶_id);
}
cleaned_up_cores
}
/// Enacts the candidate pending availability for `para` immediately, regardless of its
/// availability votes.
///
/// Both storage entries are removed in any case; the candidate is only enacted when a
/// pending record and its commitments were both present.
pub(crate) fn force_enact(para: ParaId) {
    let pending = <PendingAvailability<T>>::take(&para);
    let commitments = <PendingAvailabilityCommitments<T>>::take(&para);

    if let (Some(pending), Some(commitments)) = (pending, commitments) {
        let candidate =
            CommittedCandidateReceipt { descriptor: pending.descriptor, commitments };
        Self::enact_candidate(
            pending.relay_parent_number,
            candidate,
            pending.backers,
            pending.availability_votes,
            pending.core,
            pending.backing_group,
        );
    }
}
/// Returns the committed receipt of the candidate pending availability for `para`, or
/// `None` when the pending record or its commitments are missing.
pub(crate) fn candidate_pending_availability(
    para: ParaId,
) -> Option<CommittedCandidateReceipt<T::Hash>> {
    // The commitments are only read when a pending record exists.
    let descriptor = <PendingAvailability<T>>::get(&para)?.descriptor;
    let commitments = <PendingAvailabilityCommitments<T>>::get(&para)?;
    Some(CommittedCandidateReceipt { descriptor, commitments })
}
/// Returns the pending-availability record stored for `para`, if any.
pub(crate) fn pending_availability(
    para: ParaId,
) -> Option<CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>> {
    <PendingAvailability<T>>::get(&para)
}
}
/// Number of availability votes a candidate needs before it is enacted: the supermajority
/// threshold of `n_validators`, as computed by `primitives::supermajority_threshold`.
const fn availability_threshold(n_validators: usize) -> usize {
supermajority_threshold(n_validators)
}
impl<BlockNumber> AcceptanceCheckErr<BlockNumber> {
fn strip_into_dispatch_err<T: Config>(self) -> Error<T> {
use AcceptanceCheckErr::*;
match self {
HeadDataTooLarge => Error::<T>::HeadDataTooLarge,
PrematureCodeUpgrade => Error::<T>::PrematureCodeUpgrade,
NewCodeTooLarge => Error::<T>::NewCodeTooLarge,
ProcessedDownwardMessages(_) => Error::<T>::IncorrectDownwardMessageHandling,
UpwardMessages(_) => Error::<T>::InvalidUpwardMessages,
HrmpWatermark(_) => Error::<T>::HrmpWatermarkMishandling,
OutboundHrmp(_) => Error::<T>::InvalidOutboundHrmp,
}
}
}
impl<T: Config> OnQueueChanged<AggregateMessageOrigin> for Pallet<T> {
    /// Mirrors the new footprint of a para's UMP queue into the well-known storage keys:
    /// the (deprecated) queue size and the remaining capacity relative to the configured
    /// limits. Counts and sizes that do not fit into a `u32` saturate.
    fn on_queue_changed(origin: AggregateMessageOrigin, count: u64, size: u64) {
        // Both enums have exactly one variant, so the pattern is irrefutable.
        let AggregateMessageOrigin::Ump(UmpQueueId::Para(para)) = origin;

        let count: u32 = count.saturated_into();
        let size: u32 = size.saturated_into();

        #[allow(deprecated)]
        well_known_keys::relay_dispatch_queue_size_typed(para).set((count, size));

        let limits = <configuration::Pallet<T>>::config();
        let remaining = (
            limits.max_upward_queue_count.saturating_sub(count),
            limits.max_upward_queue_size.saturating_sub(size),
        );
        well_known_keys::relay_dispatch_queue_remaining_capacity(para).set(remaining);
    }
}
/// Context for the acceptance checks of a candidate: a snapshot of the host configuration
/// plus the para's previous context block number, if any.
pub(crate) struct CandidateCheckContext<T: Config> {
// Host configuration, read once when the context is created.
config: configuration::HostConfiguration<BlockNumberFor<T>>,
// Passed to `AllowedRelayParentsTracker::acquire_info` when looking up the relay parent.
prev_context: Option<BlockNumberFor<T>>,
}
/// Marker error: the persisted validation data for a candidate could not be created.
pub(crate) struct FailedToCreatePVD;
impl<T: Config> CandidateCheckContext<T> {
/// Creates a context from the current host configuration (read from storage) and the given
/// `prev_context`.
pub(crate) fn new(prev_context: Option<BlockNumberFor<T>>) -> Self {
Self { config: <configuration::Pallet<T>>::config(), prev_context }
}
/// Runs the acceptance checks on a backed candidate.
///
/// Checks, in this order: the relay parent is allowed, the persisted-validation-data hash
/// matches, the collator signature is valid, the validation code hash equals the para's
/// current one, the para head hash matches the committed head data, and the validation
/// outputs pass `check_validation_outputs`.
///
/// Returns:
/// * `Ok(Ok(relay_parent_number))` when every check passes,
/// * `Ok(Err(FailedToCreatePVD))` when the persisted validation data cannot be created,
/// * `Err(_)` with the pallet error of the first failed check.
///
/// `candidate_idx` is only used for logging.
pub(crate) fn verify_backed_candidate(
&self,
allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
candidate_idx: usize,
backed_candidate: &BackedCandidate<<T as frame_system::Config>::Hash>,
) -> Result<Result<BlockNumberFor<T>, FailedToCreatePVD>, Error<T>> {
let para_id = backed_candidate.descriptor().para_id;
let relay_parent = backed_candidate.descriptor().relay_parent;
// The relay parent must be one of the allowed relay parents.
let (relay_parent_storage_root, relay_parent_number) = {
match allowed_relay_parents.acquire_info(relay_parent, self.prev_context) {
None => return Err(Error::<T>::DisallowedRelayParent),
Some(info) => info,
}
};
{
// A failure here is treated as a defensive case and reported through the inner
// `Err(FailedToCreatePVD)` rather than as a dispatch error.
let persisted_validation_data = match crate::util::make_persisted_validation_data::<T>(
para_id,
relay_parent_number,
relay_parent_storage_root,
)
.defensive_proof("the para is registered")
{
Some(l) => l,
None => return Ok(Err(FailedToCreatePVD)),
};
let expected = persisted_validation_data.hash();
ensure!(
expected == backed_candidate.descriptor().persisted_validation_data_hash,
Error::<T>::ValidationDataHashMismatch,
);
}
ensure!(
backed_candidate.descriptor().check_collator_signature().is_ok(),
Error::<T>::NotCollatorSigned,
);
// A para without current validation code is reported as an unscheduled candidate.
let validation_code_hash = <paras::Pallet<T>>::current_code_hash(para_id)
.ok_or_else(|| Error::<T>::UnscheduledCandidate)?;
ensure!(
backed_candidate.descriptor().validation_code_hash == validation_code_hash,
Error::<T>::InvalidValidationCodeHash,
);
ensure!(
backed_candidate.descriptor().para_head ==
backed_candidate.candidate.commitments.head_data.hash(),
Error::<T>::ParaHeadMismatch,
);
if let Err(err) = self.check_validation_outputs(
para_id,
relay_parent_number,
&backed_candidate.candidate.commitments.head_data,
&backed_candidate.candidate.commitments.new_validation_code,
backed_candidate.candidate.commitments.processed_downward_messages,
&backed_candidate.candidate.commitments.upward_messages,
BlockNumberFor::<T>::from(backed_candidate.candidate.commitments.hrmp_watermark),
&backed_candidate.candidate.commitments.horizontal_messages,
) {
log::debug!(
target: LOG_TARGET,
"Validation outputs checking during inclusion of a candidate {} for parachain `{}` failed",
candidate_idx,
u32::from(para_id),
);
// Convert the detailed check error into the payload-free pallet error.
Err(err.strip_into_dispatch_err::<T>())?;
};
Ok(Ok(relay_parent_number))
}
/// Checks the outputs (commitments) of a candidate against the configuration and the state
/// of the messaging pallets.
///
/// Checks, in this order: head data size, code upgrade permission and new code size (only
/// when new code is present), processed downward messages, upward messages, the HRMP
/// watermark and the outbound HRMP messages. Returns the error of the first failed check.
fn check_validation_outputs(
&self,
para_id: ParaId,
relay_parent_number: BlockNumberFor<T>,
head_data: &HeadData,
new_validation_code: &Option<primitives::ValidationCode>,
processed_downward_messages: u32,
upward_messages: &[primitives::UpwardMessage],
hrmp_watermark: BlockNumberFor<T>,
horizontal_messages: &[primitives::OutboundHrmpMessage<ParaId>],
) -> Result<(), AcceptanceCheckErr<BlockNumberFor<T>>> {
ensure!(
head_data.0.len() <= self.config.max_head_data_size as _,
AcceptanceCheckErr::HeadDataTooLarge,
);
// If the candidate carries new code, the upgrade must be allowed and the code small enough.
if let Some(new_validation_code) = new_validation_code {
ensure!(
<paras::Pallet<T>>::can_upgrade_validation_code(para_id),
AcceptanceCheckErr::PrematureCodeUpgrade,
);
ensure!(
new_validation_code.0.len() <= self.config.max_code_size as _,
AcceptanceCheckErr::NewCodeTooLarge,
);
}
// The sub-check errors are wrapped into `AcceptanceCheckErr` by `?` through its `From` impls.
<dmp::Pallet<T>>::check_processed_downward_messages(
para_id,
relay_parent_number,
processed_downward_messages,
)?;
Pallet::<T>::check_upward_messages(&self.config, para_id, upward_messages)?;
<hrmp::Pallet<T>>::check_hrmp_watermark(para_id, relay_parent_number, hrmp_watermark)?;
<hrmp::Pallet<T>>::check_outbound_hrmp(&self.config, para_id, horizontal_messages)?;
Ok(())
}
}
/// Reports the message count of a UMP queue as seen by the configured `MessageQueue`.
impl<T: Config> QueueFootprinter for Pallet<T> {
type Origin = UmpQueueId;
fn message_count(origin: Self::Origin) -> u64 {
T::MessageQueue::footprint(AggregateMessageOrigin::Ump(origin)).count
}
}