1#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
60use codec::{Compact, Decode, DecodeLimit, Encode, MaxEncodedLen};
61use cumulus_primitives_core::{
62 relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
63 ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
64};
65
66use frame_support::{
67 defensive, defensive_assert,
68 traits::{
69 Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueueFootprintQuery,
70 QueuePausedQuery,
71 },
72 weights::{Weight, WeightMeter},
73};
74use pallet_message_queue::OnQueueChanged;
75use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
76use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
77use scale_info::TypeInfo;
78use sp_core::MAX_POSSIBLE_ALLOCATION;
79use sp_runtime::{FixedU128, RuntimeDebug, SaturatedConversion, WeakBoundedVec};
80use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
81use xcm_builder::InspectMessageQueues;
82use xcm_executor::traits::ConvertOrigin;
83
84pub use pallet::*;
85
86pub type OverweightIndex = u64;
88pub type MaxXcmpMessageLenOf<T> =
90 <<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
91
92const LOG_TARGET: &str = "xcmp_queue";
93const DEFAULT_POV_SIZE: u64 = 64 * 1024; pub const XCM_BATCH_SIZE: usize = 250;
96pub const MAX_SIGNALS_PER_PAGE: usize = 3;
98
99pub mod delivery_fee_constants {
101 pub const THRESHOLD_FACTOR: u32 = 2;
103}
104
105#[frame_support::pallet]
106pub mod pallet {
107 use super::*;
108 use frame_support::{pallet_prelude::*, Twox64Concat};
109 use frame_system::pallet_prelude::*;
110
111 #[pallet::pallet]
112 #[pallet::storage_version(migration::STORAGE_VERSION)]
113 pub struct Pallet<T>(_);
114
115 #[pallet::config]
116 pub trait Config: frame_system::Config {
117 #[allow(deprecated)]
118 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
119
120 type ChannelInfo: GetChannelInfo;
122
123 type VersionWrapper: WrapVersion;
125
126 type XcmpQueue: EnqueueMessage<ParaId>
131 + QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;
132
133 #[pallet::constant]
139 type MaxInboundSuspended: Get<u32>;
140
141 #[pallet::constant]
150 type MaxActiveOutboundChannels: Get<u32>;
151
152 #[pallet::constant]
158 type MaxPageSize: Get<u32>;
159
160 type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
162
163 type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
166
167 type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
169
170 type WeightInfo: WeightInfoExt;
172 }
173
174 #[pallet::call]
175 impl<T: Config> Pallet<T> {
176 #[pallet::call_index(1)]
180 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
181 pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
182 T::ControllerOrigin::ensure_origin(origin)?;
183
184 QueueSuspended::<T>::try_mutate(|suspended| {
185 if *suspended {
186 Err(Error::<T>::AlreadySuspended.into())
187 } else {
188 *suspended = true;
189 Ok(())
190 }
191 })
192 }
193
194 #[pallet::call_index(2)]
200 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
201 pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
202 T::ControllerOrigin::ensure_origin(origin)?;
203
204 QueueSuspended::<T>::try_mutate(|suspended| {
205 if !*suspended {
206 Err(Error::<T>::AlreadyResumed.into())
207 } else {
208 *suspended = false;
209 Ok(())
210 }
211 })
212 }
213
214 #[pallet::call_index(3)]
220 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
221 pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
222 ensure_root(origin)?;
223
224 QueueConfig::<T>::try_mutate(|data| {
225 data.suspend_threshold = new;
226 data.validate::<T>()
227 })
228 }
229
230 #[pallet::call_index(4)]
236 #[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
237 pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
238 ensure_root(origin)?;
239
240 QueueConfig::<T>::try_mutate(|data| {
241 data.drop_threshold = new;
242 data.validate::<T>()
243 })
244 }
245
246 #[pallet::call_index(5)]
252 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
253 pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
254 ensure_root(origin)?;
255
256 QueueConfig::<T>::try_mutate(|data| {
257 data.resume_threshold = new;
258 data.validate::<T>()
259 })
260 }
261 }
262
263 #[pallet::hooks]
264 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
265 fn integrity_test() {
266 assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
267
268 let w = Self::on_idle_weight();
269 assert!(w != Weight::zero());
270 assert!(w.all_lte(T::BlockWeights::get().max_block));
271
272 <T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
273 }
274
275 fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
276 let mut meter = WeightMeter::with_limit(limit);
277
278 if meter.try_consume(Self::on_idle_weight()).is_err() {
279 tracing::debug!(
280 target: LOG_TARGET,
281 "Not enough weight for on_idle. {} < {}",
282 Self::on_idle_weight(), limit
283 );
284 return meter.consumed()
285 }
286
287 migration::v3::lazy_migrate_inbound_queue::<T>();
288
289 meter.consumed()
290 }
291 }
292
293 #[pallet::event]
294 #[pallet::generate_deposit(pub(super) fn deposit_event)]
295 pub enum Event<T: Config> {
296 XcmpMessageSent { message_hash: XcmHash },
298 }
299
300 #[pallet::error]
301 pub enum Error<T> {
302 BadQueueConfig,
304 AlreadySuspended,
306 AlreadyResumed,
308 TooManyActiveOutboundChannels,
310 TooBig,
312 }
313
314 #[pallet::storage]
323 pub type InboundXcmpSuspended<T: Config> =
324 StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
325
326 #[pallet::storage]
333 pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
334 _,
335 BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
336 ValueQuery,
337 >;
338
339 #[pallet::storage]
341 pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
342 _,
343 Blake2_128Concat,
344 ParaId,
345 Twox64Concat,
346 u16,
347 WeakBoundedVec<u8, T::MaxPageSize>,
348 ValueQuery,
349 >;
350
351 #[pallet::storage]
353 pub(super) type SignalMessages<T: Config> =
354 StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
355
356 #[pallet::storage]
358 pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
359
360 #[pallet::storage]
362 pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
363
364 #[pallet::storage]
366 pub(super) type DeliveryFeeFactor<T: Config> =
367 StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
368}
369
370#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
371pub enum OutboundState {
372 Ok,
373 Suspended,
374}
375
376#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
378pub struct OutboundChannelDetails {
379 recipient: ParaId,
381 state: OutboundState,
383 signals_exist: bool,
385 first_index: u16,
387 last_index: u16,
389}
390
391impl OutboundChannelDetails {
392 pub fn new(recipient: ParaId) -> OutboundChannelDetails {
393 OutboundChannelDetails {
394 recipient,
395 state: OutboundState::Ok,
396 signals_exist: false,
397 first_index: 0,
398 last_index: 0,
399 }
400 }
401
402 pub fn with_signals(mut self) -> OutboundChannelDetails {
403 self.signals_exist = true;
404 self
405 }
406
407 pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
408 self.state = OutboundState::Suspended;
409 self
410 }
411}
412
413#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
414pub struct QueueConfigData {
415 suspend_threshold: u32,
418 drop_threshold: u32,
422 resume_threshold: u32,
425}
426
427impl Default for QueueConfigData {
428 fn default() -> Self {
429 Self {
432 drop_threshold: 48, suspend_threshold: 32, resume_threshold: 8, }
436 }
437}
438
439impl QueueConfigData {
440 pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
444 if self.resume_threshold < self.suspend_threshold &&
445 self.suspend_threshold <= self.drop_threshold &&
446 self.resume_threshold > 0
447 {
448 Ok(())
449 } else {
450 Err(Error::<T>::BadQueueConfig.into())
451 }
452 }
453}
454
455#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
456pub enum ChannelSignal {
457 Suspend,
458 Resume,
459}
460
461impl<T: Config> Pallet<T> {
462 fn send_fragment<Fragment: Encode>(
484 recipient: ParaId,
485 format: XcmpMessageFormat,
486 fragment: Fragment,
487 ) -> Result<u32, MessageSendError> {
488 let encoded_fragment = fragment.encode();
489
490 let channel_info =
494 T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
495 let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
497 let format_size = format.encoded_size();
498 let size_to_check = encoded_fragment
501 .len()
502 .checked_add(format_size)
503 .ok_or(MessageSendError::TooBig)?;
504 if size_to_check > max_message_size {
505 return Err(MessageSendError::TooBig)
506 }
507
508 let mut all_channels = <OutboundXcmpStatus<T>>::get();
509 let channel_details = if let Some(details) =
510 all_channels.iter_mut().find(|channel| channel.recipient == recipient)
511 {
512 details
513 } else {
514 all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
515 tracing::error!(target: LOG_TARGET, error=?e, "Failed to activate HRMP channel");
516 MessageSendError::TooManyChannels
517 })?;
518 all_channels
519 .last_mut()
520 .expect("can't be empty; a new element was just pushed; qed")
521 };
522 let have_active = channel_details.last_index > channel_details.first_index;
523 let appended_to_last_page = have_active
526 .then(|| {
527 <OutboundXcmpMessages<T>>::try_mutate(
528 recipient,
529 channel_details.last_index - 1,
530 |page| {
531 if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
532 defensive!("Bad format in outbound queue; dropping message");
533 return Err(())
534 }
535 if page.len() + encoded_fragment.len() > max_message_size {
536 return Err(())
537 }
538 for frag in encoded_fragment.iter() {
539 page.try_push(*frag)?;
540 }
541 Ok(page.len())
542 },
543 )
544 .ok()
545 })
546 .flatten();
547
548 let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
549 let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
550 (number_of_pages, size)
551 } else {
552 let page_index = channel_details.last_index;
554 channel_details.last_index += 1;
555 let mut new_page = format.encode();
556 new_page.extend_from_slice(&encoded_fragment[..]);
557 let last_page_size = new_page.len();
558 let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
559 let bounded_page =
560 BoundedVec::<u8, T::MaxPageSize>::try_from(new_page).map_err(|error| {
561 tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
562 MessageSendError::TooBig
563 })?;
564 let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
565 <OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
566 <OutboundXcmpStatus<T>>::put(all_channels);
567 (number_of_pages, last_page_size)
568 };
569
570 let total_size =
574 number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
575 let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
576 if total_size > threshold {
577 Self::increase_fee_factor(recipient, encoded_fragment.len() as u128);
578 }
579
580 Ok(number_of_pages)
581 }
582
583 fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
586 let mut s = <OutboundXcmpStatus<T>>::get();
587 if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
588 details.signals_exist = true;
589 } else {
590 s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
591 tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
592 Error::<T>::TooManyActiveOutboundChannels
593 })?;
594 }
595
596 let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
597 (XcmpMessageFormat::Signals, signal).encode(),
598 )
599 .map_err(|error| {
600 tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
601 Error::<T>::TooBig
602 })?;
603 let page = WeakBoundedVec::force_from(page.into_inner(), None);
604
605 <SignalMessages<T>>::insert(dest, page);
606 <OutboundXcmpStatus<T>>::put(s);
607 Ok(())
608 }
609
610 fn suspend_channel(target: ParaId) {
611 <OutboundXcmpStatus<T>>::mutate(|s| {
612 if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
613 let ok = details.state == OutboundState::Ok;
614 defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
615 details.state = OutboundState::Suspended;
616 } else {
617 if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
618 defensive!("Cannot pause channel; too many outbound channels");
619 }
620 }
621 });
622 }
623
624 fn resume_channel(target: ParaId) {
625 <OutboundXcmpStatus<T>>::mutate(|s| {
626 if let Some(index) = s.iter().position(|item| item.recipient == target) {
627 let suspended = s[index].state == OutboundState::Suspended;
628 defensive_assert!(
629 suspended,
630 "WARNING: Attempt to resume channel that was not suspended."
631 );
632 if s[index].first_index == s[index].last_index {
633 s.remove(index);
634 } else {
635 s[index].state = OutboundState::Ok;
636 }
637 } else {
638 defensive!("WARNING: Attempt to resume channel that was not suspended.");
639 }
640 });
641 }
642
643 fn enqueue_xcmp_messages<'a>(
644 sender: ParaId,
645 xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
646 is_first_sender_batch: bool,
647 meter: &mut WeightMeter,
648 ) -> Result<(), ()> {
649 let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
650 let batches_footprints =
651 T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
652
653 let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
654 let required_weight = T::WeightInfo::enqueue_xcmp_messages(
655 batches_footprints.first_page_pos.saturated_into(),
656 batch_info,
657 is_first_sender_batch,
658 );
659
660 match meter.can_consume(required_weight) {
661 true => core::cmp::Ordering::Less,
662 false => core::cmp::Ordering::Greater,
663 }
664 });
665
666 meter.consume(T::WeightInfo::enqueue_xcmp_messages(
667 batches_footprints.first_page_pos.saturated_into(),
668 best_batch_footprint,
669 is_first_sender_batch,
670 ));
671 T::XcmpQueue::enqueue_messages(
672 xcms.iter().take(best_batch_footprint.msgs_count).copied(),
673 sender,
674 );
675
676 if best_batch_footprint.msgs_count < xcms.len() {
677 tracing::error!(
678 target: LOG_TARGET,
679 used_weight=?meter.consumed_ratio(),
680 "Out of weight: cannot enqueue entire XCMP messages batch; \
681 dropped some or all messages in batch."
682 );
683 return Err(());
684 }
685 Ok(())
686 }
687
688 pub(crate) fn take_first_concatenated_xcm<'a>(
695 data: &mut &'a [u8],
696 meter: &mut WeightMeter,
697 ) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
698 let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
700 if meter.try_consume(base_weight).is_err() {
701 defensive!("Out of weight; could not decode all; dropping");
702 return Err(())
703 }
704
705 let input_data = &mut &data[..];
706 let mut input = codec::CountedInput::new(input_data);
707 VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
708 |error| {
709 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
710 ()
711 },
712 )?;
713 let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
714 *data = remaining_data;
715
716 let extra_weight =
720 T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
721 meter.consume(extra_weight);
722
723 let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
724 tracing::error!(
725 target: LOG_TARGET,
726 ?error,
727 "Failed to take XCM after decoding: message is too long"
728 );
729 ()
730 })?;
731
732 Ok(xcm)
733 }
734
735 pub(crate) fn take_first_concatenated_opaque_xcm<'a>(
739 data: &mut &'a [u8],
740 ) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
741 let xcm_len = Compact::<u32>::decode(data).map_err(|error| {
742 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode opaque XCM length");
743 ()
744 })?;
745 let (xcm_data, remaining_data) = match data.split_at_checked(xcm_len.0 as usize) {
746 Some((xcm_data, remaining_data)) => (xcm_data, remaining_data),
747 None => {
748 tracing::debug!(target: LOG_TARGET, ?xcm_len, "Wrong opaque XCM length");
749 return Err(())
750 },
751 };
752 *data = remaining_data;
753
754 let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
755 tracing::error!(
756 target: LOG_TARGET,
757 ?error,
758 "Failed to take opaque XCM after decoding: message is too long"
759 );
760 ()
761 })?;
762
763 Ok(xcm)
764 }
765
766 pub(crate) fn take_first_concatenated_xcms<'a>(
770 data: &mut &'a [u8],
771 encoding: XcmEncoding,
772 batch_size: usize,
773 meter: &mut WeightMeter,
774 ) -> Result<
775 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
776 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
777 > {
778 let mut batch = vec![];
779 loop {
780 if data.is_empty() {
781 return Ok(batch)
782 }
783
784 let maybe_xcm = match encoding {
785 XcmEncoding::Simple => Self::take_first_concatenated_xcm(data, meter),
786 XcmEncoding::Double => Self::take_first_concatenated_opaque_xcm(data),
787 };
788 match maybe_xcm {
789 Ok(xcm) => {
790 batch.push(xcm);
791 if batch.len() >= batch_size {
792 return Ok(batch);
793 }
794 },
795 Err(_) => return Err(batch),
796 }
797 }
798 }
799
800 pub fn on_idle_weight() -> Weight {
802 <T as crate::Config>::WeightInfo::on_idle_good_msg()
803 .max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
804 }
805
806 #[cfg(feature = "bridging")]
807 fn is_inbound_channel_suspended(sender: ParaId) -> bool {
808 <InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
809 }
810
811 #[cfg(feature = "bridging")]
812 fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
814 <OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
815 let queued_pages = c.last_index.saturating_sub(c.first_index);
816 (c.state, queued_pages)
817 })
818 }
819}
820
821impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
822 fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
824 let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
825
826 let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
827 let suspended = suspended_channels.contains(¶);
828
829 if suspended && fp.ready_pages <= resume_threshold {
830 if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
831 tracing::error!(
832 target: LOG_TARGET,
833 error=?err,
834 sibling=?para,
835 "defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
836 );
837 } else {
838 suspended_channels.remove(¶);
839 <InboundXcmpSuspended<T>>::put(suspended_channels);
840 }
841 } else if !suspended && fp.ready_pages >= suspend_threshold {
842 tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");
843
844 if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
845 tracing::error!(
847 target: LOG_TARGET, error=?err,
848 "defensive: Could not send suspension signal; future messages may be dropped."
849 );
850 } else if let Err(err) = suspended_channels.try_insert(para) {
851 tracing::error!(
852 target: LOG_TARGET,
853 error=?err,
854 sibling=?para,
855 "Too many channels suspended; cannot suspend sibling; further messages may be dropped."
856 );
857 } else {
858 <InboundXcmpSuspended<T>>::put(suspended_channels);
859 }
860 }
861 }
862}
863
864impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
865 fn is_paused(para: &ParaId) -> bool {
866 if !QueueSuspended::<T>::get() {
867 return false
868 }
869
870 let sender_origin = T::ControllerOriginConverter::convert_origin(
872 (Parent, Parachain((*para).into())),
873 OriginKind::Superuser,
874 );
875 let is_controller =
876 sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
877
878 !is_controller
879 }
880}
881
882#[derive(Copy, Clone)]
884enum XcmEncoding {
885 Simple,
890 Double,
898}
899
900impl<T: Config> XcmpMessageHandler for Pallet<T> {
901 fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
902 iter: I,
903 max_weight: Weight,
904 ) -> Weight {
905 let mut meter = WeightMeter::with_limit(max_weight);
906
907 let mut known_xcm_senders = BTreeSet::new();
908 for (sender, _sent_at, mut data) in iter {
909 let format = match XcmpMessageFormat::decode(&mut data) {
910 Ok(f) => f,
911 Err(_) => {
912 defensive!("Unknown XCMP message format - dropping");
913 continue
914 },
915 };
916
917 match format {
918 XcmpMessageFormat::Signals => {
919 let mut signal_count = 0;
920 while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
921 signal_count += 1;
922 match ChannelSignal::decode(&mut data) {
923 Ok(ChannelSignal::Suspend) => {
924 if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
925 defensive!(
926 "Not enough weight to process suspend signal - dropping"
927 );
928 break
929 }
930 Self::suspend_channel(sender)
931 },
932 Ok(ChannelSignal::Resume) => {
933 if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
934 defensive!(
935 "Not enough weight to process resume signal - dropping"
936 );
937 break
938 }
939 Self::resume_channel(sender)
940 },
941 Err(_) => {
942 defensive!("Undecodable channel signal - dropping");
943 break
944 },
945 }
946 }
947 },
948 XcmpMessageFormat::ConcatenatedVersionedXcm |
949 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
950 let encoding = match format {
951 XcmpMessageFormat::ConcatenatedVersionedXcm => XcmEncoding::Simple,
952 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => XcmEncoding::Double,
953 _ => {
954 continue
956 },
957 };
958
959 let mut is_first_sender_batch = known_xcm_senders.insert(sender);
960 if is_first_sender_batch {
961 if meter
962 .try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
963 .is_err()
964 {
965 defensive!(
966 "Out of weight: cannot enqueue XCMP messages; dropping page; \
967 Used weight: ",
968 meter.consumed_ratio()
969 );
970 continue;
971 }
972 }
973
974 let mut can_process_next_batch = true;
975 while can_process_next_batch {
976 let batch = match Self::take_first_concatenated_xcms(
977 &mut data,
978 encoding,
979 XCM_BATCH_SIZE,
980 &mut meter,
981 ) {
982 Ok(batch) => batch,
983 Err(batch) => {
984 can_process_next_batch = false;
985 defensive!(
986 "HRMP inbound decode stream broke; page will be dropped."
987 );
988 batch
989 },
990 };
991 if batch.is_empty() {
992 break;
993 }
994
995 if let Err(()) = Self::enqueue_xcmp_messages(
996 sender,
997 &batch,
998 is_first_sender_batch,
999 &mut meter,
1000 ) {
1001 break
1002 }
1003 is_first_sender_batch = false;
1004 }
1005 },
1006 XcmpMessageFormat::ConcatenatedEncodedBlob => {
1007 defensive!("Blob messages are unhandled - dropping");
1008 continue
1009 },
1010 }
1011 }
1012
1013 meter.consumed()
1014 }
1015}
1016
1017impl<T: Config> XcmpMessageSource for Pallet<T> {
1018 fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
1019 let mut statuses = <OutboundXcmpStatus<T>>::get();
1020 let old_statuses_len = statuses.len();
1021 let max_message_count = statuses.len().min(maximum_channels);
1022 let mut result = Vec::with_capacity(max_message_count);
1023
1024 for status in statuses.iter_mut() {
1025 let OutboundChannelDetails {
1026 recipient: para_id,
1027 state: outbound_state,
1028 mut signals_exist,
1029 mut first_index,
1030 mut last_index,
1031 } = *status;
1032
1033 let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
1034 ChannelStatus::Closed => {
1035 for i in first_index..last_index {
1038 <OutboundXcmpMessages<T>>::remove(para_id, i);
1039 }
1040 if signals_exist {
1041 <SignalMessages<T>>::remove(para_id);
1042 }
1043 *status = OutboundChannelDetails::new(para_id);
1044 continue
1045 },
1046 ChannelStatus::Full => continue,
1047 ChannelStatus::Ready(n, e) => (n, e),
1048 };
1049
1050 if result.len() == max_message_count {
1052 break
1055 }
1056
1057 let page = if signals_exist {
1058 let page = <SignalMessages<T>>::get(para_id);
1059 defensive_assert!(!page.is_empty(), "Signals must exist");
1060
1061 if page.len() < max_size_now {
1062 <SignalMessages<T>>::remove(para_id);
1063 signals_exist = false;
1064 page
1065 } else {
1066 defensive!("Signals should fit into a single page");
1067 continue
1068 }
1069 } else if outbound_state == OutboundState::Suspended {
1070 continue
1072 } else if last_index > first_index {
1073 let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
1074 if page.len() < max_size_now {
1075 <OutboundXcmpMessages<T>>::remove(para_id, first_index);
1076 first_index += 1;
1077 page
1078 } else {
1079 continue
1080 }
1081 } else {
1082 continue
1083 };
1084 if first_index == last_index {
1085 first_index = 0;
1086 last_index = 0;
1087 }
1088
1089 if page.len() > max_size_ever {
1090 defensive!("WARNING: oversize message in queue - dropping");
1094 } else {
1095 result.push((para_id, page.into_inner()));
1096 }
1097
1098 let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
1099 Some(channel_info) => channel_info.max_total_size,
1100 None => {
1101 tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1102 MAX_POSSIBLE_ALLOCATION },
1104 };
1105 let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1106 let remaining_total_size: usize = (first_index..last_index)
1107 .map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
1108 .sum();
1109 if remaining_total_size <= threshold as usize {
1110 Self::decrease_fee_factor(para_id);
1111 }
1112
1113 *status = OutboundChannelDetails {
1114 recipient: para_id,
1115 state: outbound_state,
1116 signals_exist,
1117 first_index,
1118 last_index,
1119 };
1120 }
1121 debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1122
1123 result.sort_by_key(|m| m.0);
1126
1127 statuses.retain(|x| {
1136 x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
1137 });
1138
1139 let pruned = old_statuses_len - statuses.len();
1141 let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1144 "Could not store HRMP channels config. Some HRMP channels may be broken.",
1145 );
1146
1147 <OutboundXcmpStatus<T>>::put(statuses);
1148
1149 result
1150 }
1151}
1152
1153impl<T: Config> SendXcm for Pallet<T> {
1155 type Ticket = (ParaId, VersionedXcm<()>);
1156
1157 fn validate(
1158 dest: &mut Option<Location>,
1159 msg: &mut Option<Xcm<()>>,
1160 ) -> SendResult<(ParaId, VersionedXcm<()>)> {
1161 let d = dest.take().ok_or(SendError::MissingArgument)?;
1162
1163 match d.unpack() {
1164 (1, [Parachain(id)]) => {
1166 let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1167 let id = ParaId::from(*id);
1168 let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1169 let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1170 .map_err(|()| SendError::DestinationUnsupported)?;
1171 versioned_xcm
1172 .check_is_decodable()
1173 .map_err(|()| SendError::ExceedsMaxMessageSize)?;
1174
1175 Ok(((id, versioned_xcm), price))
1176 },
1177 _ => {
1178 *dest = Some(d);
1181 Err(SendError::NotApplicable)
1182 },
1183 }
1184 }
1185
1186 fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1187 let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1188
1189 match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
1190 Ok(_) => {
1191 Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1192 Ok(hash)
1193 },
1194 Err(e) => {
1195 tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1196 Err(SendError::Transport(e.into()))
1197 },
1198 }
1199 }
1200}
1201
1202impl<T: Config> InspectMessageQueues for Pallet<T> {
1203 fn clear_messages() {
1204 let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1206 OutboundXcmpStatus::<T>::mutate(|details_vec| {
1207 for details in details_vec {
1208 details.first_index = 0;
1209 details.last_index = 0;
1210 }
1211 });
1212 }
1213
1214 fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1215 use xcm::prelude::*;
1216
1217 OutboundXcmpMessages::<T>::iter()
1218 .map(|(para_id, _, messages)| {
1219 let data = &mut &messages[..];
1220
1221 let decoded_format = XcmpMessageFormat::decode(data).unwrap();
1222 let mut decoded_messages = Vec::new();
1223 while !data.is_empty() {
1224 let message_bytes = match decoded_format {
1225 XcmpMessageFormat::ConcatenatedVersionedXcm =>
1226 Self::take_first_concatenated_xcm(data, &mut WeightMeter::new()),
1227 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm =>
1228 Self::take_first_concatenated_opaque_xcm(data),
1229 unexpected_format => {
1230 panic!("Unexpected XCMP format: {unexpected_format:?}!")
1231 },
1232 }
1233 .unwrap();
1234 let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1235 MAX_XCM_DECODE_DEPTH,
1236 &mut &message_bytes[..],
1237 )
1238 .unwrap();
1239 decoded_messages.push(decoded_message);
1240 }
1241
1242 (
1243 VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1244 decoded_messages,
1245 )
1246 })
1247 .collect()
1248 }
1249}
1250
1251impl<T: Config> FeeTracker for Pallet<T> {
1252 type Id = ParaId;
1253
1254 fn get_fee_factor(id: Self::Id) -> FixedU128 {
1255 <DeliveryFeeFactor<T>>::get(id)
1256 }
1257
1258 fn set_fee_factor(id: Self::Id, val: FixedU128) {
1259 <DeliveryFeeFactor<T>>::set(id, val);
1260 }
1261}