1use crate::{
23 configuration::{self, HostConfiguration},
24 disputes, dmp, hrmp,
25 paras::{self, UpgradeStrategy},
26 scheduler,
27 shared::{self, AllowedSchedulingParentsTracker},
28 util::make_persisted_validation_data_with_parent,
29};
30use alloc::{
31 collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
32 vec,
33 vec::Vec,
34};
35use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
36use codec::{Decode, DecodeWithMemTracking, Encode};
37use core::fmt;
38use frame_support::{
39 defensive,
40 pallet_prelude::*,
41 traits::{EnqueueMessage, Footprint, QueueFootprint, QueueFootprintQuery},
42 BoundedSlice,
43};
44use frame_system::pallet_prelude::*;
45use pallet_message_queue::OnQueueChanged;
46use polkadot_primitives::{
47 effective_minimum_backing_votes, skip_ump_signals, supermajority_threshold, well_known_keys,
48 BackedCandidate, CandidateCommitments, CandidateDescriptorV2 as CandidateDescriptor,
49 CandidateHash, CandidateReceiptV2 as CandidateReceipt,
50 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, GroupIndex, HeadData,
51 Id as ParaId, SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId,
52 ValidatorIndex, ValidityAttestation,
53};
54use scale_info::TypeInfo;
55use sp_runtime::{traits::One, DispatchError, SaturatedConversion, Saturating};
56
57pub use pallet::*;
58
59#[cfg(test)]
60pub(crate) mod tests;
61
62#[cfg(feature = "runtime-benchmarks")]
63mod benchmarking;
64
65pub mod migration;
66
67pub trait WeightInfo {
68 fn enact_candidate(u: u32, h: u32, c: u32) -> Weight;
74}
75
76pub struct TestWeightInfo;
77impl WeightInfo for TestWeightInfo {
78 fn enact_candidate(_u: u32, _h: u32, _c: u32) -> Weight {
79 Weight::zero()
80 }
81}
82
83impl WeightInfo for () {
84 fn enact_candidate(_u: u32, _h: u32, _c: u32) -> Weight {
85 Weight::zero()
86 }
87}
88
89pub const MAX_UPWARD_MESSAGE_SIZE_BOUND: u32 = 128 * 1024;
94
95#[derive(Encode, Decode, PartialEq, TypeInfo, Clone)]
97#[cfg_attr(test, derive(Debug))]
98pub struct CandidatePendingAvailability<H, N> {
99 core: CoreIndex,
101 hash: CandidateHash,
103 descriptor: CandidateDescriptor<H>,
105 commitments: CandidateCommitments,
107 availability_votes: BitVec<u8, BitOrderLsb0>,
109 backers: BitVec<u8, BitOrderLsb0>,
111 relay_parent_number: N,
113 backed_in_number: N,
115 backing_group: GroupIndex,
117}
118
119impl<H, N> CandidatePendingAvailability<H, N> {
120 pub(crate) fn availability_votes(&self) -> &BitVec<u8, BitOrderLsb0> {
122 &self.availability_votes
123 }
124
125 pub(crate) fn backed_in_number(&self) -> N
127 where
128 N: Clone,
129 {
130 self.backed_in_number.clone()
131 }
132
133 pub(crate) fn core_occupied(&self) -> CoreIndex {
135 self.core
136 }
137
138 pub(crate) fn candidate_hash(&self) -> CandidateHash {
140 self.hash
141 }
142
143 pub(crate) fn candidate_descriptor(&self) -> &CandidateDescriptor<H> {
145 &self.descriptor
146 }
147
148 pub(crate) fn candidate_commitments(&self) -> &CandidateCommitments {
150 &self.commitments
151 }
152
153 pub(crate) fn relay_parent_number(&self) -> N
155 where
156 N: Clone,
157 {
158 self.relay_parent_number.clone()
159 }
160
161 #[cfg(any(feature = "runtime-benchmarks", test))]
162 pub(crate) fn new(
163 core: CoreIndex,
164 hash: CandidateHash,
165 descriptor: CandidateDescriptor<H>,
166 commitments: CandidateCommitments,
167 availability_votes: BitVec<u8, BitOrderLsb0>,
168 backers: BitVec<u8, BitOrderLsb0>,
169 relay_parent_number: N,
170 backed_in_number: N,
171 backing_group: GroupIndex,
172 ) -> Self {
173 Self {
174 core,
175 hash,
176 descriptor,
177 commitments,
178 availability_votes,
179 backers,
180 relay_parent_number,
181 backed_in_number,
182 backing_group,
183 }
184 }
185}
186
187pub trait RewardValidators {
189 fn reward_backing(validators: impl IntoIterator<Item = ValidatorIndex>);
191 fn reward_bitfields(validators: impl IntoIterator<Item = ValidatorIndex>);
195}
196
197impl RewardValidators for () {
198 fn reward_backing(_: impl IntoIterator<Item = ValidatorIndex>) {}
199 fn reward_bitfields(_: impl IntoIterator<Item = ValidatorIndex>) {}
200}
201
202pub trait QueueFootprinter {
204 type Origin;
205
206 fn message_count(origin: Self::Origin) -> u64;
207}
208
209impl QueueFootprinter for () {
210 type Origin = UmpQueueId;
211
212 fn message_count(_: Self::Origin) -> u64 {
213 0
214 }
215}
216
217#[derive(
222 Encode, Decode, DecodeWithMemTracking, Clone, MaxEncodedLen, Eq, PartialEq, Debug, TypeInfo,
223)]
224pub enum AggregateMessageOrigin {
225 #[codec(index = 0)]
227 Ump(UmpQueueId),
228}
229
230#[derive(
235 Encode, Decode, DecodeWithMemTracking, Clone, MaxEncodedLen, Eq, PartialEq, Debug, TypeInfo,
236)]
237pub enum UmpQueueId {
238 #[codec(index = 0)]
240 Para(ParaId),
241}
242
243#[cfg(feature = "runtime-benchmarks")]
244impl From<u32> for AggregateMessageOrigin {
245 fn from(n: u32) -> Self {
246 Self::Ump(UmpQueueId::Para(n.into()))
248 }
249}
250
251pub type MaxUmpMessageLenOf<T> =
253 <<T as Config>::MessageQueue as EnqueueMessage<AggregateMessageOrigin>>::MaxMessageLen;
254
255#[frame_support::pallet]
256pub mod pallet {
257 use super::*;
258
259 const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
260 #[pallet::pallet]
261 #[pallet::without_storage_info]
262 #[pallet::storage_version(STORAGE_VERSION)]
263 pub struct Pallet<T>(_);
264
265 #[pallet::config]
266 pub trait Config:
267 frame_system::Config
268 + shared::Config
269 + paras::Config
270 + dmp::Config
271 + hrmp::Config
272 + configuration::Config
273 + scheduler::Config
274 {
275 #[allow(deprecated)]
276 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
277 type DisputesHandler: disputes::DisputesHandler<BlockNumberFor<Self>>;
278 type RewardValidators: RewardValidators;
279
280 type MessageQueue: EnqueueMessage<AggregateMessageOrigin>
286 + QueueFootprintQuery<AggregateMessageOrigin, MaxMessageLen = MaxUmpMessageLenOf<Self>>;
287
288 type WeightInfo: WeightInfo;
290 }
291
292 #[pallet::event]
293 #[pallet::generate_deposit(pub(super) fn deposit_event)]
294 pub enum Event<T: Config> {
295 CandidateBacked(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
297 CandidateIncluded(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
299 CandidateTimedOut(CandidateReceipt<T::Hash>, HeadData, CoreIndex),
301 UpwardMessagesReceived { from: ParaId, count: u32 },
303 }
304
305 #[pallet::error]
306 pub enum Error<T> {
307 ValidatorIndexOutOfBounds,
309 UnscheduledCandidate,
311 HeadDataTooLarge,
313 PrematureCodeUpgrade,
315 NewCodeTooLarge,
317 DisallowedRelayParent,
320 DisallowedSchedulingParent,
322 InvalidAssignment,
325 InvalidGroupIndex,
327 InsufficientBacking,
329 InvalidBacking,
331 ValidationDataHashMismatch,
333 IncorrectDownwardMessageHandling,
335 InvalidUpwardMessages,
337 HrmpWatermarkMishandling,
339 InvalidOutboundHrmp,
341 InvalidValidationCodeHash,
343 ParaHeadMismatch,
346 }
347
348 #[pallet::storage]
354 #[pallet::storage_prefix = "V1"]
355 pub(crate) type PendingAvailability<T: Config> = StorageMap<
356 _,
357 Twox64Concat,
358 ParaId,
359 VecDeque<CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>>,
360 >;
361
362 #[pallet::call]
363 impl<T: Config> Pallet<T> {}
364}
365
366const LOG_TARGET: &str = "runtime::inclusion";
367
368#[derive(Debug)]
370enum AcceptanceCheckErr {
371 HeadDataTooLarge,
372 PrematureCodeUpgrade,
374 NewCodeTooLarge,
376 ProcessedDownwardMessages,
378 UpwardMessages,
380 HrmpWatermark,
382 OutboundHrmp,
384}
385
386impl From<dmp::ProcessedDownwardMessagesAcceptanceErr> for AcceptanceCheckErr {
387 fn from(_: dmp::ProcessedDownwardMessagesAcceptanceErr) -> Self {
388 Self::ProcessedDownwardMessages
389 }
390}
391
392impl From<UmpAcceptanceCheckErr> for AcceptanceCheckErr {
393 fn from(_: UmpAcceptanceCheckErr) -> Self {
394 Self::UpwardMessages
395 }
396}
397
398impl<BlockNumber> From<hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>> for AcceptanceCheckErr {
399 fn from(_: hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>) -> Self {
400 Self::HrmpWatermark
401 }
402}
403
404impl From<hrmp::OutboundHrmpAcceptanceErr> for AcceptanceCheckErr {
405 fn from(_: hrmp::OutboundHrmpAcceptanceErr) -> Self {
406 Self::OutboundHrmp
407 }
408}
409
410#[cfg_attr(test, derive(PartialEq))]
413#[allow(dead_code)]
414pub(crate) enum UmpAcceptanceCheckErr {
415 MoreMessagesThanPermitted { sent: u32, permitted: u32 },
417 MessageSize { idx: u32, msg_size: u32, max_size: u32 },
419 CapacityExceeded { count: u64, limit: u64 },
421 TotalSizeExceeded { total_size: u64, limit: u64 },
423 IsOffboarding,
425}
426
427impl fmt::Debug for UmpAcceptanceCheckErr {
428 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
429 match *self {
430 UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted } => write!(
431 fmt,
432 "more upward messages than permitted by config ({} > {})",
433 sent, permitted,
434 ),
435 UmpAcceptanceCheckErr::MessageSize { idx, msg_size, max_size } => write!(
436 fmt,
437 "upward message idx {} larger than permitted by config ({} > {})",
438 idx, msg_size, max_size,
439 ),
440 UmpAcceptanceCheckErr::CapacityExceeded { count, limit } => write!(
441 fmt,
442 "the ump queue would have more items than permitted by config ({} > {})",
443 count, limit,
444 ),
445 UmpAcceptanceCheckErr::TotalSizeExceeded { total_size, limit } => write!(
446 fmt,
447 "the ump queue would have grown past the max size permitted by config ({} > {})",
448 total_size, limit,
449 ),
450 UmpAcceptanceCheckErr::IsOffboarding => {
451 write!(fmt, "upward message rejected because the para is off-boarding")
452 },
453 }
454 }
455}
456
457impl<T: Config> Pallet<T> {
458 pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
460 Weight::zero()
461 }
462
463 pub(crate) fn initializer_finalize() {}
465
466 pub(crate) fn initializer_on_new_session(
468 _notification: &crate::initializer::SessionChangeNotification<BlockNumberFor<T>>,
469 outgoing_paras: &[ParaId],
470 ) {
471 for _ in PendingAvailability::<T>::drain() {}
474
475 Self::cleanup_outgoing_ump_dispatch_queues(outgoing_paras);
476 }
477
478 pub(crate) fn cleanup_outgoing_ump_dispatch_queues(outgoing: &[ParaId]) {
479 for outgoing_para in outgoing {
480 Self::cleanup_outgoing_ump_dispatch_queue(*outgoing_para);
481 }
482 }
483
484 pub(crate) fn cleanup_outgoing_ump_dispatch_queue(para: ParaId) {
485 T::MessageQueue::sweep_queue(AggregateMessageOrigin::Ump(UmpQueueId::Para(para)));
486 }
487
488 pub(crate) fn get_occupied_cores(
489 ) -> impl Iterator<Item = (CoreIndex, CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>)>
490 {
491 PendingAvailability::<T>::iter_values().flat_map(|pending_candidates| {
492 pending_candidates.into_iter().map(|c| (c.core, c.clone()))
493 })
494 }
495
496 pub(crate) fn update_pending_availability_and_get_freed_cores(
505 validators: &[ValidatorId],
506 signed_bitfields: SignedAvailabilityBitfields,
507 ) -> (Weight, Vec<(CoreIndex, CandidateHash)>) {
508 let threshold = availability_threshold(validators.len());
509
510 let mut votes_per_core: BTreeMap<CoreIndex, BTreeSet<ValidatorIndex>> = BTreeMap::new();
511
512 for (checked_bitfield, validator_index) in
513 signed_bitfields.into_iter().map(|signed_bitfield| {
514 let validator_idx = signed_bitfield.validator_index();
515 let checked_bitfield = signed_bitfield.into_payload();
516 (checked_bitfield, validator_idx)
517 }) {
518 for (bit_idx, _) in checked_bitfield.0.iter().enumerate().filter(|(_, is_av)| **is_av) {
519 let core_index = CoreIndex(bit_idx as u32);
520 votes_per_core
521 .entry(core_index)
522 .or_insert_with(|| BTreeSet::new())
523 .insert(validator_index);
524 }
525 }
526
527 let mut freed_cores = vec![];
528 let mut weight = Weight::zero();
529
530 let pending_paraids: Vec<_> = PendingAvailability::<T>::iter_keys().collect();
531 for paraid in pending_paraids {
532 PendingAvailability::<T>::mutate(paraid, |candidates| {
533 if let Some(candidates) = candidates {
534 let mut last_enacted_index: Option<usize> = None;
535
536 for (candidate_index, candidate) in candidates.iter_mut().enumerate() {
537 if let Some(validator_indices) = votes_per_core.remove(&candidate.core) {
538 for validator_index in validator_indices.iter() {
539 if let Some(mut bit) =
543 candidate.availability_votes.get_mut(validator_index.0 as usize)
544 {
545 *bit = true;
546 }
547 }
548 }
549
550 if candidate.availability_votes.count_ones() >= threshold {
555 let can_enact = if candidate_index == 0 {
558 last_enacted_index == None
559 } else {
560 let prev_candidate_index = usize::try_from(candidate_index - 1)
561 .expect("Previous `if` would have caught a 0 candidate index.");
562 matches!(last_enacted_index, Some(old_index) if old_index == prev_candidate_index)
563 };
564
565 if can_enact {
566 last_enacted_index = Some(candidate_index);
567 }
568 }
569 }
570
571 if let Some(last_enacted_index) = last_enacted_index {
574 let evicted_candidates = candidates.drain(0..=last_enacted_index);
575 for candidate in evicted_candidates {
576 freed_cores.push((candidate.core, candidate.hash));
577
578 let receipt = CommittedCandidateReceipt {
579 descriptor: candidate.descriptor,
580 commitments: candidate.commitments,
581 };
582
583 let has_runtime_upgrade =
584 receipt.commitments.new_validation_code.as_ref().map_or(0, |_| 1);
585 let u = receipt.commitments.upward_messages.len() as u32;
586 let h = receipt.commitments.horizontal_messages.len() as u32;
587 let enact_weight = <T as Config>::WeightInfo::enact_candidate(
588 u,
589 h,
590 has_runtime_upgrade,
591 );
592 Self::enact_candidate(
593 candidate.relay_parent_number,
594 receipt,
595 candidate.backers,
596 candidate.availability_votes,
597 candidate.core,
598 candidate.backing_group,
599 );
600 weight.saturating_accrue(enact_weight);
601 }
602 }
603 }
604 });
605 }
606 (weight.set_proof_size(0), freed_cores)
612 }
613
614 pub(crate) fn process_candidates<GV>(
627 allowed_scheduling_parents: &AllowedSchedulingParentsTracker<T::Hash, BlockNumberFor<T>>,
628 candidates: &BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>>,
629 group_validators: GV,
630 ) -> Result<
631 Vec<(CandidateReceipt<T::Hash>, Vec<(ValidatorIndex, ValidityAttestation)>)>,
632 DispatchError,
633 >
634 where
635 GV: Fn(GroupIndex) -> Option<Vec<ValidatorIndex>>,
636 {
637 if candidates.is_empty() {
638 return Ok(Default::default());
639 }
640
641 let now = frame_system::Pallet::<T>::block_number();
642 let validators = shared::ActiveValidatorKeys::<T>::get();
643
644 let mut candidate_receipt_with_backing_validator_indices =
646 Vec::with_capacity(candidates.len());
647
648 for (para_id, para_candidates) in candidates {
649 let mut latest_head_data = match Self::para_latest_head_data(para_id) {
650 None => {
651 defensive!("Latest included head data for paraid {:?} is None", para_id);
652 continue;
653 },
654 Some(latest_head_data) => latest_head_data,
655 };
656
657 for (candidate, core) in para_candidates.iter() {
658 let candidate_hash = candidate.candidate().hash();
659
660 let check_ctx = CandidateCheckContext::<T>::new(None);
663 let relay_parent_number = check_ctx
664 .verify_backed_candidate(candidate.candidate(), latest_head_data.clone())?;
665
666 let scheduling_parent = candidate.descriptor().scheduling_parent();
667
668 let (_, scheduling_parent_number) = allowed_scheduling_parents
669 .acquire_info(scheduling_parent)
670 .ok_or(Error::<T>::DisallowedSchedulingParent)?;
671
672 let group_idx = scheduler::Pallet::<T>::group_assigned_to_core(
677 *core,
678 scheduling_parent_number + One::one(),
679 )
680 .ok_or_else(|| {
681 log::warn!(
682 target: LOG_TARGET,
683 "Failed to compute group index for candidate {:?}",
684 candidate_hash
685 );
686 Error::<T>::InvalidAssignment
687 })?;
688 let group_vals =
689 group_validators(group_idx).ok_or_else(|| Error::<T>::InvalidGroupIndex)?;
690
691 let (backers, backer_idx_and_attestation) =
693 Self::check_backing_votes(candidate, &validators, group_vals)?;
694
695 latest_head_data = candidate.candidate().commitments.head_data.clone();
697 candidate_receipt_with_backing_validator_indices
698 .push((candidate.receipt(), backer_idx_and_attestation));
699
700 PendingAvailability::<T>::mutate(¶_id, |pending_availability| {
702 let new_candidate = CandidatePendingAvailability {
703 core: *core,
704 hash: candidate_hash,
705 descriptor: candidate.candidate().descriptor.clone(),
706 commitments: candidate.candidate().commitments.clone(),
707 availability_votes: bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()],
709 relay_parent_number,
710 backers: backers.to_bitvec(),
711 backed_in_number: now,
712 backing_group: group_idx,
713 };
714
715 if let Some(pending_availability) = pending_availability {
716 pending_availability.push_back(new_candidate);
717 } else {
718 *pending_availability =
719 Some([new_candidate].into_iter().collect::<VecDeque<_>>())
720 }
721 });
722
723 Self::deposit_event(Event::<T>::CandidateBacked(
725 candidate.candidate().to_plain(),
726 candidate.candidate().commitments.head_data.clone(),
727 *core,
728 group_idx,
729 ));
730 }
731 }
732
733 Ok(candidate_receipt_with_backing_validator_indices)
734 }
735
736 pub(crate) fn para_latest_head_data(para_id: &ParaId) -> Option<HeadData> {
738 match PendingAvailability::<T>::get(para_id).and_then(|pending_candidates| {
739 pending_candidates.back().map(|x| x.commitments.head_data.clone())
740 }) {
741 Some(head_data) => Some(head_data),
742 None => paras::Heads::<T>::get(para_id),
743 }
744 }
745
746 pub(crate) fn para_most_recent_context(para_id: &ParaId) -> Option<BlockNumberFor<T>> {
748 match PendingAvailability::<T>::get(para_id)
749 .and_then(|pending_candidates| pending_candidates.back().map(|x| x.relay_parent_number))
750 {
751 Some(relay_parent_number) => Some(relay_parent_number),
752 None => paras::MostRecentContext::<T>::get(para_id),
753 }
754 }
755
756 fn check_backing_votes(
757 backed_candidate: &BackedCandidate<T::Hash>,
758 validators: &[ValidatorId],
759 group_vals: Vec<ValidatorIndex>,
760 ) -> Result<(BitVec<u8, BitOrderLsb0>, Vec<(ValidatorIndex, ValidityAttestation)>), Error<T>> {
761 let minimum_backing_votes = configuration::ActiveConfig::<T>::get().minimum_backing_votes;
762
763 let mut backers = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];
764 let signing_context = SigningContext {
765 parent_hash: backed_candidate.candidate().descriptor.scheduling_parent(),
766 session_index: shared::CurrentSessionIndex::<T>::get(),
767 };
768
769 let (validator_indices, _) = backed_candidate.validator_indices_and_core_index();
770
771 let maybe_amount_validated = polkadot_primitives::check_candidate_backing(
773 backed_candidate.candidate().hash(),
774 backed_candidate.validity_votes(),
775 validator_indices,
776 &signing_context,
777 group_vals.len(),
778 |intra_group_vi| {
779 group_vals
780 .get(intra_group_vi)
781 .and_then(|vi| validators.get(vi.0 as usize))
782 .map(|v| v.clone())
783 },
784 );
785
786 match maybe_amount_validated {
787 Ok(amount_validated) => ensure!(
788 amount_validated >=
789 effective_minimum_backing_votes(group_vals.len(), minimum_backing_votes),
790 Error::<T>::InsufficientBacking,
791 ),
792 Err(()) => {
793 Err(Error::<T>::InvalidBacking)?;
794 },
795 }
796
797 let mut backer_idx_and_attestation =
798 Vec::<(ValidatorIndex, ValidityAttestation)>::with_capacity(
799 validator_indices.count_ones(),
800 );
801
802 for ((bit_idx, _), attestation) in validator_indices
803 .iter()
804 .enumerate()
805 .filter(|(_, signed)| **signed)
806 .zip(backed_candidate.validity_votes().iter().cloned())
807 {
808 let val_idx = group_vals.get(bit_idx).expect("this query succeeded above; qed");
809 backer_idx_and_attestation.push((*val_idx, attestation));
810
811 backers.set(val_idx.0 as _, true);
812 }
813
814 Ok((backers, backer_idx_and_attestation))
815 }
816
817 pub(crate) fn check_validation_outputs_for_runtime_api(
819 para_id: ParaId,
820 relay_parent_number: BlockNumberFor<T>,
821 validation_outputs: polkadot_primitives::CandidateCommitments,
822 ) -> bool {
823 let prev_context = Self::para_most_recent_context(¶_id);
824 let check_ctx = CandidateCheckContext::<T>::new(prev_context);
825
826 if let Err(err) = check_ctx.check_validation_outputs(
827 para_id,
828 relay_parent_number,
829 &validation_outputs.head_data,
830 &validation_outputs.new_validation_code,
831 validation_outputs.processed_downward_messages,
832 &validation_outputs.upward_messages,
833 BlockNumberFor::<T>::from(validation_outputs.hrmp_watermark),
834 &validation_outputs.horizontal_messages,
835 ) {
836 log::debug!(
837 target: LOG_TARGET,
838 "Validation outputs checking for parachain `{}` failed, error: {:?}",
839 u32::from(para_id), err
840 );
841 false
842 } else {
843 true
844 }
845 }
846
847 fn enact_candidate(
848 relay_parent_number: BlockNumberFor<T>,
849 receipt: CommittedCandidateReceipt<T::Hash>,
850 backers: BitVec<u8, BitOrderLsb0>,
851 availability_votes: BitVec<u8, BitOrderLsb0>,
852 core_index: CoreIndex,
853 backing_group: GroupIndex,
854 ) {
855 let plain = receipt.to_plain();
856 let commitments = receipt.commitments;
857 let config = configuration::ActiveConfig::<T>::get();
858
859 T::RewardValidators::reward_backing(
860 backers
861 .iter()
862 .enumerate()
863 .filter(|(_, backed)| **backed)
864 .map(|(i, _)| ValidatorIndex(i as _)),
865 );
866
867 T::RewardValidators::reward_bitfields(
868 availability_votes
869 .iter()
870 .enumerate()
871 .filter(|(_, voted)| **voted)
872 .map(|(i, _)| ValidatorIndex(i as _)),
873 );
874
875 if let Some(new_code) = commitments.new_validation_code {
876 let now = frame_system::Pallet::<T>::block_number();
878
879 paras::Pallet::<T>::schedule_code_upgrade(
880 receipt.descriptor.para_id(),
881 new_code,
882 now,
883 &config,
884 UpgradeStrategy::SetGoAheadSignal,
885 );
886 }
887
888 dmp::Pallet::<T>::prune_dmq(
890 receipt.descriptor.para_id(),
891 commitments.processed_downward_messages,
892 );
893 Self::receive_upward_messages(
894 receipt.descriptor.para_id(),
895 commitments.upward_messages.as_slice(),
896 );
897 hrmp::Pallet::<T>::prune_hrmp(
898 receipt.descriptor.para_id(),
899 BlockNumberFor::<T>::from(commitments.hrmp_watermark),
900 );
901 hrmp::Pallet::<T>::queue_outbound_hrmp(
902 receipt.descriptor.para_id(),
903 commitments.horizontal_messages,
904 );
905
906 Self::deposit_event(Event::<T>::CandidateIncluded(
907 plain,
908 commitments.head_data.clone(),
909 core_index,
910 backing_group,
911 ));
912
913 paras::Pallet::<T>::note_new_head(
914 receipt.descriptor.para_id(),
915 commitments.head_data,
916 relay_parent_number,
917 );
918 }
919
920 pub(crate) fn relay_dispatch_queue_size(para_id: ParaId) -> (u32, u32) {
921 let fp = T::MessageQueue::footprint(AggregateMessageOrigin::Ump(UmpQueueId::Para(para_id)));
922 (fp.storage.count as u32, fp.storage.size as u32)
923 }
924
925 pub(crate) fn check_upward_messages(
927 config: &HostConfiguration<BlockNumberFor<T>>,
928 para: ParaId,
929 upward_messages: &[UpwardMessage],
930 ) -> Result<(), UmpAcceptanceCheckErr> {
931 let upward_messages = skip_ump_signals(upward_messages.iter()).collect::<Vec<_>>();
933
934 if paras::Pallet::<T>::is_offboarding(para) {
936 ensure!(upward_messages.is_empty(), UmpAcceptanceCheckErr::IsOffboarding);
937 }
938
939 let additional_msgs = upward_messages.len() as u32;
940 if additional_msgs > config.max_upward_message_num_per_candidate {
941 return Err(UmpAcceptanceCheckErr::MoreMessagesThanPermitted {
942 sent: additional_msgs,
943 permitted: config.max_upward_message_num_per_candidate,
944 });
945 }
946
947 let (para_queue_count, mut para_queue_size) = Self::relay_dispatch_queue_size(para);
948
949 if para_queue_count.saturating_add(additional_msgs) > config.max_upward_queue_count {
950 return Err(UmpAcceptanceCheckErr::CapacityExceeded {
951 count: para_queue_count.saturating_add(additional_msgs).into(),
952 limit: config.max_upward_queue_count.into(),
953 });
954 }
955
956 for (idx, msg) in upward_messages.into_iter().enumerate() {
957 let msg_size = msg.len() as u32;
958 if msg_size > config.max_upward_message_size {
959 return Err(UmpAcceptanceCheckErr::MessageSize {
960 idx: idx as u32,
961 msg_size,
962 max_size: config.max_upward_message_size,
963 });
964 }
965 if para_queue_size.saturating_add(msg_size) > config.max_upward_queue_size {
969 return Err(UmpAcceptanceCheckErr::TotalSizeExceeded {
970 total_size: para_queue_size.saturating_add(msg_size).into(),
971 limit: config.max_upward_queue_size.into(),
972 });
973 }
974 para_queue_size.saturating_accrue(msg_size);
975 }
976
977 Ok(())
978 }
979
980 pub(crate) fn receive_upward_messages(para: ParaId, upward_messages: &[Vec<u8>]) {
986 let bounded = skip_ump_signals(upward_messages.iter())
987 .filter_map(|d| {
988 BoundedSlice::try_from(&d[..])
989 .inspect_err(|_| {
990 defensive!("Accepted candidate contains too long msg, len=", d.len());
991 })
992 .ok()
993 })
994 .collect();
995 Self::receive_bounded_upward_messages(para, bounded)
996 }
997
998 pub(crate) fn receive_bounded_upward_messages(
1000 para: ParaId,
1001 messages: Vec<BoundedSlice<'_, u8, MaxUmpMessageLenOf<T>>>,
1002 ) {
1003 let count = messages.len() as u32;
1004 if count == 0 {
1005 return;
1006 }
1007
1008 T::MessageQueue::enqueue_messages(
1009 messages.into_iter(),
1010 AggregateMessageOrigin::Ump(UmpQueueId::Para(para)),
1011 );
1012 Self::deposit_event(Event::UpwardMessagesReceived { from: para, count });
1013 }
1014
1015 pub(crate) fn free_timedout() -> Vec<CoreIndex> {
1019 let timeout_pred = scheduler::Pallet::<T>::availability_timeout_predicate();
1020
1021 let timed_out: Vec<_> = Self::free_failed_cores(
1022 |candidate| timeout_pred(candidate.backed_in_number).timed_out,
1023 None,
1024 )
1025 .collect();
1026
1027 let mut timed_out_cores = Vec::with_capacity(timed_out.len());
1028 for candidate in timed_out.iter() {
1029 timed_out_cores.push(candidate.core);
1030
1031 let receipt = CandidateReceipt {
1032 descriptor: candidate.descriptor.clone(),
1033 commitments_hash: candidate.commitments.hash(),
1034 };
1035
1036 Self::deposit_event(Event::<T>::CandidateTimedOut(
1037 receipt,
1038 candidate.commitments.head_data.clone(),
1039 candidate.core,
1040 ));
1041 }
1042
1043 timed_out_cores
1044 }
1045
1046 pub(crate) fn free_disputed(
1051 disputed: &BTreeSet<CandidateHash>,
1052 ) -> Vec<(CoreIndex, CandidateHash)> {
1053 Self::free_failed_cores(
1054 |candidate| disputed.contains(&candidate.hash),
1055 Some(disputed.len()),
1056 )
1057 .map(|candidate| (candidate.core, candidate.hash))
1058 .collect()
1059 }
1060
1061 fn free_failed_cores<
1065 P: Fn(&CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>) -> bool,
1066 >(
1067 pred: P,
1068 capacity_hint: Option<usize>,
1069 ) -> impl Iterator<Item = CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>> {
1070 let mut earliest_dropped_indices: BTreeMap<ParaId, usize> = BTreeMap::new();
1071
1072 for (para_id, pending_candidates) in PendingAvailability::<T>::iter() {
1073 let mut earliest_dropped_idx = None;
1076 for (index, candidate) in pending_candidates.iter().enumerate() {
1077 if pred(candidate) {
1078 earliest_dropped_idx = Some(index);
1079 break;
1082 }
1083 }
1084
1085 if let Some(earliest_dropped_idx) = earliest_dropped_idx {
1086 earliest_dropped_indices.insert(para_id, earliest_dropped_idx);
1087 }
1088 }
1089
1090 let mut cleaned_up_cores =
1091 if let Some(capacity) = capacity_hint { Vec::with_capacity(capacity) } else { vec![] };
1092
1093 for (para_id, earliest_dropped_idx) in earliest_dropped_indices {
1094 PendingAvailability::<T>::mutate(¶_id, |record| {
1096 if let Some(record) = record {
1097 let cleaned_up = record.drain(earliest_dropped_idx..);
1098 cleaned_up_cores.extend(cleaned_up);
1099 }
1100 });
1101 }
1102
1103 cleaned_up_cores.into_iter()
1104 }
1105
1106 pub(crate) fn force_enact(para: ParaId) {
1114 PendingAvailability::<T>::mutate(¶, |candidates| {
1115 if let Some(candidates) = candidates {
1116 for candidate in candidates.drain(..) {
1117 let receipt = CommittedCandidateReceipt {
1118 descriptor: candidate.descriptor,
1119 commitments: candidate.commitments,
1120 };
1121
1122 Self::enact_candidate(
1123 candidate.relay_parent_number,
1124 receipt,
1125 candidate.backers,
1126 candidate.availability_votes,
1127 candidate.core,
1128 candidate.backing_group,
1129 );
1130 }
1131 }
1132 });
1133 }
1134
1135 pub(crate) fn first_candidate_pending_availability(
1140 para: ParaId,
1141 ) -> Option<CommittedCandidateReceipt<T::Hash>> {
1142 PendingAvailability::<T>::get(¶).and_then(|p| {
1143 p.get(0).map(|p| CommittedCandidateReceipt {
1144 descriptor: p.descriptor.clone(),
1145 commitments: p.commitments.clone(),
1146 })
1147 })
1148 }
1149
1150 pub(crate) fn candidates_pending_availability(
1153 para: ParaId,
1154 ) -> Vec<CommittedCandidateReceipt<T::Hash>> {
1155 <PendingAvailability<T>>::get(¶)
1156 .map(|candidates| {
1157 candidates
1158 .into_iter()
1159 .map(|candidate| CommittedCandidateReceipt {
1160 descriptor: candidate.descriptor.clone(),
1161 commitments: candidate.commitments.clone(),
1162 })
1163 .collect()
1164 })
1165 .unwrap_or_default()
1166 }
1167}
1168
1169const fn availability_threshold(n_validators: usize) -> usize {
1170 supermajority_threshold(n_validators)
1171}
1172
1173impl AcceptanceCheckErr {
1174 fn strip_into_dispatch_err<T: Config>(self) -> Error<T> {
1177 use AcceptanceCheckErr::*;
1178 match self {
1179 HeadDataTooLarge => Error::<T>::HeadDataTooLarge,
1180 PrematureCodeUpgrade => Error::<T>::PrematureCodeUpgrade,
1181 NewCodeTooLarge => Error::<T>::NewCodeTooLarge,
1182 ProcessedDownwardMessages => Error::<T>::IncorrectDownwardMessageHandling,
1183 UpwardMessages => Error::<T>::InvalidUpwardMessages,
1184 HrmpWatermark => Error::<T>::HrmpWatermarkMishandling,
1185 OutboundHrmp => Error::<T>::InvalidOutboundHrmp,
1186 }
1187 }
1188}
1189
1190impl<T: Config> OnQueueChanged<AggregateMessageOrigin> for Pallet<T> {
1191 fn on_queue_changed(origin: AggregateMessageOrigin, fp: QueueFootprint) {
1193 let para = match origin {
1194 AggregateMessageOrigin::Ump(UmpQueueId::Para(p)) => p,
1195 };
1196 let QueueFootprint { storage: Footprint { count, size }, .. } = fp;
1197 let (count, size) = (count.saturated_into(), size.saturated_into());
1198 #[allow(deprecated)]
1200 well_known_keys::relay_dispatch_queue_size_typed(para).set((count, size));
1201
1202 let config = configuration::ActiveConfig::<T>::get();
1203 let remaining_count = config.max_upward_queue_count.saturating_sub(count);
1204 let remaining_size = config.max_upward_queue_size.saturating_sub(size);
1205 well_known_keys::relay_dispatch_queue_remaining_capacity(para)
1206 .set((remaining_count, remaining_size));
1207 }
1208}
1209
1210pub(crate) struct CandidateCheckContext<T: Config> {
1212 config: configuration::HostConfiguration<BlockNumberFor<T>>,
1213 prev_context: Option<BlockNumberFor<T>>,
1214}
1215
1216impl<T: Config> CandidateCheckContext<T> {
1217 pub(crate) fn new(prev_context: Option<BlockNumberFor<T>>) -> Self {
1218 Self { config: configuration::ActiveConfig::<T>::get(), prev_context }
1219 }
1220
1221 pub(crate) fn verify_backed_candidate(
1237 &self,
1238 backed_candidate_receipt: &CommittedCandidateReceipt<<T as frame_system::Config>::Hash>,
1239 parent_head_data: HeadData,
1240 ) -> Result<BlockNumberFor<T>, Error<T>> {
1241 let para_id = backed_candidate_receipt.descriptor.para_id();
1242 let relay_parent = backed_candidate_receipt.descriptor.relay_parent();
1243
1244 let session_index = backed_candidate_receipt
1250 .descriptor
1251 .session_index()
1252 .unwrap_or_else(|| shared::CurrentSessionIndex::<T>::get());
1253
1254 let (state_root, relay_parent_number) = {
1256 match shared::Pallet::<T>::get_relay_parent_info(session_index, relay_parent) {
1257 None => return Err(Error::<T>::DisallowedRelayParent),
1258 Some(info) => (info.state_root, info.number),
1259 }
1260 };
1261
1262 if let Some(prev_context) = self.prev_context {
1264 if relay_parent_number < prev_context {
1265 return Err(Error::<T>::DisallowedRelayParent);
1266 }
1267 }
1268
1269 {
1270 let persisted_validation_data = make_persisted_validation_data_with_parent::<T>(
1271 relay_parent_number,
1272 state_root,
1273 parent_head_data,
1274 );
1275
1276 let expected = persisted_validation_data.hash();
1277
1278 ensure!(
1279 expected == backed_candidate_receipt.descriptor.persisted_validation_data_hash(),
1280 Error::<T>::ValidationDataHashMismatch,
1281 );
1282 }
1283
1284 let validation_code_hash = paras::CurrentCodeHash::<T>::get(para_id)
1285 .ok_or_else(|| Error::<T>::UnscheduledCandidate)?;
1287 ensure!(
1288 backed_candidate_receipt.descriptor.validation_code_hash() == validation_code_hash,
1289 Error::<T>::InvalidValidationCodeHash,
1290 );
1291
1292 ensure!(
1293 backed_candidate_receipt.descriptor.para_head() ==
1294 backed_candidate_receipt.commitments.head_data.hash(),
1295 Error::<T>::ParaHeadMismatch,
1296 );
1297
1298 if let Err(err) = self.check_validation_outputs(
1299 para_id,
1300 relay_parent_number,
1301 &backed_candidate_receipt.commitments.head_data,
1302 &backed_candidate_receipt.commitments.new_validation_code,
1303 backed_candidate_receipt.commitments.processed_downward_messages,
1304 &backed_candidate_receipt.commitments.upward_messages,
1305 BlockNumberFor::<T>::from(backed_candidate_receipt.commitments.hrmp_watermark),
1306 &backed_candidate_receipt.commitments.horizontal_messages,
1307 ) {
1308 log::debug!(
1309 target: LOG_TARGET,
1310 "Validation outputs checking during inclusion of a candidate {:?} for parachain `{}` failed, error: {:?}",
1311 backed_candidate_receipt.hash(),
1312 u32::from(para_id),
1313 err
1314 );
1315 Err(err.strip_into_dispatch_err::<T>())?;
1316 };
1317 Ok(relay_parent_number)
1318 }
1319
1320 fn check_validation_outputs(
1337 &self,
1338 para_id: ParaId,
1339 relay_parent_number: BlockNumberFor<T>,
1340 head_data: &HeadData,
1341 new_validation_code: &Option<polkadot_primitives::ValidationCode>,
1342 processed_downward_messages: u32,
1343 upward_messages: &[polkadot_primitives::UpwardMessage],
1344 hrmp_watermark: BlockNumberFor<T>,
1345 horizontal_messages: &[polkadot_primitives::OutboundHrmpMessage<ParaId>],
1346 ) -> Result<(), AcceptanceCheckErr> {
1347 let max_head_data_size = usize::try_from(self.config.max_head_data_size)
1349 .map_err(|_| AcceptanceCheckErr::HeadDataTooLarge)?;
1350 ensure!(head_data.0.len() <= max_head_data_size, AcceptanceCheckErr::HeadDataTooLarge);
1351
1352 if let Some(new_validation_code) = new_validation_code {
1354 let max_code_size = usize::try_from(self.config.max_code_size)
1356 .map_err(|_| AcceptanceCheckErr::NewCodeTooLarge)?;
1357
1358 ensure!(
1359 paras::Pallet::<T>::can_upgrade_validation_code(para_id),
1360 AcceptanceCheckErr::PrematureCodeUpgrade,
1361 );
1362 ensure!(
1363 new_validation_code.0.len() <= max_code_size,
1364 AcceptanceCheckErr::NewCodeTooLarge,
1365 );
1366 }
1367
1368 dmp::Pallet::<T>::check_processed_downward_messages(
1370 para_id,
1371 relay_parent_number,
1372 processed_downward_messages,
1373 )
1374 .map_err(|e| {
1375 log::debug!(
1376 target: LOG_TARGET,
1377 "Check processed downward messages for parachain `{}` on relay parent number `{:?}` failed, error: {:?}",
1378 u32::from(para_id),
1379 relay_parent_number,
1380 e
1381 );
1382 e
1383 })?;
1384 Pallet::<T>::check_upward_messages(&self.config, para_id, upward_messages).map_err(
1385 |e| {
1386 log::debug!(
1387 target: LOG_TARGET,
1388 "Check upward messages for parachain `{}` failed, error: {:?}",
1389 u32::from(para_id),
1390 e
1391 );
1392 e
1393 },
1394 )?;
1395 hrmp::Pallet::<T>::check_hrmp_watermark(para_id, relay_parent_number, hrmp_watermark)
1396 .map_err(|e| {
1397 log::debug!(
1398 target: LOG_TARGET,
1399 "Check hrmp watermark for parachain `{}` on relay parent number `{:?}` failed, error: {:?}",
1400 u32::from(para_id),
1401 relay_parent_number,
1402 e
1403 );
1404 e
1405 })?;
1406 hrmp::Pallet::<T>::check_outbound_hrmp(&self.config, para_id, horizontal_messages)
1407 .map_err(|e| {
1408 log::debug!(
1409 target: LOG_TARGET,
1410 "Check outbound hrmp for parachain `{}` failed, error: {:?}",
1411 u32::from(para_id),
1412 e
1413 );
1414 e
1415 })?;
1416
1417 Ok(())
1418 }
1419}
1420
1421impl<T: Config> QueueFootprinter for Pallet<T> {
1422 type Origin = UmpQueueId;
1423
1424 fn message_count(origin: Self::Origin) -> u64 {
1425 T::MessageQueue::footprint(AggregateMessageOrigin::Ump(origin)).storage.count
1426 }
1427}