1#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bitflags::bitflags;
60use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
61use codec::{Compact, Decode, DecodeLimit, Encode, MaxEncodedLen};
62use cumulus_primitives_core::{
63 relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
64 ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
65};
66
67use frame_support::{
68 defensive, defensive_assert,
69 traits::{
70 Defensive, DefensiveTruncateFrom, EnqueueMessage, EnsureOrigin, Get, Len, QueueFootprint,
71 QueueFootprintQuery, QueuePausedQuery,
72 },
73 weights::{Weight, WeightMeter},
74};
75use pallet_message_queue::OnQueueChanged;
76use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
77use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
78use scale_info::TypeInfo;
79use sp_core::MAX_POSSIBLE_ALLOCATION;
80use sp_runtime::{FixedU128, SaturatedConversion, WeakBoundedVec};
81use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
82use xcm_builder::InspectMessageQueues;
83use xcm_executor::traits::ConvertOrigin;
84
85pub use pallet::*;
86
87pub type OverweightIndex = u64;
89pub type MaxXcmpMessageLenOf<T> =
91 <<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
92
93const LOG_TARGET: &str = "xcmp_queue";
94const DEFAULT_POV_SIZE: u64 = 64 * 1024; pub const XCM_BATCH_SIZE: usize = 250;
97pub const MAX_SIGNALS_PER_PAGE: usize = 3;
99
100pub mod delivery_fee_constants {
102 pub const THRESHOLD_FACTOR: u32 = 2;
104}
105
106#[frame_support::pallet]
107pub mod pallet {
108 use super::*;
109 use frame_support::{pallet_prelude::*, Twox64Concat};
110 use frame_system::pallet_prelude::*;
111
112 #[pallet::pallet]
113 #[pallet::storage_version(migration::STORAGE_VERSION)]
114 pub struct Pallet<T>(_);
115
116 #[pallet::config]
117 pub trait Config: frame_system::Config {
118 #[allow(deprecated)]
119 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
120
121 type ChannelInfo: GetChannelInfo;
123
124 type VersionWrapper: WrapVersion;
126
127 type XcmpQueue: EnqueueMessage<ParaId>
132 + QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;
133
134 #[pallet::constant]
140 type MaxInboundSuspended: Get<u32>;
141
142 #[pallet::constant]
151 type MaxActiveOutboundChannels: Get<u32>;
152
153 #[pallet::constant]
159 type MaxPageSize: Get<u32>;
160
161 type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
163
164 type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
167
168 type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
170
171 type WeightInfo: WeightInfoExt;
173 }
174
175 #[pallet::call]
176 impl<T: Config> Pallet<T> {
177 #[pallet::call_index(1)]
181 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
182 pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
183 T::ControllerOrigin::ensure_origin(origin)?;
184
185 QueueSuspended::<T>::try_mutate(|suspended| {
186 if *suspended {
187 Err(Error::<T>::AlreadySuspended.into())
188 } else {
189 *suspended = true;
190 Ok(())
191 }
192 })
193 }
194
195 #[pallet::call_index(2)]
201 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
202 pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
203 T::ControllerOrigin::ensure_origin(origin)?;
204
205 QueueSuspended::<T>::try_mutate(|suspended| {
206 if !*suspended {
207 Err(Error::<T>::AlreadyResumed.into())
208 } else {
209 *suspended = false;
210 Ok(())
211 }
212 })
213 }
214
215 #[pallet::call_index(3)]
221 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
222 pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
223 ensure_root(origin)?;
224
225 QueueConfig::<T>::try_mutate(|data| {
226 data.suspend_threshold = new;
227 data.validate::<T>()
228 })
229 }
230
231 #[pallet::call_index(4)]
237 #[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
238 pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
239 ensure_root(origin)?;
240
241 QueueConfig::<T>::try_mutate(|data| {
242 data.drop_threshold = new;
243 data.validate::<T>()
244 })
245 }
246
247 #[pallet::call_index(5)]
253 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
254 pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
255 ensure_root(origin)?;
256
257 QueueConfig::<T>::try_mutate(|data| {
258 data.resume_threshold = new;
259 data.validate::<T>()
260 })
261 }
262 }
263
264 #[pallet::hooks]
265 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
266 fn integrity_test() {
267 assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
268
269 let w = Self::on_idle_weight();
270 assert!(w != Weight::zero());
271 assert!(w.all_lte(T::BlockWeights::get().max_block));
272
273 <T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
274 }
275
276 fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
277 let mut meter = WeightMeter::with_limit(limit);
278
279 if meter.try_consume(Self::on_idle_weight()).is_err() {
280 tracing::debug!(
281 target: LOG_TARGET,
282 "Not enough weight for on_idle. {} < {}",
283 Self::on_idle_weight(), limit
284 );
285 return meter.consumed();
286 }
287
288 migration::v3::lazy_migrate_inbound_queue::<T>();
289
290 meter.consumed()
291 }
292 }
293
294 #[pallet::event]
295 #[pallet::generate_deposit(pub(super) fn deposit_event)]
296 pub enum Event<T: Config> {
297 XcmpMessageSent { message_hash: XcmHash },
299 }
300
301 #[pallet::error]
302 pub enum Error<T> {
303 BadQueueConfig,
305 AlreadySuspended,
307 AlreadyResumed,
309 TooManyActiveOutboundChannels,
311 TooBig,
313 }
314
315 #[pallet::storage]
324 pub type InboundXcmpSuspended<T: Config> =
325 StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
326
327 #[pallet::storage]
334 pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
335 _,
336 BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
337 ValueQuery,
338 >;
339
340 #[pallet::storage]
342 pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
343 _,
344 Blake2_128Concat,
345 ParaId,
346 Twox64Concat,
347 u16,
348 WeakBoundedVec<u8, T::MaxPageSize>,
349 ValueQuery,
350 >;
351
352 #[pallet::storage]
354 pub(super) type SignalMessages<T: Config> =
355 StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
356
357 #[pallet::storage]
359 pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
360
361 #[pallet::storage]
363 pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
364
365 #[pallet::storage]
367 pub(super) type DeliveryFeeFactor<T: Config> =
368 StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
369}
370
371#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, Debug, TypeInfo, MaxEncodedLen)]
372pub enum OutboundState {
373 Ok,
374 Suspended,
375}
376
377bitflags! {
378 #[derive(Encode, Decode, TypeInfo, MaxEncodedLen)]
379 struct OutboundChannelFlags: u32 {
380 const CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT = 1;
381 const CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT = 1 << 1;
382 }
383}
384
385impl OutboundChannelFlags {
386 fn has_concatenated_opaque_versioned_xcm_support(&self) -> bool {
388 *self & Self::CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT != Self::empty()
389 }
390
391 fn should_send_concatenated_opaque_versioned_xcm_notification(&self) -> bool {
394 if self.has_concatenated_opaque_versioned_xcm_support() {
395 return false;
396 }
397
398 if *self & Self::CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT != Self::empty() {
399 return false;
400 }
401
402 true
403 }
404
405 fn notice_concatenated_opaque_versioned_xcm_support(&mut self) {
407 *self = *self | Self::CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT;
408 }
409
410 fn notice_concatenated_opaque_versioned_xcm_notification_sent(&mut self) {
412 *self = *self | Self::CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT;
413 }
414}
415
416#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, Debug, MaxEncodedLen)]
418pub struct OutboundChannelDetails {
419 recipient: ParaId,
421 state: OutboundState,
423 signals_exist: bool,
425 first_index: u16,
427 last_index: u16,
429 flags: OutboundChannelFlags,
431}
432
433impl OutboundChannelDetails {
434 pub fn new(recipient: ParaId) -> OutboundChannelDetails {
435 OutboundChannelDetails {
436 recipient,
437 state: OutboundState::Ok,
438 signals_exist: false,
439 first_index: 0,
440 last_index: 0,
441 flags: OutboundChannelFlags::empty(),
442 }
443 }
444
445 pub fn with_signals(mut self) -> OutboundChannelDetails {
446 self.signals_exist = true;
447 self
448 }
449
450 pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
451 self.state = OutboundState::Suspended;
452 self
453 }
454}
455
456#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, Debug, TypeInfo, MaxEncodedLen)]
457pub struct QueueConfigData {
458 suspend_threshold: u32,
461 drop_threshold: u32,
465 resume_threshold: u32,
468}
469
470impl Default for QueueConfigData {
471 fn default() -> Self {
472 Self {
475 drop_threshold: 48, suspend_threshold: 32, resume_threshold: 8, }
479 }
480}
481
482impl QueueConfigData {
483 pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
487 if self.resume_threshold < self.suspend_threshold &&
488 self.suspend_threshold <= self.drop_threshold &&
489 self.resume_threshold > 0
490 {
491 Ok(())
492 } else {
493 Err(Error::<T>::BadQueueConfig.into())
494 }
495 }
496}
497
498#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
499pub enum ChannelSignal {
500 Suspend,
501 Resume,
502}
503
504impl<T: Config> Pallet<T> {
505 fn try_get_outbound_channel(
506 all_channels: &BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
507 recipient: ParaId,
508 ) -> Option<&OutboundChannelDetails> {
509 for channel_idx in 0..all_channels.len() {
510 if all_channels[channel_idx].recipient == recipient {
511 return Some(&all_channels[channel_idx]);
512 }
513 }
514
515 None
516 }
517
518 fn try_get_or_insert_outbound_channel(
519 all_channels: &mut BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
520 recipient: ParaId,
521 ) -> Option<&mut OutboundChannelDetails> {
522 for channel_idx in 0..all_channels.len() {
523 if all_channels[channel_idx].recipient == recipient {
524 return Some(&mut all_channels[channel_idx]);
525 }
526 }
527
528 all_channels
529 .try_push(OutboundChannelDetails::new(recipient))
530 .inspect_err(|e| {
531 tracing::error!(target: LOG_TARGET, error=?e, "Failed to insert outbound HRMP channel");
532 })
533 .ok()?;
534 all_channels.last_mut()
535 }
536
537 fn send_fragment<Fragment: Encode>(
559 recipient: ParaId,
560 format: XcmpMessageFormat,
561 fragment: Fragment,
562 ) -> Result<u32, MessageSendError> {
563 let mut encoded_fragment = fragment.encode();
564 let encoded_fragment_len = encoded_fragment.len();
565
566 let channel_info =
570 T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
571 let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
573 let format_size = format.encoded_size();
574 let size_to_check = encoded_fragment
577 .len()
578 .checked_add(format_size)
579 .ok_or(MessageSendError::TooBig)?;
580 if size_to_check > max_message_size {
581 return Err(MessageSendError::TooBig);
582 }
583
584 let mut all_channels = <OutboundXcmpStatus<T>>::get();
585 let channel_details =
586 Self::try_get_or_insert_outbound_channel(&mut all_channels, recipient)
587 .ok_or(MessageSendError::TooManyChannels)?;
588 if let XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm = format {
589 channel_details
590 .flags
591 .notice_concatenated_opaque_versioned_xcm_notification_sent();
592 }
593
594 let mut existing_page = None;
595 'existing_page_check: {
596 if channel_details.last_index > channel_details.first_index {
597 let page =
598 OutboundXcmpMessages::<T>::get(recipient, channel_details.last_index - 1);
599 if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
600 break 'existing_page_check;
601 }
602 if page.len() + encoded_fragment.len() > max_message_size {
603 break 'existing_page_check;
604 }
605 existing_page = Some(page.into_inner());
606 }
607 }
608 let mut current_page = existing_page.unwrap_or_else(|| {
609 channel_details.last_index += 1;
611 format.encode()
612 });
613
614 current_page.append(&mut encoded_fragment);
615 let current_page = WeakBoundedVec::try_from(current_page).map_err(|error| {
616 tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
617 MessageSendError::TooBig
618 })?;
619 let page_count =
620 channel_details.last_index.saturating_sub(channel_details.first_index) as u32;
621 let last_page_size = current_page.len();
622 <OutboundXcmpMessages<T>>::insert(recipient, channel_details.last_index - 1, current_page);
623 <OutboundXcmpStatus<T>>::put(all_channels);
624
625 let total_size =
629 page_count.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
630 let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
631 if total_size > threshold {
632 Self::increase_fee_factor(recipient, encoded_fragment_len as u128);
633 }
634
635 Ok(page_count)
636 }
637
638 fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
641 let mut s = <OutboundXcmpStatus<T>>::get();
642 if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
643 details.signals_exist = true;
644 } else {
645 s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
646 tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
647 Error::<T>::TooManyActiveOutboundChannels
648 })?;
649 }
650
651 let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
652 (XcmpMessageFormat::Signals, signal).encode(),
653 )
654 .map_err(|error| {
655 tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
656 Error::<T>::TooBig
657 })?;
658 let page = WeakBoundedVec::force_from(page.into_inner(), None);
659
660 <SignalMessages<T>>::insert(dest, page);
661 <OutboundXcmpStatus<T>>::put(s);
662 Ok(())
663 }
664
665 fn suspend_channel(target: ParaId) {
666 <OutboundXcmpStatus<T>>::mutate(|s| {
667 if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
668 let ok = details.state == OutboundState::Ok;
669 defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
670 details.state = OutboundState::Suspended;
671 } else {
672 if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
673 defensive!("Cannot pause channel; too many outbound channels");
674 }
675 }
676 });
677 }
678
679 fn resume_channel(target: ParaId) {
680 <OutboundXcmpStatus<T>>::mutate(|s| {
681 if let Some(index) = s.iter().position(|item| item.recipient == target) {
682 let suspended = s[index].state == OutboundState::Suspended;
683 defensive_assert!(
684 suspended,
685 "WARNING: Attempt to resume channel that was not suspended."
686 );
687 if s[index].first_index == s[index].last_index {
688 s.remove(index);
689 } else {
690 s[index].state = OutboundState::Ok;
691 }
692 } else {
693 defensive!("WARNING: Attempt to resume channel that was not suspended.");
694 }
695 });
696 }
697
698 fn enqueue_xcmp_messages<'a>(
699 sender: ParaId,
700 xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
701 is_first_sender_batch: bool,
702 meter: &mut WeightMeter,
703 ) -> Result<(), ()> {
704 let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
705 let batches_footprints =
706 T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
707
708 let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
709 let required_weight = T::WeightInfo::enqueue_xcmp_messages(
710 batches_footprints.first_page_pos.saturated_into(),
711 batch_info,
712 is_first_sender_batch,
713 );
714
715 match meter.can_consume(required_weight) {
716 true => core::cmp::Ordering::Less,
717 false => core::cmp::Ordering::Greater,
718 }
719 });
720
721 meter.consume(T::WeightInfo::enqueue_xcmp_messages(
722 batches_footprints.first_page_pos.saturated_into(),
723 best_batch_footprint,
724 is_first_sender_batch,
725 ));
726 T::XcmpQueue::enqueue_messages(
727 xcms.iter().take(best_batch_footprint.msgs_count).copied(),
728 sender,
729 );
730
731 if best_batch_footprint.msgs_count < xcms.len() {
732 tracing::error!(
733 target: LOG_TARGET,
734 used_weight=?meter.consumed_ratio(),
735 "Out of weight: cannot enqueue entire XCMP messages batch; \
736 dropped some or all messages in batch."
737 );
738 return Err(());
739 }
740 Ok(())
741 }
742
743 pub(crate) fn take_first_concatenated_xcm<'a>(
750 data: &mut &'a [u8],
751 meter: &mut WeightMeter,
752 ) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
753 let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
755 if meter.try_consume(base_weight).is_err() {
756 defensive!("Out of weight; could not decode all; dropping");
757 return Err(());
758 }
759
760 let input_data = &mut &data[..];
761 let mut input = codec::CountedInput::new(input_data);
762 VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
763 |error| {
764 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
765 ()
766 },
767 )?;
768 let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
769 *data = remaining_data;
770
771 let extra_weight =
775 T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
776 meter.consume(extra_weight);
777
778 let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
779 tracing::error!(
780 target: LOG_TARGET,
781 ?error,
782 "Failed to take XCM after decoding: message is too long"
783 );
784 ()
785 })?;
786
787 Ok(xcm)
788 }
789
790 pub(crate) fn take_first_concatenated_opaque_xcm<'a>(
794 data: &mut &'a [u8],
795 ) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
796 let xcm_len = Compact::<u32>::decode(data).map_err(|error| {
797 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode opaque XCM length");
798 ()
799 })?;
800 let (xcm_data, remaining_data) = match data.split_at_checked(xcm_len.0 as usize) {
801 Some((xcm_data, remaining_data)) => (xcm_data, remaining_data),
802 None => {
803 tracing::debug!(target: LOG_TARGET, ?xcm_len, "Wrong opaque XCM length");
804 return Err(());
805 },
806 };
807 *data = remaining_data;
808
809 let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
810 tracing::error!(
811 target: LOG_TARGET,
812 ?error,
813 "Failed to take opaque XCM after decoding: message is too long"
814 );
815 ()
816 })?;
817
818 Ok(xcm)
819 }
820
821 pub(crate) fn take_first_concatenated_xcms<'a>(
825 data: &mut &'a [u8],
826 encoding: XcmEncoding,
827 batch_size: usize,
828 meter: &mut WeightMeter,
829 ) -> Result<
830 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
831 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
832 > {
833 let mut batch = vec![];
834 loop {
835 if data.is_empty() {
836 return Ok(batch);
837 }
838
839 let maybe_xcm = match encoding {
840 XcmEncoding::Simple => Self::take_first_concatenated_xcm(data, meter),
841 XcmEncoding::Double => Self::take_first_concatenated_opaque_xcm(data),
842 };
843 match maybe_xcm {
844 Ok(xcm) => {
845 batch.push(xcm);
846 if batch.len() >= batch_size {
847 return Ok(batch);
848 }
849 },
850 Err(_) => return Err(batch),
851 }
852 }
853 }
854
855 pub fn on_idle_weight() -> Weight {
857 <T as crate::Config>::WeightInfo::on_idle_good_msg()
858 .max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
859 }
860
861 #[cfg(feature = "bridging")]
862 fn is_inbound_channel_suspended(sender: ParaId) -> bool {
863 <InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
864 }
865
866 #[cfg(feature = "bridging")]
867 fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
869 <OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
870 let queued_pages = c.last_index.saturating_sub(c.first_index);
871 (c.state, queued_pages)
872 })
873 }
874}
875
876impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
877 fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
879 let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
880
881 let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
882 let suspended = suspended_channels.contains(¶);
883
884 if suspended && fp.ready_pages <= resume_threshold {
885 if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
886 tracing::error!(
887 target: LOG_TARGET,
888 error=?err,
889 sibling=?para,
890 "defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
891 );
892 } else {
893 suspended_channels.remove(¶);
894 <InboundXcmpSuspended<T>>::put(suspended_channels);
895 }
896 } else if !suspended && fp.ready_pages >= suspend_threshold {
897 tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");
898
899 if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
900 tracing::error!(
902 target: LOG_TARGET, error=?err,
903 "defensive: Could not send suspension signal; future messages may be dropped."
904 );
905 } else if let Err(err) = suspended_channels.try_insert(para) {
906 tracing::error!(
907 target: LOG_TARGET,
908 error=?err,
909 sibling=?para,
910 "Too many channels suspended; cannot suspend sibling; further messages may be dropped."
911 );
912 } else {
913 <InboundXcmpSuspended<T>>::put(suspended_channels);
914 }
915 }
916 }
917}
918
919impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
920 fn is_paused(para: &ParaId) -> bool {
921 if !QueueSuspended::<T>::get() {
922 return false;
923 }
924
925 let sender_origin = T::ControllerOriginConverter::convert_origin(
927 (Parent, Parachain((*para).into())),
928 OriginKind::Superuser,
929 );
930 let is_controller =
931 sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
932
933 !is_controller
934 }
935}
936
937#[derive(Copy, Clone)]
939enum XcmEncoding {
940 Simple,
945 Double,
953}
954
955impl<T: Config> XcmpMessageHandler for Pallet<T> {
956 fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
957 iter: I,
958 max_weight: Weight,
959 ) -> Weight {
960 let mut meter = WeightMeter::with_limit(max_weight);
961
962 let mut known_xcm_senders = BTreeSet::new();
963 for (sender, _sent_at, mut data) in iter {
964 let format = match XcmpMessageFormat::decode(&mut data) {
965 Ok(f) => f,
966 Err(_) => {
967 defensive!("Unknown XCMP message format - dropping");
968 continue;
969 },
970 };
971
972 match format {
973 XcmpMessageFormat::Signals => {
974 let mut signal_count = 0;
975 while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
976 signal_count += 1;
977 match ChannelSignal::decode(&mut data) {
978 Ok(ChannelSignal::Suspend) => {
979 if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
980 defensive!(
981 "Not enough weight to process suspend signal - dropping"
982 );
983 break;
984 }
985 Self::suspend_channel(sender)
986 },
987 Ok(ChannelSignal::Resume) => {
988 if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
989 defensive!(
990 "Not enough weight to process resume signal - dropping"
991 );
992 break;
993 }
994 Self::resume_channel(sender)
995 },
996 Err(_) => {
997 defensive!("Undecodable channel signal - dropping");
998 break;
999 },
1000 }
1001 }
1002 },
1003 XcmpMessageFormat::ConcatenatedVersionedXcm |
1004 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1005 let encoding = match format {
1006 XcmpMessageFormat::ConcatenatedVersionedXcm => XcmEncoding::Simple,
1007 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1008 let mut all_channels = <OutboundXcmpStatus<T>>::get();
1009 if let Some(channel_details) =
1010 Self::try_get_or_insert_outbound_channel(&mut all_channels, sender)
1011 {
1012 channel_details
1013 .flags
1014 .notice_concatenated_opaque_versioned_xcm_support();
1015 }
1016 <OutboundXcmpStatus<T>>::put(all_channels);
1017
1018 XcmEncoding::Double
1019 },
1020 _ => {
1021 continue;
1023 },
1024 };
1025
1026 let mut is_first_sender_batch = known_xcm_senders.insert(sender);
1027 if is_first_sender_batch {
1028 if meter
1029 .try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
1030 .is_err()
1031 {
1032 defensive!(
1033 "Out of weight: cannot enqueue XCMP messages; dropping page; \
1034 Used weight: ",
1035 meter.consumed_ratio()
1036 );
1037 continue;
1038 }
1039 }
1040
1041 let mut can_process_next_batch = true;
1042 while can_process_next_batch {
1043 let batch = match Self::take_first_concatenated_xcms(
1044 &mut data,
1045 encoding,
1046 XCM_BATCH_SIZE,
1047 &mut meter,
1048 ) {
1049 Ok(batch) => batch,
1050 Err(batch) => {
1051 can_process_next_batch = false;
1052 defensive!(
1053 "HRMP inbound decode stream broke; page will be dropped."
1054 );
1055 batch
1056 },
1057 };
1058 if batch.is_empty() {
1059 break;
1060 }
1061
1062 if let Err(()) = Self::enqueue_xcmp_messages(
1063 sender,
1064 &batch,
1065 is_first_sender_batch,
1066 &mut meter,
1067 ) {
1068 break;
1069 }
1070 is_first_sender_batch = false;
1071 }
1072 },
1073 XcmpMessageFormat::ConcatenatedEncodedBlob => {
1074 defensive!("Blob messages are unhandled - dropping");
1075 continue;
1076 },
1077 }
1078 }
1079
1080 meter.consumed()
1081 }
1082}
1083
1084impl<T: Config> XcmpMessageSource for Pallet<T> {
1085 fn take_outbound_messages(
1086 maximum_channels: usize,
1087 excluded_recipients: &[ParaId],
1088 ) -> Vec<(ParaId, Vec<u8>)> {
1089 let mut statuses = <OutboundXcmpStatus<T>>::get().into_inner();
1090 let old_statuses_len = statuses.len();
1091 let max_message_count = statuses.len().min(maximum_channels);
1092 let mut result = Vec::with_capacity(max_message_count);
1093
1094 statuses.retain_mut(|status| {
1095 let OutboundChannelDetails {
1096 recipient: para_id,
1097 state: outbound_state,
1098 signals_exist,
1099 first_index,
1100 last_index,
1101 flags,
1102 } = status;
1103
1104 let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(*para_id) {
1105 ChannelStatus::Closed => {
1106 for i in *first_index..*last_index {
1109 <OutboundXcmpMessages<T>>::remove(*para_id, i);
1110 }
1111 if *signals_exist {
1112 <SignalMessages<T>>::remove(*para_id);
1113 }
1114 return false;
1115 },
1116 ChannelStatus::Full => return true,
1117 ChannelStatus::Ready(max_size_now, max_size_ever) => (max_size_now, max_size_ever),
1118 };
1119
1120 if excluded_recipients.contains(para_id) {
1122 return true;
1123 }
1124
1125 if result.len() == max_message_count {
1127 return true;
1130 }
1131
1132 let page = 'page_fetch: {
1133 if *signals_exist {
1134 let page = <SignalMessages<T>>::get(*para_id);
1135 defensive_assert!(!page.is_empty(), "Signals must exist");
1136
1137 if page.len() < max_size_now {
1138 <SignalMessages<T>>::remove(*para_id);
1139 *signals_exist = false;
1140 break 'page_fetch page;
1141 }
1142
1143 defensive!("Signals should fit into a single page");
1144 return true;
1145 }
1146
1147 if *outbound_state == OutboundState::Suspended {
1148 return true;
1150 }
1151
1152 if last_index > first_index {
1153 let page = <OutboundXcmpMessages<T>>::get(*para_id, *first_index);
1154 if page.len() < max_size_now {
1155 <OutboundXcmpMessages<T>>::remove(*para_id, *first_index);
1156 *first_index += 1;
1157 break 'page_fetch page;
1158 }
1159 }
1160
1161 if flags.should_send_concatenated_opaque_versioned_xcm_notification() {
1165 match WeakBoundedVec::try_from(XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm.encode()) {
1166 Ok(page) => {
1167 flags.notice_concatenated_opaque_versioned_xcm_notification_sent();
1168 break 'page_fetch page;
1169 },
1170 Err(_) => {
1171 defensive!("XcmpMessageFormat should fit into a single page");
1172 return true;
1173 }
1174 };
1175 }
1176
1177 return true;
1178 };
1179
1180 if first_index == last_index {
1181 *first_index = 0;
1182 *last_index = 0;
1183 }
1184
1185 if page.len() > max_size_ever {
1186 defensive!("WARNING: oversize message in queue - dropping");
1190 } else {
1191 result.push((*para_id, page.into_inner()));
1192 }
1193
1194 let max_total_size = match T::ChannelInfo::get_channel_info(*para_id) {
1195 Some(channel_info) => channel_info.max_total_size,
1196 None => {
1197 tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1198 MAX_POSSIBLE_ALLOCATION },
1200 };
1201 let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1202 let remaining_total_size: usize = (*first_index..*last_index)
1203 .map(|index| OutboundXcmpMessages::<T>::decode_len(*para_id, index).unwrap())
1204 .sum();
1205 if remaining_total_size <= threshold as usize {
1206 Self::decrease_fee_factor(*para_id);
1207 }
1208
1209 true
1210 });
1211 debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1212 let mut statuses = BoundedVec::defensive_truncate_from(statuses);
1213
1214 result.sort_by_key(|(recipient, _msg)| *recipient);
1217
1218 let pruned = old_statuses_len - statuses.len();
1220 let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1225 "Could not store HRMP channels config. Some HRMP channels may be broken.",
1226 );
1227
1228 <OutboundXcmpStatus<T>>::put(statuses);
1229
1230 result
1231 }
1232}
1233
1234impl<T: Config> SendXcm for Pallet<T> {
1236 type Ticket = (ParaId, VersionedXcm<()>);
1237
1238 fn validate(
1239 dest: &mut Option<Location>,
1240 msg: &mut Option<Xcm<()>>,
1241 ) -> SendResult<(ParaId, VersionedXcm<()>)> {
1242 let d = dest.take().ok_or(SendError::MissingArgument)?;
1243
1244 match d.unpack() {
1245 (1, [Parachain(id)]) => {
1247 let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1248 let id = ParaId::from(*id);
1249 let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1250 let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1251 .map_err(|()| SendError::DestinationUnsupported)?;
1252 versioned_xcm
1253 .check_is_decodable()
1254 .map_err(|()| SendError::ExceedsMaxMessageSize)?;
1255
1256 Ok(((id, versioned_xcm), price))
1257 },
1258 _ => {
1259 *dest = Some(d);
1262 Err(SendError::NotApplicable)
1263 },
1264 }
1265 }
1266
1267 fn deliver((recipient, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1268 let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1269
1270 let mut encoding = XcmEncoding::Simple;
1271 let mut all_channels = <OutboundXcmpStatus<T>>::get();
1272 if let Some(channel_details) = Self::try_get_outbound_channel(&mut all_channels, recipient)
1273 {
1274 if channel_details.flags.has_concatenated_opaque_versioned_xcm_support() {
1275 encoding = XcmEncoding::Double;
1276 }
1277 }
1278
1279 let result = match encoding {
1280 XcmEncoding::Simple => {
1281 Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm)
1282 },
1283 XcmEncoding::Double => Self::send_fragment(
1284 recipient,
1285 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm,
1286 xcm.encode(),
1287 ),
1288 };
1289 match result {
1290 Ok(_) => {
1291 Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1292 Ok(hash)
1293 },
1294 Err(e) => {
1295 tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1296 Err(SendError::Transport(e.into()))
1297 },
1298 }
1299 }
1300}
1301
1302impl<T: Config> InspectMessageQueues for Pallet<T> {
1303 fn clear_messages() {
1304 let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1306 OutboundXcmpStatus::<T>::mutate(|details_vec| {
1307 for details in details_vec {
1308 details.first_index = 0;
1309 details.last_index = 0;
1310 }
1311 });
1312 }
1313
1314 fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1315 use xcm::prelude::*;
1316
1317 OutboundXcmpMessages::<T>::iter()
1318 .map(|(para_id, _, messages)| {
1319 let data = &mut &messages[..];
1320
1321 let decoded_format = XcmpMessageFormat::decode(data).unwrap();
1322 let mut decoded_messages = Vec::new();
1323 while !data.is_empty() {
1324 let message_bytes = match decoded_format {
1325 XcmpMessageFormat::ConcatenatedVersionedXcm => {
1326 Self::take_first_concatenated_xcm(data, &mut WeightMeter::new())
1327 },
1328 XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1329 Self::take_first_concatenated_opaque_xcm(data)
1330 },
1331 unexpected_format => {
1332 panic!("Unexpected XCMP format: {unexpected_format:?}!")
1333 },
1334 }
1335 .unwrap();
1336 let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1337 MAX_XCM_DECODE_DEPTH,
1338 &mut &message_bytes[..],
1339 )
1340 .unwrap();
1341 decoded_messages.push(decoded_message);
1342 }
1343
1344 (
1345 VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1346 decoded_messages,
1347 )
1348 })
1349 .collect()
1350 }
1351}
1352
1353impl<T: Config> FeeTracker for Pallet<T> {
1354 type Id = ParaId;
1355
1356 fn get_fee_factor(id: Self::Id) -> FixedU128 {
1357 <DeliveryFeeFactor<T>>::get(id)
1358 }
1359
1360 fn set_fee_factor(id: Self::Id, val: FixedU128) {
1361 <DeliveryFeeFactor<T>>::set(id, val);
1362 }
1363}