1#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bitflags::bitflags;
60use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
61use codec::{Compact, Decode, DecodeLimit, Encode, MaxEncodedLen};
62use cumulus_primitives_core::{
63 relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
64 ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
65};
66
67use frame_support::{
68 defensive, defensive_assert,
69 traits::{
70 Defensive, DefensiveTruncateFrom, EnqueueMessage, EnsureOrigin, Get, Len, QueueFootprint,
71 QueueFootprintQuery, QueuePausedQuery,
72 },
73 weights::{Weight, WeightMeter},
74};
75use pallet_message_queue::OnQueueChanged;
76use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
77use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
78use scale_info::TypeInfo;
79use sp_core::MAX_POSSIBLE_ALLOCATION;
80use sp_runtime::{FixedU128, SaturatedConversion, WeakBoundedVec};
81use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
82use xcm_builder::InspectMessageQueues;
83use xcm_executor::traits::ConvertOrigin;
84
85pub use pallet::*;
86
87pub type OverweightIndex = u64;
89pub type MaxXcmpMessageLenOf<T> =
91 <<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
92
93const LOG_TARGET: &str = "xcmp_queue";
94const DEFAULT_POV_SIZE: u64 = 64 * 1024; pub const XCM_BATCH_SIZE: usize = 250;
97pub const MAX_SIGNALS_PER_PAGE: usize = 3;
99
100pub mod delivery_fee_constants {
102 pub const THRESHOLD_FACTOR: u32 = 2;
104}
105
106#[frame_support::pallet]
107pub mod pallet {
108 use super::*;
109 use frame_support::{pallet_prelude::*, Twox64Concat};
110 use frame_system::pallet_prelude::*;
111
112 #[pallet::pallet]
113 #[pallet::storage_version(migration::STORAGE_VERSION)]
114 pub struct Pallet<T>(_);
115
116 #[pallet::config]
117 pub trait Config: frame_system::Config {
118 #[allow(deprecated)]
119 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
120
121 type ChannelInfo: GetChannelInfo;
123
124 type VersionWrapper: WrapVersion;
126
127 type XcmpQueue: EnqueueMessage<ParaId>
132 + QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;
133
134 #[pallet::constant]
140 type MaxInboundSuspended: Get<u32>;
141
142 #[pallet::constant]
151 type MaxActiveOutboundChannels: Get<u32>;
152
153 #[pallet::constant]
159 type MaxPageSize: Get<u32>;
160
161 type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
163
164 type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
167
168 type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
170
171 type WeightInfo: WeightInfoExt;
173 }
174
175 #[pallet::call]
176 impl<T: Config> Pallet<T> {
177 #[pallet::call_index(1)]
181 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
182 pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
183 T::ControllerOrigin::ensure_origin(origin)?;
184
185 QueueSuspended::<T>::try_mutate(|suspended| {
186 if *suspended {
187 Err(Error::<T>::AlreadySuspended.into())
188 } else {
189 *suspended = true;
190 Ok(())
191 }
192 })
193 }
194
195 #[pallet::call_index(2)]
201 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
202 pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
203 T::ControllerOrigin::ensure_origin(origin)?;
204
205 QueueSuspended::<T>::try_mutate(|suspended| {
206 if !*suspended {
207 Err(Error::<T>::AlreadyResumed.into())
208 } else {
209 *suspended = false;
210 Ok(())
211 }
212 })
213 }
214
215 #[pallet::call_index(3)]
221 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
222 pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
223 ensure_root(origin)?;
224
225 QueueConfig::<T>::try_mutate(|data| {
226 data.suspend_threshold = new;
227 data.validate::<T>()
228 })
229 }
230
231 #[pallet::call_index(4)]
237 #[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
238 pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
239 ensure_root(origin)?;
240
241 QueueConfig::<T>::try_mutate(|data| {
242 data.drop_threshold = new;
243 data.validate::<T>()
244 })
245 }
246
247 #[pallet::call_index(5)]
253 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
254 pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
255 ensure_root(origin)?;
256
257 QueueConfig::<T>::try_mutate(|data| {
258 data.resume_threshold = new;
259 data.validate::<T>()
260 })
261 }
262 }
263
264 #[pallet::hooks]
265 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
266 fn integrity_test() {
267 assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
268
269 let w = Self::on_idle_weight();
270 assert!(w != Weight::zero());
271 assert!(w.all_lte(T::BlockWeights::get().max_block));
272
273 <T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
274 }
275
276 fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
277 let mut meter = WeightMeter::with_limit(limit);
278
279 if meter.try_consume(Self::on_idle_weight()).is_err() {
280 tracing::debug!(
281 target: LOG_TARGET,
282 "Not enough weight for on_idle. {} < {}",
283 Self::on_idle_weight(), limit
284 );
285 return meter.consumed();
286 }
287
288 migration::v3::lazy_migrate_inbound_queue::<T>();
289
290 meter.consumed()
291 }
292 }
293
294 #[pallet::event]
295 #[pallet::generate_deposit(pub(super) fn deposit_event)]
296 pub enum Event<T: Config> {
297 XcmpMessageSent { message_hash: XcmHash },
299 }
300
301 #[pallet::error]
302 pub enum Error<T> {
303 BadQueueConfig,
305 AlreadySuspended,
307 AlreadyResumed,
309 TooManyActiveOutboundChannels,
311 TooBig,
313 }
314
315 #[pallet::storage]
324 pub type InboundXcmpSuspended<T: Config> =
325 StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
326
327 #[pallet::storage]
334 pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
335 _,
336 BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
337 ValueQuery,
338 >;
339
340 #[pallet::storage]
342 pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
343 _,
344 Blake2_128Concat,
345 ParaId,
346 Twox64Concat,
347 u16,
348 WeakBoundedVec<u8, T::MaxPageSize>,
349 ValueQuery,
350 >;
351
352 #[pallet::storage]
354 pub(super) type SignalMessages<T: Config> =
355 StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
356
357 #[pallet::storage]
359 pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
360
361 #[pallet::storage]
363 pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
364
365 #[pallet::storage]
367 pub(super) type DeliveryFeeFactor<T: Config> =
368 StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
369}
370
371#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, Debug, TypeInfo, MaxEncodedLen)]
372pub enum OutboundState {
373 Ok,
374 Suspended,
375}
376
377bitflags! {
378 #[derive(Encode, Decode, TypeInfo, MaxEncodedLen)]
379 struct OutboundChannelFlags: u32 {
380 const CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT = 1;
381 const CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT = 1 << 1;
382 }
383}
384
385impl OutboundChannelFlags {
386 fn has_concatenated_opaque_versioned_xcm_support(&self) -> bool {
388 *self & Self::CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT != Self::empty()
389 }
390
391 fn should_send_concatenated_opaque_versioned_xcm_notification(&self) -> bool {
394 if self.has_concatenated_opaque_versioned_xcm_support() {
395 return false;
396 }
397
398 if *self & Self::CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT != Self::empty() {
399 return false;
400 }
401
402 true
403 }
404
405 fn notice_concatenated_opaque_versioned_xcm_support(&mut self) {
407 *self = *self | Self::CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT;
408 }
409
410 fn notice_concatenated_opaque_versioned_xcm_notification_sent(&mut self) {
412 *self = *self | Self::CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT;
413 }
414}
415
416#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, Debug, MaxEncodedLen)]
418pub struct OutboundChannelDetails {
419 recipient: ParaId,
421 state: OutboundState,
423 signals_exist: bool,
425 first_index: u16,
427 last_index: u16,
429 flags: OutboundChannelFlags,
431 queued_bytes: u32,
433}
434
435impl OutboundChannelDetails {
436 pub fn new(recipient: ParaId) -> OutboundChannelDetails {
437 OutboundChannelDetails {
438 recipient,
439 state: OutboundState::Ok,
440 signals_exist: false,
441 first_index: 0,
442 last_index: 0,
443 flags: OutboundChannelFlags::empty(),
444 queued_bytes: 0,
445 }
446 }
447
448 pub fn with_signals(mut self) -> OutboundChannelDetails {
449 self.signals_exist = true;
450 self
451 }
452
453 pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
454 self.state = OutboundState::Suspended;
455 self
456 }
457}
458
459#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, Debug, TypeInfo, MaxEncodedLen)]
460pub struct QueueConfigData {
461 suspend_threshold: u32,
464 drop_threshold: u32,
468 resume_threshold: u32,
471}
472
473impl Default for QueueConfigData {
474 fn default() -> Self {
475 Self {
478 drop_threshold: 48, suspend_threshold: 32, resume_threshold: 8, }
482 }
483}
484
485impl QueueConfigData {
486 pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
490 if self.resume_threshold < self.suspend_threshold &&
491 self.suspend_threshold <= self.drop_threshold &&
492 self.resume_threshold > 0
493 {
494 Ok(())
495 } else {
496 Err(Error::<T>::BadQueueConfig.into())
497 }
498 }
499}
500
501#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
502pub enum ChannelSignal {
503 Suspend,
504 Resume,
505}
506
507impl<T: Config> Pallet<T> {
508 fn try_get_outbound_channel(
509 all_channels: &BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
510 recipient: ParaId,
511 ) -> Option<&OutboundChannelDetails> {
512 for channel_idx in 0..all_channels.len() {
513 if all_channels[channel_idx].recipient == recipient {
514 return Some(&all_channels[channel_idx]);
515 }
516 }
517
518 None
519 }
520
521 fn try_get_or_insert_outbound_channel(
522 all_channels: &mut BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
523 recipient: ParaId,
524 ) -> Option<&mut OutboundChannelDetails> {
525 for channel_idx in 0..all_channels.len() {
526 if all_channels[channel_idx].recipient == recipient {
527 return Some(&mut all_channels[channel_idx]);
528 }
529 }
530
531 all_channels
532 .try_push(OutboundChannelDetails::new(recipient))
533 .inspect_err(|e| {
534 tracing::error!(target: LOG_TARGET, error=?e, "Failed to insert outbound HRMP channel");
535 })
536 .ok()?;
537 all_channels.last_mut()
538 }
539
540 fn send_fragment<Fragment: Encode>(
562 recipient: ParaId,
563 format: XcmpMessageFormat,
564 fragment: Fragment,
565 ) -> Result<u32, MessageSendError> {
566 let mut encoded_fragment = fragment.encode();
567 let encoded_fragment_len = encoded_fragment.len();
568
569 let channel_info =
573 T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
574 let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
576 let format_size = format.encoded_size();
577 let size_to_check = encoded_fragment
580 .len()
581 .checked_add(format_size)
582 .ok_or(MessageSendError::TooBig)?;
583 if size_to_check > max_message_size {
584 return Err(MessageSendError::TooBig);
585 }
586
587 let mut all_channels = <OutboundXcmpStatus<T>>::get();
588 let channel_details =
589 Self::try_get_or_insert_outbound_channel(&mut all_channels, recipient)
590 .ok_or(MessageSendError::TooManyChannels)?;
591 if let XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm = format {
592 channel_details
593 .flags
594 .notice_concatenated_opaque_versioned_xcm_notification_sent();
595 }
596
597 let mut existing_page = None;
598 'existing_page_check: {
599 if channel_details.last_index > channel_details.first_index {
600 let page =
601 OutboundXcmpMessages::<T>::get(recipient, channel_details.last_index - 1);
602 if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
603 break 'existing_page_check;
604 }
605 if page.len() + encoded_fragment.len() > max_message_size {
606 break 'existing_page_check;
607 }
608 existing_page = Some(page.into_inner());
609 }
610 }
611 let mut current_page = existing_page.unwrap_or_else(|| {
612 channel_details.last_index += 1;
614 let new_page = format.encode();
615 channel_details.queued_bytes =
616 channel_details.queued_bytes.saturating_add(new_page.len() as u32);
617 new_page
618 });
619
620 current_page.append(&mut encoded_fragment);
621 channel_details.queued_bytes =
622 channel_details.queued_bytes.saturating_add(encoded_fragment_len as u32);
623 let current_page = WeakBoundedVec::try_from(current_page).map_err(|error| {
624 tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
625 MessageSendError::TooBig
626 })?;
627 let page_count =
628 channel_details.last_index.saturating_sub(channel_details.first_index) as u32;
629 <OutboundXcmpMessages<T>>::insert(recipient, channel_details.last_index - 1, current_page);
630
631 let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
632 if channel_details.queued_bytes > threshold {
633 Self::increase_fee_factor(recipient, encoded_fragment_len as u128);
634 }
635
636 <OutboundXcmpStatus<T>>::put(all_channels);
637
638 Ok(page_count)
639 }
640
641 fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
644 let mut s = <OutboundXcmpStatus<T>>::get();
645 if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
646 details.signals_exist = true;
647 } else {
648 s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
649 tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
650 Error::<T>::TooManyActiveOutboundChannels
651 })?;
652 }
653
654 let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
655 (XcmpMessageFormat::Signals, signal).encode(),
656 )
657 .map_err(|error| {
658 tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
659 Error::<T>::TooBig
660 })?;
661 let page = WeakBoundedVec::force_from(page.into_inner(), None);
662
663 <SignalMessages<T>>::insert(dest, page);
664 <OutboundXcmpStatus<T>>::put(s);
665 Ok(())
666 }
667
668 fn suspend_channel(target: ParaId) {
669 <OutboundXcmpStatus<T>>::mutate(|s| {
670 if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
671 let ok = details.state == OutboundState::Ok;
672 defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
673 details.state = OutboundState::Suspended;
674 } else {
675 if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
676 defensive!("Cannot pause channel; too many outbound channels");
677 }
678 }
679 });
680 }
681
682 fn resume_channel(target: ParaId) {
683 <OutboundXcmpStatus<T>>::mutate(|s| {
684 if let Some(index) = s.iter().position(|item| item.recipient == target) {
685 let suspended = s[index].state == OutboundState::Suspended;
686 defensive_assert!(
687 suspended,
688 "WARNING: Attempt to resume channel that was not suspended."
689 );
690 if s[index].first_index == s[index].last_index {
691 s.remove(index);
692 } else {
693 s[index].state = OutboundState::Ok;
694 }
695 } else {
696 defensive!("WARNING: Attempt to resume channel that was not suspended.");
697 }
698 });
699 }
700
701 fn enqueue_xcmp_messages<'a>(
702 sender: ParaId,
703 xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
704 is_first_sender_batch: bool,
705 meter: &mut WeightMeter,
706 ) -> Result<(), ()> {
707 let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
708 let batches_footprints =
709 T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
710
711 let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
712 let required_weight = T::WeightInfo::enqueue_xcmp_messages(
713 batches_footprints.first_page_pos.saturated_into(),
714 batch_info,
715 is_first_sender_batch,
716 );
717
718 match meter.can_consume(required_weight) {
719 true => core::cmp::Ordering::Less,
720 false => core::cmp::Ordering::Greater,
721 }
722 });
723
724 meter.consume(T::WeightInfo::enqueue_xcmp_messages(
725 batches_footprints.first_page_pos.saturated_into(),
726 best_batch_footprint,
727 is_first_sender_batch,
728 ));
729 T::XcmpQueue::enqueue_messages(
730 xcms.iter().take(best_batch_footprint.msgs_count).copied(),
731 sender,
732 );
733
734 if best_batch_footprint.msgs_count < xcms.len() {
735 tracing::error!(
736 target: LOG_TARGET,
737 used_weight=?meter.consumed_ratio(),
738 "Out of weight: cannot enqueue entire XCMP messages batch; \
739 dropped some or all messages in batch."
740 );
741 return Err(());
742 }
743 Ok(())
744 }
745
746 pub(crate) fn take_first_concatenated_xcm<'a>(
753 data: &mut &'a [u8],
754 meter: &mut WeightMeter,
755 ) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
756 let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
758 if meter.try_consume(base_weight).is_err() {
759 defensive!("Out of weight; could not decode all; dropping");
760 return Err(());
761 }
762
763 let input_data = &mut &data[..];
764 let mut input = codec::CountedInput::new(input_data);
765 VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
766 |error| {
767 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
768 ()
769 },
770 )?;
771 let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
772 *data = remaining_data;
773
774 let extra_weight =
778 T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
779 meter.consume(extra_weight);
780
781 let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
782 tracing::error!(
783 target: LOG_TARGET,
784 ?error,
785 "Failed to take XCM after decoding: message is too long"
786 );
787 ()
788 })?;
789
790 Ok(xcm)
791 }
792
793 pub(crate) fn take_first_concatenated_opaque_xcm<'a>(
797 data: &mut &'a [u8],
798 ) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
799 let xcm_len = Compact::<u32>::decode(data).map_err(|error| {
800 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode opaque XCM length");
801 ()
802 })?;
803 let (xcm_data, remaining_data) = match data.split_at_checked(xcm_len.0 as usize) {
804 Some((xcm_data, remaining_data)) => (xcm_data, remaining_data),
805 None => {
806 tracing::debug!(target: LOG_TARGET, ?xcm_len, "Wrong opaque XCM length");
807 return Err(());
808 },
809 };
810 *data = remaining_data;
811
812 let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
813 tracing::error!(
814 target: LOG_TARGET,
815 ?error,
816 "Failed to take opaque XCM after decoding: message is too long"
817 );
818 ()
819 })?;
820
821 Ok(xcm)
822 }
823
824 pub(crate) fn take_first_concatenated_xcms<'a>(
828 data: &mut &'a [u8],
829 encoding: XcmEncoding,
830 batch_size: usize,
831 meter: &mut WeightMeter,
832 ) -> Result<
833 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
834 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
835 > {
836 let mut batch = vec![];
837 loop {
838 if data.is_empty() {
839 return Ok(batch);
840 }
841
842 let maybe_xcm = match encoding {
843 XcmEncoding::Simple => Self::take_first_concatenated_xcm(data, meter),
844 XcmEncoding::Double => Self::take_first_concatenated_opaque_xcm(data),
845 };
846 match maybe_xcm {
847 Ok(xcm) => {
848 batch.push(xcm);
849 if batch.len() >= batch_size {
850 return Ok(batch);
851 }
852 },
853 Err(_) => return Err(batch),
854 }
855 }
856 }
857
858 pub fn on_idle_weight() -> Weight {
860 <T as crate::Config>::WeightInfo::on_idle_good_msg()
861 .max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
862 }
863
864 #[cfg(feature = "bridging")]
865 fn is_inbound_channel_suspended(sender: ParaId) -> bool {
866 <InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
867 }
868
869 #[cfg(feature = "bridging")]
870 fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
872 <OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
873 let queued_pages = c.last_index.saturating_sub(c.first_index);
874 (c.state, queued_pages)
875 })
876 }
877}
878
879impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
880 fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
882 let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
883
884 let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
885 let suspended = suspended_channels.contains(¶);
886
887 if suspended && fp.ready_pages <= resume_threshold {
888 if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
889 tracing::error!(
890 target: LOG_TARGET,
891 error=?err,
892 sibling=?para,
893 "defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
894 );
895 } else {
896 suspended_channels.remove(¶);
897 <InboundXcmpSuspended<T>>::put(suspended_channels);
898 }
899 } else if !suspended && fp.ready_pages >= suspend_threshold {
900 tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");
901
902 if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
903 tracing::error!(
905 target: LOG_TARGET, error=?err,
906 "defensive: Could not send suspension signal; future messages may be dropped."
907 );
908 } else if let Err(err) = suspended_channels.try_insert(para) {
909 tracing::error!(
910 target: LOG_TARGET,
911 error=?err,
912 sibling=?para,
913 "Too many channels suspended; cannot suspend sibling; further messages may be dropped."
914 );
915 } else {
916 <InboundXcmpSuspended<T>>::put(suspended_channels);
917 }
918 }
919 }
920}
921
922impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
923 fn is_paused(para: &ParaId) -> bool {
924 if !QueueSuspended::<T>::get() {
925 return false;
926 }
927
928 let sender_origin = T::ControllerOriginConverter::convert_origin(
930 (Parent, Parachain((*para).into())),
931 OriginKind::Superuser,
932 );
933 let is_controller =
934 sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
935
936 !is_controller
937 }
938}
939
/// How the XCMs inside a page are laid out.
#[derive(Copy, Clone)]
enum XcmEncoding {
	/// Plain concatenated encoded `VersionedXcm`s (`ConcatenatedVersionedXcm` pages).
	Simple,
	/// Concatenated messages each prefixed with a `Compact<u32>` length
	/// (`ConcatenatedOpaqueVersionedXcm` pages).
	Double,
}
957
958impl<T: Config> XcmpMessageHandler for Pallet<T> {
959 fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
960 iter: I,
961 max_weight: Weight,
962 ) -> Weight {
963 let mut meter = WeightMeter::with_limit(max_weight);
964
965 let mut known_xcm_senders = BTreeSet::new();
966 for (sender, _sent_at, mut data) in iter {
967 let format = match XcmpMessageFormat::decode(&mut data) {
968 Ok(f) => f,
969 Err(_) => {
970 defensive!("Unknown XCMP message format - dropping");
971 continue;
972 },
973 };
974
975 match format {
976 XcmpMessageFormat::Signals => {
977 let mut signal_count = 0;
978 while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
979 signal_count += 1;
980 match ChannelSignal::decode(&mut data) {
981 Ok(ChannelSignal::Suspend) => {
982 if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
983 defensive!(
984 "Not enough weight to process suspend signal - dropping"
985 );
986 break;
987 }
988 Self::suspend_channel(sender)
989 },
990 Ok(ChannelSignal::Resume) => {
991 if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
992 defensive!(
993 "Not enough weight to process resume signal - dropping"
994 );
995 break;
996 }
997 Self::resume_channel(sender)
998 },
999 Err(_) => {
1000 defensive!("Undecodable channel signal - dropping");
1001 break;
1002 },
1003 }
1004 }
1005 },
1006 XcmpMessageFormat::ConcatenatedVersionedXcm |
1007 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1008 let encoding = match format {
1009 XcmpMessageFormat::ConcatenatedVersionedXcm => XcmEncoding::Simple,
1010 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1011 let mut all_channels = <OutboundXcmpStatus<T>>::get();
1012 if let Some(channel_details) =
1013 Self::try_get_or_insert_outbound_channel(&mut all_channels, sender)
1014 {
1015 channel_details
1016 .flags
1017 .notice_concatenated_opaque_versioned_xcm_support();
1018 }
1019 <OutboundXcmpStatus<T>>::put(all_channels);
1020
1021 XcmEncoding::Double
1022 },
1023 _ => {
1024 continue;
1026 },
1027 };
1028
1029 let mut is_first_sender_batch = known_xcm_senders.insert(sender);
1030 if is_first_sender_batch {
1031 if meter
1032 .try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
1033 .is_err()
1034 {
1035 defensive!(
1036 "Out of weight: cannot enqueue XCMP messages; dropping page; \
1037 Used weight: ",
1038 meter.consumed_ratio()
1039 );
1040 continue;
1041 }
1042 }
1043
1044 let mut can_process_next_batch = true;
1045 while can_process_next_batch {
1046 let batch = match Self::take_first_concatenated_xcms(
1047 &mut data,
1048 encoding,
1049 XCM_BATCH_SIZE,
1050 &mut meter,
1051 ) {
1052 Ok(batch) => batch,
1053 Err(batch) => {
1054 can_process_next_batch = false;
1055 defensive!(
1056 "HRMP inbound decode stream broke; page will be dropped."
1057 );
1058 batch
1059 },
1060 };
1061 if batch.is_empty() {
1062 break;
1063 }
1064
1065 if let Err(()) = Self::enqueue_xcmp_messages(
1066 sender,
1067 &batch,
1068 is_first_sender_batch,
1069 &mut meter,
1070 ) {
1071 break;
1072 }
1073 is_first_sender_batch = false;
1074 }
1075 },
1076 XcmpMessageFormat::ConcatenatedEncodedBlob => {
1077 defensive!("Blob messages are unhandled - dropping");
1078 continue;
1079 },
1080 }
1081 }
1082
1083 meter.consumed()
1084 }
1085}
1086
1087impl<T: Config> XcmpMessageSource for Pallet<T> {
1088 fn take_outbound_messages(
1089 maximum_channels: usize,
1090 excluded_recipients: &[ParaId],
1091 ) -> Vec<(ParaId, Vec<u8>)> {
1092 let mut statuses = <OutboundXcmpStatus<T>>::get().into_inner();
1093 let old_statuses_len = statuses.len();
1094 let max_message_count = statuses.len().min(maximum_channels);
1095 let mut result = Vec::with_capacity(max_message_count);
1096
1097 statuses.retain_mut(|status| {
1098 let OutboundChannelDetails {
1099 recipient: para_id,
1100 state: outbound_state,
1101 signals_exist,
1102 first_index,
1103 last_index,
1104 flags,
1105 queued_bytes,
1106 } = status;
1107
1108 let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(*para_id) {
1109 ChannelStatus::Closed => {
1110 for i in *first_index..*last_index {
1113 <OutboundXcmpMessages<T>>::remove(*para_id, i);
1114 }
1115 if *signals_exist {
1116 <SignalMessages<T>>::remove(*para_id);
1117 }
1118 return false;
1119 },
1120 ChannelStatus::Full => return true,
1121 ChannelStatus::Ready(max_size_now, max_size_ever) => (max_size_now, max_size_ever),
1122 };
1123
1124 if excluded_recipients.contains(para_id) {
1126 return true;
1127 }
1128
1129 if result.len() == max_message_count {
1131 return true;
1134 }
1135
1136 let page = 'page_fetch: {
1137 if *signals_exist {
1138 let page = <SignalMessages<T>>::get(*para_id);
1139 defensive_assert!(!page.is_empty(), "Signals must exist");
1140
1141 if page.len() < max_size_now {
1142 <SignalMessages<T>>::remove(*para_id);
1143 *signals_exist = false;
1144 break 'page_fetch page;
1145 }
1146
1147 defensive!("Signals should fit into a single page");
1148 return true;
1149 }
1150
1151 if *outbound_state == OutboundState::Suspended {
1152 return true;
1154 }
1155
1156 if last_index > first_index {
1157 let page = <OutboundXcmpMessages<T>>::get(*para_id, *first_index);
1158 if page.len() < max_size_now {
1159 <OutboundXcmpMessages<T>>::remove(*para_id, *first_index);
1160 *first_index += 1;
1161 *queued_bytes = queued_bytes.saturating_sub(page.len() as u32);
1162 break 'page_fetch page;
1163 }
1164 }
1165
1166 if flags.should_send_concatenated_opaque_versioned_xcm_notification() {
1170 match WeakBoundedVec::try_from(XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm.encode()) {
1171 Ok(page) => {
1172 flags.notice_concatenated_opaque_versioned_xcm_notification_sent();
1173 break 'page_fetch page;
1174 },
1175 Err(_) => {
1176 defensive!("XcmpMessageFormat should fit into a single page");
1177 return true;
1178 }
1179 };
1180 }
1181
1182 return true;
1183 };
1184
1185 if first_index == last_index {
1186 *first_index = 0;
1187 *last_index = 0;
1188 *queued_bytes = 0;
1189 }
1190
1191 if page.len() > max_size_ever {
1192 defensive!("WARNING: oversize message in queue - dropping");
1196 } else {
1197 result.push((*para_id, page.into_inner()));
1198 }
1199
1200 let max_total_size = match T::ChannelInfo::get_channel_info(*para_id) {
1201 Some(channel_info) => channel_info.max_total_size,
1202 None => {
1203 tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1204 MAX_POSSIBLE_ALLOCATION
1206 },
1207 };
1208 let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1209 if *queued_bytes <= threshold {
1210 Self::decrease_fee_factor(*para_id);
1211 }
1212
1213 true
1214 });
1215 debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1216 let mut statuses = BoundedVec::defensive_truncate_from(statuses);
1217
1218 result.sort_by_key(|(recipient, _msg)| *recipient);
1221
1222 let pruned = old_statuses_len - statuses.len();
1224 let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1229 "Could not store HRMP channels config. Some HRMP channels may be broken.",
1230 );
1231
1232 <OutboundXcmpStatus<T>>::put(statuses);
1233
1234 result
1235 }
1236}
1237
1238impl<T: Config> SendXcm for Pallet<T> {
1240 type Ticket = (ParaId, VersionedXcm<()>);
1241
1242 fn validate(
1243 dest: &mut Option<Location>,
1244 msg: &mut Option<Xcm<()>>,
1245 ) -> SendResult<(ParaId, VersionedXcm<()>)> {
1246 let d = dest.take().ok_or(SendError::MissingArgument)?;
1247
1248 match d.unpack() {
1249 (1, [Parachain(id)]) => {
1251 let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1252 let id = ParaId::from(*id);
1253 let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1254 let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1255 .map_err(|()| SendError::DestinationUnsupported)?;
1256 versioned_xcm
1257 .check_is_decodable()
1258 .map_err(|()| SendError::ExceedsMaxMessageSize)?;
1259
1260 Ok(((id, versioned_xcm), price))
1261 },
1262 _ => {
1263 *dest = Some(d);
1266 Err(SendError::NotApplicable)
1267 },
1268 }
1269 }
1270
1271 fn deliver((recipient, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1272 let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1273
1274 let mut encoding = XcmEncoding::Simple;
1275 let mut all_channels = <OutboundXcmpStatus<T>>::get();
1276 if let Some(channel_details) = Self::try_get_outbound_channel(&mut all_channels, recipient)
1277 {
1278 if channel_details.flags.has_concatenated_opaque_versioned_xcm_support() {
1279 encoding = XcmEncoding::Double;
1280 }
1281 }
1282
1283 let result = match encoding {
1284 XcmEncoding::Simple => {
1285 Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm)
1286 },
1287 XcmEncoding::Double => Self::send_fragment(
1288 recipient,
1289 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm,
1290 xcm.encode(),
1291 ),
1292 };
1293 match result {
1294 Ok(_) => {
1295 Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1296 Ok(hash)
1297 },
1298 Err(e) => {
1299 tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1300 Err(SendError::Transport(e.into()))
1301 },
1302 }
1303 }
1304}
1305
1306impl<T: Config> InspectMessageQueues for Pallet<T> {
1307 fn clear_messages() {
1308 let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1310 OutboundXcmpStatus::<T>::mutate(|details_vec| {
1311 for details in details_vec {
1312 details.first_index = 0;
1313 details.last_index = 0;
1314 details.queued_bytes = 0;
1315 }
1316 });
1317 }
1318
1319 fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1320 use xcm::prelude::*;
1321
1322 OutboundXcmpMessages::<T>::iter()
1323 .map(|(para_id, _, messages)| {
1324 let data = &mut &messages[..];
1325
1326 let decoded_format = XcmpMessageFormat::decode(data).unwrap();
1327 let mut decoded_messages = Vec::new();
1328 while !data.is_empty() {
1329 let message_bytes = match decoded_format {
1330 XcmpMessageFormat::ConcatenatedVersionedXcm => {
1331 Self::take_first_concatenated_xcm(data, &mut WeightMeter::new())
1332 },
1333 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1334 Self::take_first_concatenated_opaque_xcm(data)
1335 },
1336 unexpected_format => {
1337 panic!("Unexpected XCMP format: {unexpected_format:?}!")
1338 },
1339 }
1340 .unwrap();
1341 let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1342 MAX_XCM_DECODE_DEPTH,
1343 &mut &message_bytes[..],
1344 )
1345 .unwrap();
1346 decoded_messages.push(decoded_message);
1347 }
1348
1349 (
1350 VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1351 decoded_messages,
1352 )
1353 })
1354 .collect()
1355 }
1356}
1357
1358impl<T: Config> FeeTracker for Pallet<T> {
1359 type Id = ParaId;
1360
1361 fn get_fee_factor(id: Self::Id) -> FixedU128 {
1362 <DeliveryFeeFactor<T>>::get(id)
1363 }
1364
1365 fn set_fee_factor(id: Self::Id, val: FixedU128) {
1366 <DeliveryFeeFactor<T>>::set(id, val);
1367 }
1368}