referrerpolicy=no-referrer-when-downgrade

cumulus_pallet_xcmp_queue/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17//! A pallet which uses the XCMP transport layer to handle both incoming and outgoing XCM message
18//! sending and dispatch, queuing, signalling and backpressure. To do so, it implements:
19//! * `XcmpMessageHandler`
20//! * `XcmpMessageSource`
21//!
22//! Also provides an implementation of `SendXcm` which can be placed in a router tuple for relaying
23//! XCM over XCMP if the destination is `Parent/Parachain`. It requires an implementation of
24//! `XcmExecutor` for dispatching incoming XCM messages.
25//!
26//! To prevent out of memory errors on the `OutboundXcmpMessages` queue, an exponential fee factor
27//! (`DeliveryFeeFactor`) is set, much like the one used in DMP.
28//! The fee factor increases whenever the total size of messages in a particular channel passes a
29//! threshold. This threshold is defined as a percentage of the maximum total size the channel can
30//! have. More concretely, the threshold is `max_total_size` / `THRESHOLD_FACTOR`, where:
31//! - `max_total_size` is the maximum size, in bytes, of the channel, not number of messages.
32//! It is defined in the channel configuration.
33//! - `THRESHOLD_FACTOR` just declares which percentage of the max size is the actual threshold.
34//! If it's 2, then the threshold is half of the max size, if it's 4, it's a quarter, and so on.
35
36#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
60use codec::{Compact, Decode, DecodeLimit, Encode, MaxEncodedLen};
61use cumulus_primitives_core::{
62	relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
63	ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
64};
65
66use frame_support::{
67	defensive, defensive_assert,
68	traits::{
69		Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueueFootprintQuery,
70		QueuePausedQuery,
71	},
72	weights::{Weight, WeightMeter},
73};
74use pallet_message_queue::OnQueueChanged;
75use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
76use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
77use scale_info::TypeInfo;
78use sp_core::MAX_POSSIBLE_ALLOCATION;
79use sp_runtime::{FixedU128, RuntimeDebug, SaturatedConversion, WeakBoundedVec};
80use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
81use xcm_builder::InspectMessageQueues;
82use xcm_executor::traits::ConvertOrigin;
83
84pub use pallet::*;
85
86/// Index used to identify overweight XCMs.
87pub type OverweightIndex = u64;
88/// The max length of an XCMP message.
89pub type MaxXcmpMessageLenOf<T> =
90	<<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
91
92const LOG_TARGET: &str = "xcmp_queue";
93const DEFAULT_POV_SIZE: u64 = 64 * 1024; // 64 KB
94/// The size of an XCM messages batch.
95pub const XCM_BATCH_SIZE: usize = 250;
96/// The maximum number of signals that we can have in an XCMP page.
97pub const MAX_SIGNALS_PER_PAGE: usize = 3;
98
99/// Constants related to delivery fee calculation
100pub mod delivery_fee_constants {
101	/// Fees will start increasing when queue is half full
102	pub const THRESHOLD_FACTOR: u32 = 2;
103}
104
105#[frame_support::pallet]
106pub mod pallet {
107	use super::*;
108	use frame_support::{pallet_prelude::*, Twox64Concat};
109	use frame_system::pallet_prelude::*;
110
111	#[pallet::pallet]
112	#[pallet::storage_version(migration::STORAGE_VERSION)]
113	pub struct Pallet<T>(_);
114
115	#[pallet::config]
116	pub trait Config: frame_system::Config {
117		#[allow(deprecated)]
118		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
119
120		/// Information on the available XCMP channels.
121		type ChannelInfo: GetChannelInfo;
122
123		/// Means of converting an `Xcm` into a `VersionedXcm`.
124		type VersionWrapper: WrapVersion;
125
126		/// Enqueue an inbound horizontal message for later processing.
127		///
128		/// This defines the maximal message length via [`crate::MaxXcmpMessageLenOf`]. The pallet
129		/// assumes that this hook will eventually process all the pushed messages.
130		type XcmpQueue: EnqueueMessage<ParaId>
131			+ QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;
132
133		/// The maximum number of inbound XCMP channels that can be suspended simultaneously.
134		///
135		/// Any further channel suspensions will fail and messages may get dropped without further
136		/// notice. Choosing a high value (1000) is okay; the trade-off that is described in
137		/// [`InboundXcmpSuspended`] still applies at that scale.
138		#[pallet::constant]
139		type MaxInboundSuspended: Get<u32>;
140
141		/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
142		///
143		/// If this is reached, then no further messages can be sent to channels that do not yet
144		/// have a message queued. This should be set to the expected maximum of outbound channels
145		/// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough,
146		/// since otherwise the congestion control protocol will not work as intended and messages
147		/// may be dropped. This value increases the PoV and should therefore not be picked too
148		/// high. Governance needs to pay attention to not open more channels than this value.
149		#[pallet::constant]
150		type MaxActiveOutboundChannels: Get<u32>;
151
152		/// The maximal page size for HRMP message pages.
153		///
154		/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
155		/// benchmarking. The limit for the size of a message is slightly below this, since some
156		/// overhead is incurred for encoding the format.
157		#[pallet::constant]
158		type MaxPageSize: Get<u32>;
159
160		/// The origin that is allowed to resume or suspend the XCMP queue.
161		type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
162
163		/// The conversion function used to attempt to convert an XCM `Location` origin to a
164		/// superuser origin.
165		type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
166
167		/// The price for delivering an XCM to a sibling parachain destination.
168		type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
169
170		/// The weight information of this pallet.
171		type WeightInfo: WeightInfoExt;
172	}
173
174	#[pallet::call]
175	impl<T: Config> Pallet<T> {
176		/// Suspends all XCM executions for the XCMP queue, regardless of the sender's origin.
177		///
178		/// - `origin`: Must pass `ControllerOrigin`.
179		#[pallet::call_index(1)]
180		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
181		pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
182			T::ControllerOrigin::ensure_origin(origin)?;
183
184			QueueSuspended::<T>::try_mutate(|suspended| {
185				if *suspended {
186					Err(Error::<T>::AlreadySuspended.into())
187				} else {
188					*suspended = true;
189					Ok(())
190				}
191			})
192		}
193
194		/// Resumes all XCM executions for the XCMP queue.
195		///
196		/// Note that this function doesn't change the status of the in/out bound channels.
197		///
198		/// - `origin`: Must pass `ControllerOrigin`.
199		#[pallet::call_index(2)]
200		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
201		pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
202			T::ControllerOrigin::ensure_origin(origin)?;
203
204			QueueSuspended::<T>::try_mutate(|suspended| {
205				if !*suspended {
206					Err(Error::<T>::AlreadyResumed.into())
207				} else {
208					*suspended = false;
209					Ok(())
210				}
211			})
212		}
213
214		/// Overwrites the number of pages which must be in the queue for the other side to be
215		/// told to suspend their sending.
216		///
217		/// - `origin`: Must pass `Root`.
218		/// - `new`: Desired value for `QueueConfigData.suspend_value`
219		#[pallet::call_index(3)]
220		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
221		pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
222			ensure_root(origin)?;
223
224			QueueConfig::<T>::try_mutate(|data| {
225				data.suspend_threshold = new;
226				data.validate::<T>()
227			})
228		}
229
230		/// Overwrites the number of pages which must be in the queue after which we drop any
231		/// further messages from the channel.
232		///
233		/// - `origin`: Must pass `Root`.
234		/// - `new`: Desired value for `QueueConfigData.drop_threshold`
235		#[pallet::call_index(4)]
236		#[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
237		pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
238			ensure_root(origin)?;
239
240			QueueConfig::<T>::try_mutate(|data| {
241				data.drop_threshold = new;
242				data.validate::<T>()
243			})
244		}
245
246		/// Overwrites the number of pages which the queue must be reduced to before it signals
247		/// that message sending may recommence after it has been suspended.
248		///
249		/// - `origin`: Must pass `Root`.
250		/// - `new`: Desired value for `QueueConfigData.resume_threshold`
251		#[pallet::call_index(5)]
252		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
253		pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
254			ensure_root(origin)?;
255
256			QueueConfig::<T>::try_mutate(|data| {
257				data.resume_threshold = new;
258				data.validate::<T>()
259			})
260		}
261	}
262
263	#[pallet::hooks]
264	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
265		fn integrity_test() {
266			assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
267
268			let w = Self::on_idle_weight();
269			assert!(w != Weight::zero());
270			assert!(w.all_lte(T::BlockWeights::get().max_block));
271
272			<T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
273		}
274
275		fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
276			let mut meter = WeightMeter::with_limit(limit);
277
278			if meter.try_consume(Self::on_idle_weight()).is_err() {
279				tracing::debug!(
280					target: LOG_TARGET,
281					"Not enough weight for on_idle. {} < {}",
282					Self::on_idle_weight(), limit
283				);
284				return meter.consumed()
285			}
286
287			migration::v3::lazy_migrate_inbound_queue::<T>();
288
289			meter.consumed()
290		}
291	}
292
293	#[pallet::event]
294	#[pallet::generate_deposit(pub(super) fn deposit_event)]
295	pub enum Event<T: Config> {
296		/// An HRMP message was sent to a sibling parachain.
297		XcmpMessageSent { message_hash: XcmHash },
298	}
299
300	#[pallet::error]
301	pub enum Error<T> {
302		/// Setting the queue config failed since one of its values was invalid.
303		BadQueueConfig,
304		/// The execution is already suspended.
305		AlreadySuspended,
306		/// The execution is already resumed.
307		AlreadyResumed,
308		/// There are too many active outbound channels.
309		TooManyActiveOutboundChannels,
310		/// The message is too big.
311		TooBig,
312	}
313
314	/// The suspended inbound XCMP channels. All others are not suspended.
315	///
316	/// This is a `StorageValue` instead of a `StorageMap` since we expect multiple reads per block
317	/// to different keys with a one byte payload. The access to `BoundedBTreeSet` will be cached
318	/// within the block and therefore only included once in the proof size.
319	///
320	/// NOTE: The PoV benchmarking cannot know this and will over-estimate, but the actual proof
321	/// will be smaller.
322	#[pallet::storage]
323	pub type InboundXcmpSuspended<T: Config> =
324		StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
325
326	/// The non-empty XCMP channels in order of becoming non-empty, and the index of the first
327	/// and last outbound message. If the two indices are equal, then it indicates an empty
328	/// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater
329	/// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in
330	/// case of the need to send a high-priority signal message this block.
331	/// The bool is true if there is a signal message waiting to be sent.
332	#[pallet::storage]
333	pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
334		_,
335		BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
336		ValueQuery,
337	>;
338
339	/// The messages outbound in a given XCMP channel.
340	#[pallet::storage]
341	pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
342		_,
343		Blake2_128Concat,
344		ParaId,
345		Twox64Concat,
346		u16,
347		WeakBoundedVec<u8, T::MaxPageSize>,
348		ValueQuery,
349	>;
350
351	/// Any signal messages waiting to be sent.
352	#[pallet::storage]
353	pub(super) type SignalMessages<T: Config> =
354		StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
355
356	/// The configuration which controls the dynamics of the outbound queue.
357	#[pallet::storage]
358	pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
359
360	/// Whether or not the XCMP queue is suspended from executing incoming XCMs or not.
361	#[pallet::storage]
362	pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
363
364	/// The factor to multiply the base delivery fee by.
365	#[pallet::storage]
366	pub(super) type DeliveryFeeFactor<T: Config> =
367		StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
368}
369
370#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
371pub enum OutboundState {
372	Ok,
373	Suspended,
374}
375
376/// Struct containing detailed information about the outbound channel.
377#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
378pub struct OutboundChannelDetails {
379	/// The `ParaId` of the parachain that this channel is connected with.
380	recipient: ParaId,
381	/// The state of the channel.
382	state: OutboundState,
383	/// Whether or not any signals exist in this channel.
384	signals_exist: bool,
385	/// The index of the first outbound message.
386	first_index: u16,
387	/// The index of the last outbound message.
388	last_index: u16,
389}
390
391impl OutboundChannelDetails {
392	pub fn new(recipient: ParaId) -> OutboundChannelDetails {
393		OutboundChannelDetails {
394			recipient,
395			state: OutboundState::Ok,
396			signals_exist: false,
397			first_index: 0,
398			last_index: 0,
399		}
400	}
401
402	pub fn with_signals(mut self) -> OutboundChannelDetails {
403		self.signals_exist = true;
404		self
405	}
406
407	pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
408		self.state = OutboundState::Suspended;
409		self
410	}
411}
412
413#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
414pub struct QueueConfigData {
415	/// The number of pages which must be in the queue for the other side to be told to suspend
416	/// their sending.
417	suspend_threshold: u32,
418	/// The number of pages which must be in the queue after which we drop any further messages
419	/// from the channel. This should normally not happen since the `suspend_threshold` can be used
420	/// to suspend the channel.
421	drop_threshold: u32,
422	/// The number of pages which the queue must be reduced to before it signals that
423	/// message sending may recommence after it has been suspended.
424	resume_threshold: u32,
425}
426
427impl Default for QueueConfigData {
428	fn default() -> Self {
429		// NOTE that these default values are only used on genesis. They should give a rough idea of
430		// what to set these values to, but is in no way a requirement.
431		Self {
432			drop_threshold: 48,    // 64KiB * 48 = 3MiB
433			suspend_threshold: 32, // 64KiB * 32 = 2MiB
434			resume_threshold: 8,   // 64KiB * 8 = 512KiB
435		}
436	}
437}
438
439impl QueueConfigData {
440	/// Validate all assumptions about `Self`.
441	///
442	/// Should be called prior to accepting this as new config.
443	pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
444		if self.resume_threshold < self.suspend_threshold &&
445			self.suspend_threshold <= self.drop_threshold &&
446			self.resume_threshold > 0
447		{
448			Ok(())
449		} else {
450			Err(Error::<T>::BadQueueConfig.into())
451		}
452	}
453}
454
455#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
456pub enum ChannelSignal {
457	Suspend,
458	Resume,
459}
460
461impl<T: Config> Pallet<T> {
462	/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
463	///
464	/// Format is the type of aggregate message that the `fragment` may be safely encoded and
465	/// appended onto.
466	///
467	/// ## Background
468	///
469	/// For our purposes, one HRMP "message" is actually an aggregated block of XCM "messages".
470	///
471	/// For the sake of clarity, we distinguish between them as message AGGREGATEs versus
472	/// message FRAGMENTs.
473	///
474	/// So each AGGREGATE is comprised of one or more concatenated SCALE-encoded `Vec<u8>`
475	/// FRAGMENTs. Though each fragment is already probably a SCALE-encoded Xcm, we can't be
476	/// certain, so we SCALE encode each `Vec<u8>` fragment in order to ensure we have the
477	/// length prefixed and can thus decode each fragment from the aggregate stream. With this,
478	/// we can concatenate them into a single aggregate blob without needing to be concerned
479	/// about encoding fragment boundaries.
480	///
481	/// If successful, returns the number of pages in the outbound queue after enqueuing the new
482	/// fragment.
483	fn send_fragment<Fragment: Encode>(
484		recipient: ParaId,
485		format: XcmpMessageFormat,
486		fragment: Fragment,
487	) -> Result<u32, MessageSendError> {
488		let encoded_fragment = fragment.encode();
489
490		// Optimization note: `max_message_size` could potentially be stored in
491		// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
492
493		let channel_info =
494			T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
495		// Max message size refers to aggregates, or pages. Not to individual fragments.
496		let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
497		let format_size = format.encoded_size();
498		// We check the encoded fragment length plus the format size against the max message size
499		// because the format is concatenated if a new page is needed.
500		let size_to_check = encoded_fragment
501			.len()
502			.checked_add(format_size)
503			.ok_or(MessageSendError::TooBig)?;
504		if size_to_check > max_message_size {
505			return Err(MessageSendError::TooBig)
506		}
507
508		let mut all_channels = <OutboundXcmpStatus<T>>::get();
509		let channel_details = if let Some(details) =
510			all_channels.iter_mut().find(|channel| channel.recipient == recipient)
511		{
512			details
513		} else {
514			all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
515				tracing::error!(target: LOG_TARGET, error=?e, "Failed to activate HRMP channel");
516				MessageSendError::TooManyChannels
517			})?;
518			all_channels
519				.last_mut()
520				.expect("can't be empty; a new element was just pushed; qed")
521		};
522		let have_active = channel_details.last_index > channel_details.first_index;
523		// Try to append fragment to the last page, if there is enough space.
524		// We return the size of the last page inside of the option, to not calculate it again.
525		let appended_to_last_page = have_active
526			.then(|| {
527				<OutboundXcmpMessages<T>>::try_mutate(
528					recipient,
529					channel_details.last_index - 1,
530					|page| {
531						if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
532							defensive!("Bad format in outbound queue; dropping message");
533							return Err(())
534						}
535						if page.len() + encoded_fragment.len() > max_message_size {
536							return Err(())
537						}
538						for frag in encoded_fragment.iter() {
539							page.try_push(*frag)?;
540						}
541						Ok(page.len())
542					},
543				)
544				.ok()
545			})
546			.flatten();
547
548		let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
549			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
550			(number_of_pages, size)
551		} else {
552			// Need to add a new page.
553			let page_index = channel_details.last_index;
554			channel_details.last_index += 1;
555			let mut new_page = format.encode();
556			new_page.extend_from_slice(&encoded_fragment[..]);
557			let last_page_size = new_page.len();
558			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
559			let bounded_page =
560				BoundedVec::<u8, T::MaxPageSize>::try_from(new_page).map_err(|error| {
561					tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
562					MessageSendError::TooBig
563				})?;
564			let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
565			<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
566			<OutboundXcmpStatus<T>>::put(all_channels);
567			(number_of_pages, last_page_size)
568		};
569
570		// We have to count the total size here since `channel_info.total_size` is not updated at
571		// this point in time. We assume all previous pages are filled, which, in practice, is not
572		// always the case.
573		let total_size =
574			number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
575		let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
576		if total_size > threshold {
577			Self::increase_fee_factor(recipient, encoded_fragment.len() as u128);
578		}
579
580		Ok(number_of_pages)
581	}
582
583	/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
584	/// block.
585	fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
586		let mut s = <OutboundXcmpStatus<T>>::get();
587		if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
588			details.signals_exist = true;
589		} else {
590			s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
591				tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
592				Error::<T>::TooManyActiveOutboundChannels
593			})?;
594		}
595
596		let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
597			(XcmpMessageFormat::Signals, signal).encode(),
598		)
599		.map_err(|error| {
600			tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
601			Error::<T>::TooBig
602		})?;
603		let page = WeakBoundedVec::force_from(page.into_inner(), None);
604
605		<SignalMessages<T>>::insert(dest, page);
606		<OutboundXcmpStatus<T>>::put(s);
607		Ok(())
608	}
609
610	fn suspend_channel(target: ParaId) {
611		<OutboundXcmpStatus<T>>::mutate(|s| {
612			if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
613				let ok = details.state == OutboundState::Ok;
614				defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
615				details.state = OutboundState::Suspended;
616			} else {
617				if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
618					defensive!("Cannot pause channel; too many outbound channels");
619				}
620			}
621		});
622	}
623
624	fn resume_channel(target: ParaId) {
625		<OutboundXcmpStatus<T>>::mutate(|s| {
626			if let Some(index) = s.iter().position(|item| item.recipient == target) {
627				let suspended = s[index].state == OutboundState::Suspended;
628				defensive_assert!(
629					suspended,
630					"WARNING: Attempt to resume channel that was not suspended."
631				);
632				if s[index].first_index == s[index].last_index {
633					s.remove(index);
634				} else {
635					s[index].state = OutboundState::Ok;
636				}
637			} else {
638				defensive!("WARNING: Attempt to resume channel that was not suspended.");
639			}
640		});
641	}
642
643	fn enqueue_xcmp_messages<'a>(
644		sender: ParaId,
645		xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
646		is_first_sender_batch: bool,
647		meter: &mut WeightMeter,
648	) -> Result<(), ()> {
649		let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
650		let batches_footprints =
651			T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
652
653		let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
654			let required_weight = T::WeightInfo::enqueue_xcmp_messages(
655				batches_footprints.first_page_pos.saturated_into(),
656				batch_info,
657				is_first_sender_batch,
658			);
659
660			match meter.can_consume(required_weight) {
661				true => core::cmp::Ordering::Less,
662				false => core::cmp::Ordering::Greater,
663			}
664		});
665
666		meter.consume(T::WeightInfo::enqueue_xcmp_messages(
667			batches_footprints.first_page_pos.saturated_into(),
668			best_batch_footprint,
669			is_first_sender_batch,
670		));
671		T::XcmpQueue::enqueue_messages(
672			xcms.iter().take(best_batch_footprint.msgs_count).copied(),
673			sender,
674		);
675
676		if best_batch_footprint.msgs_count < xcms.len() {
677			tracing::error!(
678				target: LOG_TARGET,
679				used_weight=?meter.consumed_ratio(),
680				"Out of weight: cannot enqueue entire XCMP messages batch; \
681				dropped some or all messages in batch."
682			);
683			return Err(());
684		}
685		Ok(())
686	}
687
688	/// Split concatenated encoded `VersionedXcm`s into individual items.
689	///
690	/// We directly encode them again since that is needed later on.
691	///
692	/// On error returns a partial batch with all the XCMs processed before the failure.
693	/// This can happen in case of a decoding/re-encoding failure.
694	pub(crate) fn take_first_concatenated_xcm<'a>(
695		data: &mut &'a [u8],
696		meter: &mut WeightMeter,
697	) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
698		// Let's make sure that we can decode at least an empty xcm message.
699		let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
700		if meter.try_consume(base_weight).is_err() {
701			defensive!("Out of weight; could not decode all; dropping");
702			return Err(())
703		}
704
705		let input_data = &mut &data[..];
706		let mut input = codec::CountedInput::new(input_data);
707		VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
708			|error| {
709				tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
710				()
711			},
712		)?;
713		let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
714		*data = remaining_data;
715
716		// Consume the extra weight that it took to decode this message.
717		// This depends on the message len in bytes.
718		// Saturates if it's over the limit.
719		let extra_weight =
720			T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
721		meter.consume(extra_weight);
722
723		let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
724			tracing::error!(
725				target: LOG_TARGET,
726				?error,
727				"Failed to take XCM after decoding: message is too long"
728			);
729			()
730		})?;
731
732		Ok(xcm)
733	}
734
735	/// Split concatenated opaque `VersionedXcm`s into individual items.
736	///
737	/// This method is not benchmarked because it's almost free.
738	pub(crate) fn take_first_concatenated_opaque_xcm<'a>(
739		data: &mut &'a [u8],
740	) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
741		let xcm_len = Compact::<u32>::decode(data).map_err(|error| {
742			tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode opaque XCM length");
743			()
744		})?;
745		let (xcm_data, remaining_data) = match data.split_at_checked(xcm_len.0 as usize) {
746			Some((xcm_data, remaining_data)) => (xcm_data, remaining_data),
747			None => {
748				tracing::debug!(target: LOG_TARGET, ?xcm_len, "Wrong opaque XCM length");
749				return Err(())
750			},
751		};
752		*data = remaining_data;
753
754		let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
755			tracing::error!(
756				target: LOG_TARGET,
757				?error,
758				"Failed to take opaque XCM after decoding: message is too long"
759			);
760			()
761		})?;
762
763		Ok(xcm)
764	}
765
766	/// Split concatenated encoded `VersionedXcm`s into batches.
767	///
768	/// We directly encode them again since that is needed later on.
769	pub(crate) fn take_first_concatenated_xcms<'a>(
770		data: &mut &'a [u8],
771		encoding: XcmEncoding,
772		batch_size: usize,
773		meter: &mut WeightMeter,
774	) -> Result<
775		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
776		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
777	> {
778		let mut batch = vec![];
779		loop {
780			if data.is_empty() {
781				return Ok(batch)
782			}
783
784			let maybe_xcm = match encoding {
785				XcmEncoding::Simple => Self::take_first_concatenated_xcm(data, meter),
786				XcmEncoding::Double => Self::take_first_concatenated_opaque_xcm(data),
787			};
788			match maybe_xcm {
789				Ok(xcm) => {
790					batch.push(xcm);
791					if batch.len() >= batch_size {
792						return Ok(batch);
793					}
794				},
795				Err(_) => return Err(batch),
796			}
797		}
798	}
799
800	/// The worst-case weight of `on_idle`.
801	pub fn on_idle_weight() -> Weight {
802		<T as crate::Config>::WeightInfo::on_idle_good_msg()
803			.max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
804	}
805
806	#[cfg(feature = "bridging")]
807	fn is_inbound_channel_suspended(sender: ParaId) -> bool {
808		<InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
809	}
810
811	#[cfg(feature = "bridging")]
812	/// Returns tuple of `OutboundState` and number of queued pages.
813	fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
814		<OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
815			let queued_pages = c.last_index.saturating_sub(c.first_index);
816			(c.state, queued_pages)
817		})
818	}
819}
820
821impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
822	// Suspends/Resumes the queue when certain thresholds are reached.
823	fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
824		let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
825
826		let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
827		let suspended = suspended_channels.contains(&para);
828
829		if suspended && fp.ready_pages <= resume_threshold {
830			if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
831				tracing::error!(
832					target: LOG_TARGET,
833					error=?err,
834					sibling=?para,
835					"defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
836				);
837			} else {
838				suspended_channels.remove(&para);
839				<InboundXcmpSuspended<T>>::put(suspended_channels);
840			}
841		} else if !suspended && fp.ready_pages >= suspend_threshold {
842			tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");
843
844			if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
845				// It will retry if `drop_threshold` is not reached, but it could be too late.
846				tracing::error!(
847					target: LOG_TARGET, error=?err,
848					"defensive: Could not send suspension signal; future messages may be dropped."
849				);
850			} else if let Err(err) = suspended_channels.try_insert(para) {
851				tracing::error!(
852					target: LOG_TARGET,
853					error=?err,
854					sibling=?para,
855					"Too many channels suspended; cannot suspend sibling; further messages may be dropped."
856				);
857			} else {
858				<InboundXcmpSuspended<T>>::put(suspended_channels);
859			}
860		}
861	}
862}
863
864impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
865	fn is_paused(para: &ParaId) -> bool {
866		if !QueueSuspended::<T>::get() {
867			return false
868		}
869
870		// Make an exception for the superuser queue:
871		let sender_origin = T::ControllerOriginConverter::convert_origin(
872			(Parent, Parachain((*para).into())),
873			OriginKind::Superuser,
874		);
875		let is_controller =
876			sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
877
878		!is_controller
879	}
880}
881
882/// The encoding of the XCM messages in an XCMP page.
883#[derive(Copy, Clone)]
884enum XcmEncoding {
885	/// Simple encoded (`xcm.encode()`)
886	///
887	/// When we receive this king of messages, we have to decode and then re-encoded them before
888	/// enqueueing them for later processing.
889	Simple,
890	/// Double encoded (`xcm.encode().encode()`)
891	///
892	/// The XCM message is encoded first, resulting a vector of bytes. And then the vector of bytes
893	/// is encoded again. This has 2 advantages:
894	/// 1. We can just decode them before enqueueing them for later processing. They don't need to
895	///    be re-encoded.
896	/// 2. Decoding a `Vec<u8>` is much more efficient than decoding XCM messages.
897	Double,
898}
899
900impl<T: Config> XcmpMessageHandler for Pallet<T> {
901	fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
902		iter: I,
903		max_weight: Weight,
904	) -> Weight {
905		let mut meter = WeightMeter::with_limit(max_weight);
906
907		let mut known_xcm_senders = BTreeSet::new();
908		for (sender, _sent_at, mut data) in iter {
909			let format = match XcmpMessageFormat::decode(&mut data) {
910				Ok(f) => f,
911				Err(_) => {
912					defensive!("Unknown XCMP message format - dropping");
913					continue
914				},
915			};
916
917			match format {
918				XcmpMessageFormat::Signals => {
919					let mut signal_count = 0;
920					while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
921						signal_count += 1;
922						match ChannelSignal::decode(&mut data) {
923							Ok(ChannelSignal::Suspend) => {
924								if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
925									defensive!(
926										"Not enough weight to process suspend signal - dropping"
927									);
928									break
929								}
930								Self::suspend_channel(sender)
931							},
932							Ok(ChannelSignal::Resume) => {
933								if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
934									defensive!(
935										"Not enough weight to process resume signal - dropping"
936									);
937									break
938								}
939								Self::resume_channel(sender)
940							},
941							Err(_) => {
942								defensive!("Undecodable channel signal - dropping");
943								break
944							},
945						}
946					}
947				},
948				XcmpMessageFormat::ConcatenatedVersionedXcm |
949				XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
950					let encoding = match format {
951						XcmpMessageFormat::ConcatenatedVersionedXcm => XcmEncoding::Simple,
952						XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => XcmEncoding::Double,
953						_ => {
954							// This branch is unreachable.
955							continue
956						},
957					};
958
959					let mut is_first_sender_batch = known_xcm_senders.insert(sender);
960					if is_first_sender_batch {
961						if meter
962							.try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
963							.is_err()
964						{
965							defensive!(
966								"Out of weight: cannot enqueue XCMP messages; dropping page; \
967                                    Used weight: ",
968								meter.consumed_ratio()
969							);
970							continue;
971						}
972					}
973
974					let mut can_process_next_batch = true;
975					while can_process_next_batch {
976						let batch = match Self::take_first_concatenated_xcms(
977							&mut data,
978							encoding,
979							XCM_BATCH_SIZE,
980							&mut meter,
981						) {
982							Ok(batch) => batch,
983							Err(batch) => {
984								can_process_next_batch = false;
985								defensive!(
986									"HRMP inbound decode stream broke; page will be dropped."
987								);
988								batch
989							},
990						};
991						if batch.is_empty() {
992							break;
993						}
994
995						if let Err(()) = Self::enqueue_xcmp_messages(
996							sender,
997							&batch,
998							is_first_sender_batch,
999							&mut meter,
1000						) {
1001							break
1002						}
1003						is_first_sender_batch = false;
1004					}
1005				},
1006				XcmpMessageFormat::ConcatenatedEncodedBlob => {
1007					defensive!("Blob messages are unhandled - dropping");
1008					continue
1009				},
1010			}
1011		}
1012
1013		meter.consumed()
1014	}
1015}
1016
1017impl<T: Config> XcmpMessageSource for Pallet<T> {
1018	fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
1019		let mut statuses = <OutboundXcmpStatus<T>>::get();
1020		let old_statuses_len = statuses.len();
1021		let max_message_count = statuses.len().min(maximum_channels);
1022		let mut result = Vec::with_capacity(max_message_count);
1023
1024		for status in statuses.iter_mut() {
1025			let OutboundChannelDetails {
1026				recipient: para_id,
1027				state: outbound_state,
1028				mut signals_exist,
1029				mut first_index,
1030				mut last_index,
1031			} = *status;
1032
1033			let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
1034				ChannelStatus::Closed => {
1035					// This means that there is no such channel anymore. Nothing to be done but
1036					// swallow the messages and discard the status.
1037					for i in first_index..last_index {
1038						<OutboundXcmpMessages<T>>::remove(para_id, i);
1039					}
1040					if signals_exist {
1041						<SignalMessages<T>>::remove(para_id);
1042					}
1043					*status = OutboundChannelDetails::new(para_id);
1044					continue
1045				},
1046				ChannelStatus::Full => continue,
1047				ChannelStatus::Ready(n, e) => (n, e),
1048			};
1049
1050			// This is a hard limit from the host config; not even signals can bypass it.
1051			if result.len() == max_message_count {
1052				// We check this condition in the beginning of the loop so that we don't include
1053				// a message where the limit is 0.
1054				break
1055			}
1056
1057			let page = if signals_exist {
1058				let page = <SignalMessages<T>>::get(para_id);
1059				defensive_assert!(!page.is_empty(), "Signals must exist");
1060
1061				if page.len() < max_size_now {
1062					<SignalMessages<T>>::remove(para_id);
1063					signals_exist = false;
1064					page
1065				} else {
1066					defensive!("Signals should fit into a single page");
1067					continue
1068				}
1069			} else if outbound_state == OutboundState::Suspended {
1070				// Signals are exempt from suspension.
1071				continue
1072			} else if last_index > first_index {
1073				let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
1074				if page.len() < max_size_now {
1075					<OutboundXcmpMessages<T>>::remove(para_id, first_index);
1076					first_index += 1;
1077					page
1078				} else {
1079					continue
1080				}
1081			} else {
1082				continue
1083			};
1084			if first_index == last_index {
1085				first_index = 0;
1086				last_index = 0;
1087			}
1088
1089			if page.len() > max_size_ever {
1090				// TODO: #274 This means that the channel's max message size has changed since
1091				//   the message was sent. We should parse it and split into smaller messages but
1092				//   since it's so unlikely then for now we just drop it.
1093				defensive!("WARNING: oversize message in queue - dropping");
1094			} else {
1095				result.push((para_id, page.into_inner()));
1096			}
1097
1098			let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
1099				Some(channel_info) => channel_info.max_total_size,
1100				None => {
1101					tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1102					MAX_POSSIBLE_ALLOCATION // We use this as a fallback in case the messaging state is not present
1103				},
1104			};
1105			let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1106			let remaining_total_size: usize = (first_index..last_index)
1107				.map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
1108				.sum();
1109			if remaining_total_size <= threshold as usize {
1110				Self::decrease_fee_factor(para_id);
1111			}
1112
1113			*status = OutboundChannelDetails {
1114				recipient: para_id,
1115				state: outbound_state,
1116				signals_exist,
1117				first_index,
1118				last_index,
1119			};
1120		}
1121		debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1122
1123		// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
1124		// criteria requirement.
1125		result.sort_by_key(|m| m.0);
1126
1127		// Prune hrmp channels that became empty. Additionally, because it may so happen that we
1128		// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
1129		// change the order. Otherwise, the next `on_finalize` we will again give attention
1130		// only to those channels that happen to be in the beginning, until they are emptied.
1131		// This leads to "starvation" of the channels near to the end.
1132		//
1133		// To mitigate this we shift all processed elements towards the end of the vector using
1134		// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
1135		statuses.retain(|x| {
1136			x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
1137		});
1138
1139		// old_status_len must be >= status.len() since we never add anything to status.
1140		let pruned = old_statuses_len - statuses.len();
1141		// removing an item from status implies a message being sent, so the result messages must
1142		// be no less than the pruned channels.
1143		let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1144			"Could not store HRMP channels config. Some HRMP channels may be broken.",
1145		);
1146
1147		<OutboundXcmpStatus<T>>::put(statuses);
1148
1149		result
1150	}
1151}
1152
1153/// Xcm sender for sending to a sibling parachain.
1154impl<T: Config> SendXcm for Pallet<T> {
1155	type Ticket = (ParaId, VersionedXcm<()>);
1156
1157	fn validate(
1158		dest: &mut Option<Location>,
1159		msg: &mut Option<Xcm<()>>,
1160	) -> SendResult<(ParaId, VersionedXcm<()>)> {
1161		let d = dest.take().ok_or(SendError::MissingArgument)?;
1162
1163		match d.unpack() {
1164			// An HRMP message for a sibling parachain.
1165			(1, [Parachain(id)]) => {
1166				let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1167				let id = ParaId::from(*id);
1168				let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1169				let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1170					.map_err(|()| SendError::DestinationUnsupported)?;
1171				versioned_xcm
1172					.check_is_decodable()
1173					.map_err(|()| SendError::ExceedsMaxMessageSize)?;
1174
1175				Ok(((id, versioned_xcm), price))
1176			},
1177			_ => {
1178				// Anything else is unhandled. This includes a message that is not meant for us.
1179				// We need to make sure that dest/msg is not consumed here.
1180				*dest = Some(d);
1181				Err(SendError::NotApplicable)
1182			},
1183		}
1184	}
1185
1186	fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1187		let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1188
1189		match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
1190			Ok(_) => {
1191				Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1192				Ok(hash)
1193			},
1194			Err(e) => {
1195				tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1196				Err(SendError::Transport(e.into()))
1197			},
1198		}
1199	}
1200}
1201
1202impl<T: Config> InspectMessageQueues for Pallet<T> {
1203	fn clear_messages() {
1204		// Best effort.
1205		let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1206		OutboundXcmpStatus::<T>::mutate(|details_vec| {
1207			for details in details_vec {
1208				details.first_index = 0;
1209				details.last_index = 0;
1210			}
1211		});
1212	}
1213
1214	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1215		use xcm::prelude::*;
1216
1217		OutboundXcmpMessages::<T>::iter()
1218			.map(|(para_id, _, messages)| {
1219				let data = &mut &messages[..];
1220
1221				let decoded_format = XcmpMessageFormat::decode(data).unwrap();
1222				let mut decoded_messages = Vec::new();
1223				while !data.is_empty() {
1224					let message_bytes = match decoded_format {
1225						XcmpMessageFormat::ConcatenatedVersionedXcm =>
1226							Self::take_first_concatenated_xcm(data, &mut WeightMeter::new()),
1227						XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm =>
1228							Self::take_first_concatenated_opaque_xcm(data),
1229						unexpected_format => {
1230							panic!("Unexpected XCMP format: {unexpected_format:?}!")
1231						},
1232					}
1233					.unwrap();
1234					let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1235						MAX_XCM_DECODE_DEPTH,
1236						&mut &message_bytes[..],
1237					)
1238					.unwrap();
1239					decoded_messages.push(decoded_message);
1240				}
1241
1242				(
1243					VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1244					decoded_messages,
1245				)
1246			})
1247			.collect()
1248	}
1249}
1250
1251impl<T: Config> FeeTracker for Pallet<T> {
1252	type Id = ParaId;
1253
1254	fn get_fee_factor(id: Self::Id) -> FixedU128 {
1255		<DeliveryFeeFactor<T>>::get(id)
1256	}
1257
1258	fn set_fee_factor(id: Self::Id, val: FixedU128) {
1259		<DeliveryFeeFactor<T>>::set(id, val);
1260	}
1261}