1#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
60use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
61use cumulus_primitives_core::{
62 relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
63 ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
64};
65
66use frame_support::{
67 defensive, defensive_assert,
68 traits::{
69 Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueueFootprintQuery,
70 QueuePausedQuery,
71 },
72 weights::{Weight, WeightMeter},
73};
74use pallet_message_queue::OnQueueChanged;
75use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
76use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
77use scale_info::TypeInfo;
78use sp_core::MAX_POSSIBLE_ALLOCATION;
79use sp_runtime::{FixedU128, RuntimeDebug, SaturatedConversion, WeakBoundedVec};
80use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
81use xcm_builder::InspectMessageQueues;
82use xcm_executor::traits::ConvertOrigin;
83
84pub use pallet::*;
85
86pub type OverweightIndex = u64;
88pub type MaxXcmpMessageLenOf<T> =
90 <<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
91
92const LOG_TARGET: &str = "xcmp_queue";
93const DEFAULT_POV_SIZE: u64 = 64 * 1024; pub const XCM_BATCH_SIZE: usize = 250;
96
97pub mod delivery_fee_constants {
99 pub const THRESHOLD_FACTOR: u32 = 2;
101}
102
103#[frame_support::pallet]
104pub mod pallet {
105 use super::*;
106 use frame_support::{pallet_prelude::*, Twox64Concat};
107 use frame_system::pallet_prelude::*;
108
109 #[pallet::pallet]
110 #[pallet::storage_version(migration::STORAGE_VERSION)]
111 pub struct Pallet<T>(_);
112
113 #[pallet::config]
114 pub trait Config: frame_system::Config {
115 #[allow(deprecated)]
116 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
117
118 type ChannelInfo: GetChannelInfo;
120
121 type VersionWrapper: WrapVersion;
123
124 type XcmpQueue: EnqueueMessage<ParaId>
129 + QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;
130
131 #[pallet::constant]
137 type MaxInboundSuspended: Get<u32>;
138
139 #[pallet::constant]
148 type MaxActiveOutboundChannels: Get<u32>;
149
150 #[pallet::constant]
156 type MaxPageSize: Get<u32>;
157
158 type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
160
161 type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
164
165 type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
167
168 type WeightInfo: WeightInfoExt;
170 }
171
172 #[pallet::call]
173 impl<T: Config> Pallet<T> {
174 #[pallet::call_index(1)]
178 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
179 pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
180 T::ControllerOrigin::ensure_origin(origin)?;
181
182 QueueSuspended::<T>::try_mutate(|suspended| {
183 if *suspended {
184 Err(Error::<T>::AlreadySuspended.into())
185 } else {
186 *suspended = true;
187 Ok(())
188 }
189 })
190 }
191
192 #[pallet::call_index(2)]
198 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
199 pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
200 T::ControllerOrigin::ensure_origin(origin)?;
201
202 QueueSuspended::<T>::try_mutate(|suspended| {
203 if !*suspended {
204 Err(Error::<T>::AlreadyResumed.into())
205 } else {
206 *suspended = false;
207 Ok(())
208 }
209 })
210 }
211
212 #[pallet::call_index(3)]
218 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
219 pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
220 ensure_root(origin)?;
221
222 QueueConfig::<T>::try_mutate(|data| {
223 data.suspend_threshold = new;
224 data.validate::<T>()
225 })
226 }
227
228 #[pallet::call_index(4)]
234 #[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
235 pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
236 ensure_root(origin)?;
237
238 QueueConfig::<T>::try_mutate(|data| {
239 data.drop_threshold = new;
240 data.validate::<T>()
241 })
242 }
243
244 #[pallet::call_index(5)]
250 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
251 pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
252 ensure_root(origin)?;
253
254 QueueConfig::<T>::try_mutate(|data| {
255 data.resume_threshold = new;
256 data.validate::<T>()
257 })
258 }
259 }
260
261 #[pallet::hooks]
262 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
263 fn integrity_test() {
264 assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
265
266 let w = Self::on_idle_weight();
267 assert!(w != Weight::zero());
268 assert!(w.all_lte(T::BlockWeights::get().max_block));
269
270 <T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
271 }
272
273 fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
274 let mut meter = WeightMeter::with_limit(limit);
275
276 if meter.try_consume(Self::on_idle_weight()).is_err() {
277 tracing::debug!(
278 target: LOG_TARGET,
279 "Not enough weight for on_idle. {} < {}",
280 Self::on_idle_weight(), limit
281 );
282 return meter.consumed()
283 }
284
285 migration::v3::lazy_migrate_inbound_queue::<T>();
286
287 meter.consumed()
288 }
289 }
290
291 #[pallet::event]
292 #[pallet::generate_deposit(pub(super) fn deposit_event)]
293 pub enum Event<T: Config> {
294 XcmpMessageSent { message_hash: XcmHash },
296 }
297
298 #[pallet::error]
299 pub enum Error<T> {
300 BadQueueConfig,
302 AlreadySuspended,
304 AlreadyResumed,
306 TooManyActiveOutboundChannels,
308 TooBig,
310 }
311
312 #[pallet::storage]
321 pub type InboundXcmpSuspended<T: Config> =
322 StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
323
324 #[pallet::storage]
331 pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
332 _,
333 BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
334 ValueQuery,
335 >;
336
337 #[pallet::storage]
339 pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
340 _,
341 Blake2_128Concat,
342 ParaId,
343 Twox64Concat,
344 u16,
345 WeakBoundedVec<u8, T::MaxPageSize>,
346 ValueQuery,
347 >;
348
349 #[pallet::storage]
351 pub(super) type SignalMessages<T: Config> =
352 StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
353
354 #[pallet::storage]
356 pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
357
358 #[pallet::storage]
360 pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
361
362 #[pallet::storage]
364 pub(super) type DeliveryFeeFactor<T: Config> =
365 StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
366}
367
368#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
369pub enum OutboundState {
370 Ok,
371 Suspended,
372}
373
374#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
376pub struct OutboundChannelDetails {
377 recipient: ParaId,
379 state: OutboundState,
381 signals_exist: bool,
383 first_index: u16,
385 last_index: u16,
387}
388
389impl OutboundChannelDetails {
390 pub fn new(recipient: ParaId) -> OutboundChannelDetails {
391 OutboundChannelDetails {
392 recipient,
393 state: OutboundState::Ok,
394 signals_exist: false,
395 first_index: 0,
396 last_index: 0,
397 }
398 }
399
400 pub fn with_signals(mut self) -> OutboundChannelDetails {
401 self.signals_exist = true;
402 self
403 }
404
405 pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
406 self.state = OutboundState::Suspended;
407 self
408 }
409}
410
411#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
412pub struct QueueConfigData {
413 suspend_threshold: u32,
416 drop_threshold: u32,
420 resume_threshold: u32,
423}
424
425impl Default for QueueConfigData {
426 fn default() -> Self {
427 Self {
430 drop_threshold: 48, suspend_threshold: 32, resume_threshold: 8, }
434 }
435}
436
437impl QueueConfigData {
438 pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
442 if self.resume_threshold < self.suspend_threshold &&
443 self.suspend_threshold <= self.drop_threshold &&
444 self.resume_threshold > 0
445 {
446 Ok(())
447 } else {
448 Err(Error::<T>::BadQueueConfig.into())
449 }
450 }
451}
452
453#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
454pub enum ChannelSignal {
455 Suspend,
456 Resume,
457}
458
459impl<T: Config> Pallet<T> {
460 fn send_fragment<Fragment: Encode>(
484 recipient: ParaId,
485 format: XcmpMessageFormat,
486 fragment: Fragment,
487 ) -> Result<u32, MessageSendError> {
488 let encoded_fragment = fragment.encode();
489
490 let channel_info =
494 T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
495 let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
497 let format_size = format.encoded_size();
498 let size_to_check = encoded_fragment
501 .len()
502 .checked_add(format_size)
503 .ok_or(MessageSendError::TooBig)?;
504 if size_to_check > max_message_size {
505 return Err(MessageSendError::TooBig)
506 }
507
508 let mut all_channels = <OutboundXcmpStatus<T>>::get();
509 let channel_details = if let Some(details) =
510 all_channels.iter_mut().find(|channel| channel.recipient == recipient)
511 {
512 details
513 } else {
514 all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
515 tracing::error!(target: LOG_TARGET, error=?e, "Failed to activate HRMP channel");
516 MessageSendError::TooManyChannels
517 })?;
518 all_channels
519 .last_mut()
520 .expect("can't be empty; a new element was just pushed; qed")
521 };
522 let have_active = channel_details.last_index > channel_details.first_index;
523 let appended_to_last_page = have_active
526 .then(|| {
527 <OutboundXcmpMessages<T>>::try_mutate(
528 recipient,
529 channel_details.last_index - 1,
530 |page| {
531 if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
532 defensive!("Bad format in outbound queue; dropping message");
533 return Err(())
534 }
535 if page.len() + encoded_fragment.len() > max_message_size {
536 return Err(())
537 }
538 for frag in encoded_fragment.iter() {
539 page.try_push(*frag)?;
540 }
541 Ok(page.len())
542 },
543 )
544 .ok()
545 })
546 .flatten();
547
548 let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
549 let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
550 (number_of_pages, size)
551 } else {
552 let page_index = channel_details.last_index;
554 channel_details.last_index += 1;
555 let mut new_page = format.encode();
556 new_page.extend_from_slice(&encoded_fragment[..]);
557 let last_page_size = new_page.len();
558 let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
559 let bounded_page =
560 BoundedVec::<u8, T::MaxPageSize>::try_from(new_page).map_err(|error| {
561 tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
562 MessageSendError::TooBig
563 })?;
564 let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
565 <OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
566 <OutboundXcmpStatus<T>>::put(all_channels);
567 (number_of_pages, last_page_size)
568 };
569
570 let total_size =
574 number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
575 let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
576 if total_size > threshold {
577 Self::increase_fee_factor(recipient, encoded_fragment.len() as u128);
578 }
579
580 Ok(number_of_pages)
581 }
582
583 fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
586 let mut s = <OutboundXcmpStatus<T>>::get();
587 if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
588 details.signals_exist = true;
589 } else {
590 s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
591 tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
592 Error::<T>::TooManyActiveOutboundChannels
593 })?;
594 }
595
596 let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
597 (XcmpMessageFormat::Signals, signal).encode(),
598 )
599 .map_err(|error| {
600 tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
601 Error::<T>::TooBig
602 })?;
603 let page = WeakBoundedVec::force_from(page.into_inner(), None);
604
605 <SignalMessages<T>>::insert(dest, page);
606 <OutboundXcmpStatus<T>>::put(s);
607 Ok(())
608 }
609
610 fn suspend_channel(target: ParaId) {
611 <OutboundXcmpStatus<T>>::mutate(|s| {
612 if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
613 let ok = details.state == OutboundState::Ok;
614 defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
615 details.state = OutboundState::Suspended;
616 } else {
617 if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
618 defensive!("Cannot pause channel; too many outbound channels");
619 }
620 }
621 });
622 }
623
624 fn resume_channel(target: ParaId) {
625 <OutboundXcmpStatus<T>>::mutate(|s| {
626 if let Some(index) = s.iter().position(|item| item.recipient == target) {
627 let suspended = s[index].state == OutboundState::Suspended;
628 defensive_assert!(
629 suspended,
630 "WARNING: Attempt to resume channel that was not suspended."
631 );
632 if s[index].first_index == s[index].last_index {
633 s.remove(index);
634 } else {
635 s[index].state = OutboundState::Ok;
636 }
637 } else {
638 defensive!("WARNING: Attempt to resume channel that was not suspended.");
639 }
640 });
641 }
642
643 fn enqueue_xcmp_messages<'a>(
644 sender: ParaId,
645 xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
646 meter: &mut WeightMeter,
647 ) -> Result<(), ()> {
648 let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
649 let batches_footprints =
650 T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
651
652 let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
653 let required_weight = T::WeightInfo::enqueue_xcmp_messages(
654 batches_footprints.first_page_pos.saturated_into(),
655 batch_info,
656 );
657
658 match meter.can_consume(required_weight) {
659 true => core::cmp::Ordering::Less,
660 false => core::cmp::Ordering::Greater,
661 }
662 });
663
664 meter.consume(T::WeightInfo::enqueue_xcmp_messages(
665 batches_footprints.first_page_pos.saturated_into(),
666 best_batch_footprint,
667 ));
668 T::XcmpQueue::enqueue_messages(
669 xcms.iter().take(best_batch_footprint.msgs_count).copied(),
670 sender,
671 );
672
673 if best_batch_footprint.msgs_count < xcms.len() {
674 tracing::error!(
675 target: LOG_TARGET,
676 used_weight=?meter.consumed_ratio(),
677 "Out of weight: cannot enqueue entire XCMP messages batch; \
678 dropped some or all messages in batch."
679 );
680 return Err(());
681 }
682 Ok(())
683 }
684
685 pub(crate) fn take_first_concatenated_xcm<'a>(
693 data: &mut &'a [u8],
694 meter: &mut WeightMeter,
695 ) -> Result<Option<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>, ()> {
696 if data.is_empty() {
697 return Ok(None)
698 }
699
700 let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
702 if meter.try_consume(base_weight).is_err() {
703 defensive!("Out of weight; could not decode all; dropping");
704 return Err(())
705 }
706
707 let input_data = &mut &data[..];
708 let mut input = codec::CountedInput::new(input_data);
709 VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
710 |error| {
711 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
712 ()
713 },
714 )?;
715 let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
716 *data = remaining_data;
717
718 let extra_weight =
722 T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
723 meter.consume(extra_weight);
724
725 let xcm = Some(BoundedSlice::try_from(xcm_data).map_err(|error| {
726 tracing::error!(
727 target: LOG_TARGET,
728 ?error,
729 "Failed to take XCM after decoding: message is too long"
730 );
731 ()
732 })?);
733
734 Ok(xcm)
735 }
736
737 pub(crate) fn take_first_concatenated_xcms<'a>(
742 data: &mut &'a [u8],
743 batch_size: usize,
744 meter: &mut WeightMeter,
745 ) -> Result<
746 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
747 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
748 > {
749 let mut batch = vec![];
750 loop {
751 match Self::take_first_concatenated_xcm(data, meter) {
752 Ok(Some(xcm)) => {
753 batch.push(xcm);
754 if batch.len() >= batch_size {
755 return Ok(batch);
756 }
757 },
758 Ok(None) => return Ok(batch),
759 Err(_) => return Err(batch),
760 }
761 }
762 }
763
764 pub fn on_idle_weight() -> Weight {
766 <T as crate::Config>::WeightInfo::on_idle_good_msg()
767 .max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
768 }
769
770 #[cfg(feature = "bridging")]
771 fn is_inbound_channel_suspended(sender: ParaId) -> bool {
772 <InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
773 }
774
775 #[cfg(feature = "bridging")]
776 fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
778 <OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
779 let queued_pages = c.last_index.saturating_sub(c.first_index);
780 (c.state, queued_pages)
781 })
782 }
783}
784
785impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
786 fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
788 let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
789
790 let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
791 let suspended = suspended_channels.contains(¶);
792
793 if suspended && fp.ready_pages <= resume_threshold {
794 if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
795 tracing::error!(
796 target: LOG_TARGET,
797 error=?err,
798 sibling=?para,
799 "defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
800 );
801 } else {
802 suspended_channels.remove(¶);
803 <InboundXcmpSuspended<T>>::put(suspended_channels);
804 }
805 } else if !suspended && fp.ready_pages >= suspend_threshold {
806 tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");
807
808 if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
809 tracing::error!(
811 target: LOG_TARGET, error=?err,
812 "defensive: Could not send suspension signal; future messages may be dropped."
813 );
814 } else if let Err(err) = suspended_channels.try_insert(para) {
815 tracing::error!(
816 target: LOG_TARGET,
817 error=?err,
818 sibling=?para,
819 "Too many channels suspended; cannot suspend sibling; further messages may be dropped."
820 );
821 } else {
822 <InboundXcmpSuspended<T>>::put(suspended_channels);
823 }
824 }
825 }
826}
827
828impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
829 fn is_paused(para: &ParaId) -> bool {
830 if !QueueSuspended::<T>::get() {
831 return false
832 }
833
834 let sender_origin = T::ControllerOriginConverter::convert_origin(
836 (Parent, Parachain((*para).into())),
837 OriginKind::Superuser,
838 );
839 let is_controller =
840 sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
841
842 !is_controller
843 }
844}
845
846impl<T: Config> XcmpMessageHandler for Pallet<T> {
847 fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
848 iter: I,
849 max_weight: Weight,
850 ) -> Weight {
851 let mut meter = WeightMeter::with_limit(max_weight);
852
853 let mut known_xcm_senders = BTreeSet::new();
854 for (sender, _sent_at, mut data) in iter {
855 let format = match XcmpMessageFormat::decode(&mut data) {
856 Ok(f) => f,
857 Err(_) => {
858 defensive!("Unknown XCMP message format - dropping");
859 continue
860 },
861 };
862
863 match format {
864 XcmpMessageFormat::Signals =>
865 while !data.is_empty() {
866 if meter
867 .try_consume(
868 T::WeightInfo::suspend_channel()
869 .max(T::WeightInfo::resume_channel()),
870 )
871 .is_err()
872 {
873 defensive!("Not enough weight to process signals - dropping");
874 break
875 }
876
877 match ChannelSignal::decode(&mut data) {
878 Ok(ChannelSignal::Suspend) => Self::suspend_channel(sender),
879 Ok(ChannelSignal::Resume) => Self::resume_channel(sender),
880 Err(_) => {
881 defensive!("Undecodable channel signal - dropping");
882 break
883 },
884 }
885 },
886 XcmpMessageFormat::ConcatenatedVersionedXcm => {
887 if known_xcm_senders.insert(sender) {
888 if meter
889 .try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
890 .is_err()
891 {
892 defensive!(
893 "Out of weight: cannot enqueue XCMP messages; dropping page; \
894 Used weight: ",
895 meter.consumed_ratio()
896 );
897 continue;
898 }
899 }
900
901 let mut can_process_next_batch = true;
902 while can_process_next_batch {
903 let batch = match Self::take_first_concatenated_xcms(
904 &mut data,
905 XCM_BATCH_SIZE,
906 &mut meter,
907 ) {
908 Ok(batch) => batch,
909 Err(batch) => {
910 can_process_next_batch = false;
911 defensive!(
912 "HRMP inbound decode stream broke; page will be dropped."
913 );
914 batch
915 },
916 };
917 if batch.is_empty() {
918 break;
919 }
920
921 if let Err(()) = Self::enqueue_xcmp_messages(sender, &batch, &mut meter) {
922 break
923 }
924 }
925 },
926 XcmpMessageFormat::ConcatenatedEncodedBlob => {
927 defensive!("Blob messages are unhandled - dropping");
928 continue
929 },
930 }
931 }
932
933 meter.consumed()
934 }
935}
936
937impl<T: Config> XcmpMessageSource for Pallet<T> {
938 fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
939 let mut statuses = <OutboundXcmpStatus<T>>::get();
940 let old_statuses_len = statuses.len();
941 let max_message_count = statuses.len().min(maximum_channels);
942 let mut result = Vec::with_capacity(max_message_count);
943
944 for status in statuses.iter_mut() {
945 let OutboundChannelDetails {
946 recipient: para_id,
947 state: outbound_state,
948 mut signals_exist,
949 mut first_index,
950 mut last_index,
951 } = *status;
952
953 let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
954 ChannelStatus::Closed => {
955 for i in first_index..last_index {
958 <OutboundXcmpMessages<T>>::remove(para_id, i);
959 }
960 if signals_exist {
961 <SignalMessages<T>>::remove(para_id);
962 }
963 *status = OutboundChannelDetails::new(para_id);
964 continue
965 },
966 ChannelStatus::Full => continue,
967 ChannelStatus::Ready(n, e) => (n, e),
968 };
969
970 if result.len() == max_message_count {
972 break
975 }
976
977 let page = if signals_exist {
978 let page = <SignalMessages<T>>::get(para_id);
979 defensive_assert!(!page.is_empty(), "Signals must exist");
980
981 if page.len() < max_size_now {
982 <SignalMessages<T>>::remove(para_id);
983 signals_exist = false;
984 page
985 } else {
986 defensive!("Signals should fit into a single page");
987 continue
988 }
989 } else if outbound_state == OutboundState::Suspended {
990 continue
992 } else if last_index > first_index {
993 let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
994 if page.len() < max_size_now {
995 <OutboundXcmpMessages<T>>::remove(para_id, first_index);
996 first_index += 1;
997 page
998 } else {
999 continue
1000 }
1001 } else {
1002 continue
1003 };
1004 if first_index == last_index {
1005 first_index = 0;
1006 last_index = 0;
1007 }
1008
1009 if page.len() > max_size_ever {
1010 defensive!("WARNING: oversize message in queue - dropping");
1014 } else {
1015 result.push((para_id, page.into_inner()));
1016 }
1017
1018 let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
1019 Some(channel_info) => channel_info.max_total_size,
1020 None => {
1021 tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1022 MAX_POSSIBLE_ALLOCATION },
1024 };
1025 let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1026 let remaining_total_size: usize = (first_index..last_index)
1027 .map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
1028 .sum();
1029 if remaining_total_size <= threshold as usize {
1030 Self::decrease_fee_factor(para_id);
1031 }
1032
1033 *status = OutboundChannelDetails {
1034 recipient: para_id,
1035 state: outbound_state,
1036 signals_exist,
1037 first_index,
1038 last_index,
1039 };
1040 }
1041 debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1042
1043 result.sort_by_key(|m| m.0);
1046
1047 statuses.retain(|x| {
1056 x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
1057 });
1058
1059 let pruned = old_statuses_len - statuses.len();
1061 let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1064 "Could not store HRMP channels config. Some HRMP channels may be broken.",
1065 );
1066
1067 <OutboundXcmpStatus<T>>::put(statuses);
1068
1069 result
1070 }
1071}
1072
1073impl<T: Config> SendXcm for Pallet<T> {
1075 type Ticket = (ParaId, VersionedXcm<()>);
1076
1077 fn validate(
1078 dest: &mut Option<Location>,
1079 msg: &mut Option<Xcm<()>>,
1080 ) -> SendResult<(ParaId, VersionedXcm<()>)> {
1081 let d = dest.take().ok_or(SendError::MissingArgument)?;
1082
1083 match d.unpack() {
1084 (1, [Parachain(id)]) => {
1086 let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1087 let id = ParaId::from(*id);
1088 let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1089 let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1090 .map_err(|()| SendError::DestinationUnsupported)?;
1091 versioned_xcm
1092 .check_is_decodable()
1093 .map_err(|()| SendError::ExceedsMaxMessageSize)?;
1094
1095 Ok(((id, versioned_xcm), price))
1096 },
1097 _ => {
1098 *dest = Some(d);
1101 Err(SendError::NotApplicable)
1102 },
1103 }
1104 }
1105
1106 fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1107 let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1108
1109 match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
1110 Ok(_) => {
1111 Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1112 Ok(hash)
1113 },
1114 Err(e) => {
1115 tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1116 Err(SendError::Transport(e.into()))
1117 },
1118 }
1119 }
1120}
1121
1122impl<T: Config> InspectMessageQueues for Pallet<T> {
1123 fn clear_messages() {
1124 let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1126 OutboundXcmpStatus::<T>::mutate(|details_vec| {
1127 for details in details_vec {
1128 details.first_index = 0;
1129 details.last_index = 0;
1130 }
1131 });
1132 }
1133
1134 fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1135 use xcm::prelude::*;
1136
1137 OutboundXcmpMessages::<T>::iter()
1138 .map(|(para_id, _, messages)| {
1139 let mut data = &messages[..];
1140 let decoded_format = XcmpMessageFormat::decode(&mut data).unwrap();
1141 if decoded_format != XcmpMessageFormat::ConcatenatedVersionedXcm {
1142 panic!("Unexpected format.")
1143 }
1144 let mut decoded_messages = Vec::new();
1145 while !data.is_empty() {
1146 let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1147 MAX_XCM_DECODE_DEPTH,
1148 &mut data,
1149 )
1150 .unwrap();
1151 decoded_messages.push(decoded_message);
1152 }
1153
1154 (
1155 VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1156 decoded_messages,
1157 )
1158 })
1159 .collect()
1160 }
1161}
1162
1163impl<T: Config> FeeTracker for Pallet<T> {
1164 type Id = ParaId;
1165
1166 fn get_fee_factor(id: Self::Id) -> FixedU128 {
1167 <DeliveryFeeFactor<T>>::get(id)
1168 }
1169
1170 fn set_fee_factor(id: Self::Id, val: FixedU128) {
1171 <DeliveryFeeFactor<T>>::set(id, val);
1172 }
1173}