referrerpolicy=no-referrer-when-downgrade

cumulus_pallet_xcmp_queue/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17//! A pallet which uses the XCMP transport layer to handle both incoming and outgoing XCM message
18//! sending and dispatch, queuing, signalling and backpressure. To do so, it implements:
19//! * `XcmpMessageHandler`
20//! * `XcmpMessageSource`
21//!
22//! Also provides an implementation of `SendXcm` which can be placed in a router tuple for relaying
23//! XCM over XCMP if the destination is `Parent/Parachain`. It requires an implementation of
24//! `XcmExecutor` for dispatching incoming XCM messages.
25//!
26//! To prevent out of memory errors on the `OutboundXcmpMessages` queue, an exponential fee factor
27//! (`DeliveryFeeFactor`) is set, much like the one used in DMP.
28//! The fee factor increases whenever the total size of messages in a particular channel passes a
29//! threshold. This threshold is defined as a percentage of the maximum total size the channel can
30//! have. More concretely, the threshold is `max_total_size` / `THRESHOLD_FACTOR`, where:
31//! - `max_total_size` is the maximum size, in bytes, of the channel, not number of messages.
32//! It is defined in the channel configuration.
//! - `THRESHOLD_FACTOR` is the divisor that determines which fraction of the max size is the
//!   actual threshold. If it's 2, then the threshold is half of the max size; if it's 4, it's a
//!   quarter, and so on.
35
36#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bitflags::bitflags;
60use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
61use codec::{Compact, Decode, DecodeLimit, Encode, MaxEncodedLen};
62use cumulus_primitives_core::{
63	relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
64	ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
65};
66
67use frame_support::{
68	defensive, defensive_assert,
69	traits::{
70		Defensive, DefensiveTruncateFrom, EnqueueMessage, EnsureOrigin, Get, Len, QueueFootprint,
71		QueueFootprintQuery, QueuePausedQuery,
72	},
73	weights::{Weight, WeightMeter},
74};
75use pallet_message_queue::OnQueueChanged;
76use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
77use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
78use scale_info::TypeInfo;
79use sp_core::MAX_POSSIBLE_ALLOCATION;
80use sp_runtime::{FixedU128, SaturatedConversion, WeakBoundedVec};
81use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
82use xcm_builder::InspectMessageQueues;
83use xcm_executor::traits::ConvertOrigin;
84
85pub use pallet::*;
86
/// Index used to identify overweight XCMs.
pub type OverweightIndex = u64;
/// The max length of an XCMP message.
///
/// This is the `MaxMessageLen` of the configured [`Config::XcmpQueue`].
pub type MaxXcmpMessageLenOf<T> =
	<<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;

// The `tracing` target used by every log statement of this pallet.
const LOG_TARGET: &str = "xcmp_queue";
// NOTE(review): not referenced in the visible part of this file — confirm where it is used.
const DEFAULT_POV_SIZE: u64 = 64 * 1024; // 64 KB
/// The size of an XCM messages batch.
pub const XCM_BATCH_SIZE: usize = 250;
/// The maximum number of signals that we can have in an XCMP page.
pub const MAX_SIGNALS_PER_PAGE: usize = 3;

/// Constants related to delivery fee calculation
pub mod delivery_fee_constants {
	/// Fees will start increasing when queue is half full
	///
	/// The threshold is computed as the channel's `max_total_size` divided by this factor.
	pub const THRESHOLD_FACTOR: u32 = 2;
}
105
106#[frame_support::pallet]
107pub mod pallet {
108	use super::*;
109	use frame_support::{pallet_prelude::*, Twox64Concat};
110	use frame_system::pallet_prelude::*;
111
	/// The pallet type. Its storage version is taken from `migration::STORAGE_VERSION`.
	#[pallet::pallet]
	#[pallet::storage_version(migration::STORAGE_VERSION)]
	pub struct Pallet<T>(_);
115
	/// Configuration of the XCMP queue pallet.
	#[pallet::config]
	pub trait Config: frame_system::Config {
		/// The event type of this pallet, convertible into the runtime-wide event type.
		// NOTE(review): `allow(deprecated)` presumably silences a deprecation warning on the
		// explicit `RuntimeEvent` item — confirm before removing.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// Information on the available XCMP channels.
		type ChannelInfo: GetChannelInfo;

		/// Means of converting an `Xcm` into a `VersionedXcm`.
		type VersionWrapper: WrapVersion;

		/// Enqueue an inbound horizontal message for later processing.
		///
		/// This defines the maximal message length via [`crate::MaxXcmpMessageLenOf`]. The pallet
		/// assumes that this hook will eventually process all the pushed messages.
		type XcmpQueue: EnqueueMessage<ParaId>
			+ QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;

		/// The maximum number of inbound XCMP channels that can be suspended simultaneously.
		///
		/// Any further channel suspensions will fail and messages may get dropped without further
		/// notice. Choosing a high value (1000) is okay; the trade-off that is described in
		/// [`InboundXcmpSuspended`] still applies at that scale.
		#[pallet::constant]
		type MaxInboundSuspended: Get<u32>;

		/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
		///
		/// If this is reached, then no further messages can be sent to channels that do not yet
		/// have a message queued. This should be set to the expected maximum of outbound channels
		/// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough,
		/// since otherwise the congestion control protocol will not work as intended and messages
		/// may be dropped. This value increases the PoV and should therefore not be picked too
		/// high. Governance needs to pay attention to not open more channels than this value.
		#[pallet::constant]
		type MaxActiveOutboundChannels: Get<u32>;

		/// The maximal page size for HRMP message pages.
		///
		/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
		/// benchmarking. The limit for the size of a message is slightly below this, since some
		/// overhead is incurred for encoding the format.
		#[pallet::constant]
		type MaxPageSize: Get<u32>;

		/// The origin that is allowed to resume or suspend the XCMP queue.
		type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;

		/// The conversion function used to attempt to convert an XCM `Location` origin to a
		/// superuser origin.
		type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;

		/// The price for delivering an XCM to a sibling parachain destination.
		type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;

		/// The weight information of this pallet.
		type WeightInfo: WeightInfoExt;
	}
174
175	#[pallet::call]
176	impl<T: Config> Pallet<T> {
177		/// Suspends all XCM executions for the XCMP queue, regardless of the sender's origin.
178		///
179		/// - `origin`: Must pass `ControllerOrigin`.
180		#[pallet::call_index(1)]
181		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
182		pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
183			T::ControllerOrigin::ensure_origin(origin)?;
184
185			QueueSuspended::<T>::try_mutate(|suspended| {
186				if *suspended {
187					Err(Error::<T>::AlreadySuspended.into())
188				} else {
189					*suspended = true;
190					Ok(())
191				}
192			})
193		}
194
195		/// Resumes all XCM executions for the XCMP queue.
196		///
197		/// Note that this function doesn't change the status of the in/out bound channels.
198		///
199		/// - `origin`: Must pass `ControllerOrigin`.
200		#[pallet::call_index(2)]
201		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
202		pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
203			T::ControllerOrigin::ensure_origin(origin)?;
204
205			QueueSuspended::<T>::try_mutate(|suspended| {
206				if !*suspended {
207					Err(Error::<T>::AlreadyResumed.into())
208				} else {
209					*suspended = false;
210					Ok(())
211				}
212			})
213		}
214
215		/// Overwrites the number of pages which must be in the queue for the other side to be
216		/// told to suspend their sending.
217		///
218		/// - `origin`: Must pass `Root`.
219		/// - `new`: Desired value for `QueueConfigData.suspend_value`
220		#[pallet::call_index(3)]
221		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
222		pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
223			ensure_root(origin)?;
224
225			QueueConfig::<T>::try_mutate(|data| {
226				data.suspend_threshold = new;
227				data.validate::<T>()
228			})
229		}
230
231		/// Overwrites the number of pages which must be in the queue after which we drop any
232		/// further messages from the channel.
233		///
234		/// - `origin`: Must pass `Root`.
235		/// - `new`: Desired value for `QueueConfigData.drop_threshold`
236		#[pallet::call_index(4)]
237		#[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
238		pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
239			ensure_root(origin)?;
240
241			QueueConfig::<T>::try_mutate(|data| {
242				data.drop_threshold = new;
243				data.validate::<T>()
244			})
245		}
246
247		/// Overwrites the number of pages which the queue must be reduced to before it signals
248		/// that message sending may recommence after it has been suspended.
249		///
250		/// - `origin`: Must pass `Root`.
251		/// - `new`: Desired value for `QueueConfigData.resume_threshold`
252		#[pallet::call_index(5)]
253		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
254		pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
255			ensure_root(origin)?;
256
257			QueueConfig::<T>::try_mutate(|data| {
258				data.resume_threshold = new;
259				data.validate::<T>()
260			})
261		}
262	}
263
264	#[pallet::hooks]
265	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
266		fn integrity_test() {
267			assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
268
269			let w = Self::on_idle_weight();
270			assert!(w != Weight::zero());
271			assert!(w.all_lte(T::BlockWeights::get().max_block));
272
273			<T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
274		}
275
276		fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
277			let mut meter = WeightMeter::with_limit(limit);
278
279			if meter.try_consume(Self::on_idle_weight()).is_err() {
280				tracing::debug!(
281					target: LOG_TARGET,
282					"Not enough weight for on_idle. {} < {}",
283					Self::on_idle_weight(), limit
284				);
285				return meter.consumed();
286			}
287
288			migration::v3::lazy_migrate_inbound_queue::<T>();
289
290			meter.consumed()
291		}
292	}
293
	/// The events that this pallet can deposit.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// An HRMP message was sent to a sibling parachain.
		XcmpMessageSent { message_hash: XcmHash },
	}
300
	/// The errors that this pallet can return.
	#[pallet::error]
	pub enum Error<T> {
		/// Setting the queue config failed since one of its values was invalid.
		BadQueueConfig,
		/// The execution is already suspended.
		AlreadySuspended,
		/// The execution is already resumed.
		AlreadyResumed,
		/// There are too many active outbound channels.
		TooManyActiveOutboundChannels,
		/// The message is too big.
		TooBig,
	}
314
	/// The suspended inbound XCMP channels. All others are not suspended.
	///
	/// This is a `StorageValue` instead of a `StorageMap` since we expect multiple reads per block
	/// to different keys with a one byte payload. The access to `BoundedBTreeSet` will be cached
	/// within the block and therefore only included once in the proof size.
	///
	/// NOTE: The PoV benchmarking cannot know this and will over-estimate, but the actual proof
	/// will be smaller.
	#[pallet::storage]
	pub type InboundXcmpSuspended<T: Config> =
		StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;

	/// The non-empty XCMP channels in order of becoming non-empty, and the index of the first
	/// and last outbound message. If the two indices are equal, then it indicates an empty
	/// queue and there must be a non-`Ok` `OutboundState`. We assume queues grow no greater
	/// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in
	/// case of the need to send a high-priority signal message this block.
	/// The `signals_exist` flag of an entry is true if there is a signal message waiting to be
	/// sent.
	#[pallet::storage]
	pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
		_,
		BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
		ValueQuery,
	>;

	/// The messages outbound in a given XCMP channel.
	///
	/// Keyed by the recipient and the page index within that recipient's channel.
	#[pallet::storage]
	pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
		_,
		Blake2_128Concat,
		ParaId,
		Twox64Concat,
		u16,
		WeakBoundedVec<u8, T::MaxPageSize>,
		ValueQuery,
	>;

	/// Any signal messages waiting to be sent.
	#[pallet::storage]
	pub(super) type SignalMessages<T: Config> =
		StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;

	/// The configuration which controls the dynamics of the outbound queue.
	#[pallet::storage]
	pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;

	/// Whether or not the XCMP queue is suspended from executing incoming XCMs or not.
	#[pallet::storage]
	pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;

	/// The factor to multiply the base delivery fee by.
	///
	/// Stored per recipient; entries that are not set read as the `GetMinFeeFactor` default.
	#[pallet::storage]
	pub(super) type DeliveryFeeFactor<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
369}
370
/// The state of an outbound XCMP channel.
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, Debug, TypeInfo, MaxEncodedLen)]
pub enum OutboundState {
	/// The channel is not suspended. This is the state of a freshly created channel entry.
	Ok,
	/// The channel has been suspended.
	Suspended,
}
376
bitflags! {
	// Per-channel flags tracking the `ConcatenatedOpaqueVersionedXcm` format negotiation with the
	// recipient of an outbound channel.
	#[derive(Encode, Decode, TypeInfo, MaxEncodedLen)]
	struct OutboundChannelFlags: u32 {
		// The recipient supports `ConcatenatedOpaqueVersionedXcm`.
		const CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT = 1;
		// We already advertised our own `ConcatenatedOpaqueVersionedXcm` support to the recipient.
		const CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT = 1 << 1;
	}
}
384
385impl OutboundChannelFlags {
386	// Check whether the recipient supports `ConcatenatedOpaqueVersionedXcm`.
387	fn has_concatenated_opaque_versioned_xcm_support(&self) -> bool {
388		*self & Self::CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT != Self::empty()
389	}
390
391	// Check whether we should send a notification to the recipient, advertising that we support
392	// `ConcatenatedOpaqueVersionedXcm`.
393	fn should_send_concatenated_opaque_versioned_xcm_notification(&self) -> bool {
394		if self.has_concatenated_opaque_versioned_xcm_support() {
395			return false;
396		}
397
398		if *self & Self::CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT != Self::empty() {
399			return false;
400		}
401
402		true
403	}
404
405	// Remember that the recipient supports `ConcatenatedOpaqueVersionedXcm`.
406	fn notice_concatenated_opaque_versioned_xcm_support(&mut self) {
407		*self = *self | Self::CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT;
408	}
409
410	// Remember that we advertised the `ConcatenatedOpaqueVersionedXcm` support to the recipient.
411	fn notice_concatenated_opaque_versioned_xcm_notification_sent(&mut self) {
412		*self = *self | Self::CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT;
413	}
414}
415
/// Struct containing detailed information about the outbound channel.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, Debug, MaxEncodedLen)]
pub struct OutboundChannelDetails {
	/// The `ParaId` of the parachain that this channel is connected with.
	recipient: ParaId,
	/// The state of the channel.
	state: OutboundState,
	/// Whether any signals exist in this channel.
	signals_exist: bool,
	/// The index of the first outbound message.
	first_index: u16,
	/// The index of the last outbound message.
	///
	/// The channel has queued pages only while this is greater than `first_index`.
	last_index: u16,
	/// Flags tracking the `ConcatenatedOpaqueVersionedXcm` negotiation with the recipient.
	flags: OutboundChannelFlags,
}
432
433impl OutboundChannelDetails {
434	pub fn new(recipient: ParaId) -> OutboundChannelDetails {
435		OutboundChannelDetails {
436			recipient,
437			state: OutboundState::Ok,
438			signals_exist: false,
439			first_index: 0,
440			last_index: 0,
441			flags: OutboundChannelFlags::empty(),
442		}
443	}
444
445	pub fn with_signals(mut self) -> OutboundChannelDetails {
446		self.signals_exist = true;
447		self
448	}
449
450	pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
451		self.state = OutboundState::Suspended;
452		self
453	}
454}
455
/// The thresholds, in number of pages, that drive suspending, resuming and dropping of channels.
///
/// `validate` requires `0 < resume_threshold < suspend_threshold <= drop_threshold`.
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, Debug, TypeInfo, MaxEncodedLen)]
pub struct QueueConfigData {
	/// The number of pages which must be in the queue for the other side to be told to suspend
	/// their sending.
	suspend_threshold: u32,
	/// The number of pages which must be in the queue after which we drop any further messages
	/// from the channel. This should normally not happen since the `suspend_threshold` can be used
	/// to suspend the channel.
	drop_threshold: u32,
	/// The number of pages which the queue must be reduced to before it signals that
	/// message sending may recommence after it has been suspended.
	resume_threshold: u32,
}
469
470impl Default for QueueConfigData {
471	fn default() -> Self {
472		// NOTE that these default values are only used on genesis. They should give a rough idea of
473		// what to set these values to, but is in no way a requirement.
474		Self {
475			drop_threshold: 48,    // 64KiB * 48 = 3MiB
476			suspend_threshold: 32, // 64KiB * 32 = 2MiB
477			resume_threshold: 8,   // 64KiB * 8 = 512KiB
478		}
479	}
480}
481
482impl QueueConfigData {
483	/// Validate all assumptions about `Self`.
484	///
485	/// Should be called prior to accepting this as new config.
486	pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
487		if self.resume_threshold < self.suspend_threshold &&
488			self.suspend_threshold <= self.drop_threshold &&
489			self.resume_threshold > 0
490		{
491			Ok(())
492		} else {
493			Err(Error::<T>::BadQueueConfig.into())
494		}
495	}
496}
497
/// A signal that is sent to the other side of a channel, encoded together with
/// `XcmpMessageFormat::Signals`.
#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
pub enum ChannelSignal {
	/// Tells the other side to suspend their sending.
	Suspend,
	/// Tells the other side that message sending may recommence.
	Resume,
}
503
504impl<T: Config> Pallet<T> {
505	fn try_get_outbound_channel(
506		all_channels: &BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
507		recipient: ParaId,
508	) -> Option<&OutboundChannelDetails> {
509		for channel_idx in 0..all_channels.len() {
510			if all_channels[channel_idx].recipient == recipient {
511				return Some(&all_channels[channel_idx]);
512			}
513		}
514
515		None
516	}
517
518	fn try_get_or_insert_outbound_channel(
519		all_channels: &mut BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
520		recipient: ParaId,
521	) -> Option<&mut OutboundChannelDetails> {
522		for channel_idx in 0..all_channels.len() {
523			if all_channels[channel_idx].recipient == recipient {
524				return Some(&mut all_channels[channel_idx]);
525			}
526		}
527
528		all_channels
529			.try_push(OutboundChannelDetails::new(recipient))
530			.inspect_err(|e| {
531				tracing::error!(target: LOG_TARGET, error=?e, "Failed to insert outbound HRMP channel");
532			})
533			.ok()?;
534		all_channels.last_mut()
535	}
536
537	/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
538	///
539	/// Format is the type of aggregate message that the `fragment` may be safely encoded and
540	/// appended onto.
541	///
542	/// ## Background
543	///
544	/// For our purposes, one HRMP "message" is actually an aggregated block of XCM "messages".
545	///
546	/// For the sake of clarity, we distinguish between them as message AGGREGATEs versus
547	/// message FRAGMENTs.
548	///
549	/// So each AGGREGATE is comprised of one or more concatenated SCALE-encoded `Vec<u8>`
550	/// FRAGMENTs. Though each fragment is already probably a SCALE-encoded Xcm, we can't be
551	/// certain, so we SCALE encode each `Vec<u8>` fragment in order to ensure we have the
552	/// length prefixed and can thus decode each fragment from the aggregate stream. With this,
553	/// we can concatenate them into a single aggregate blob without needing to be concerned
554	/// about encoding fragment boundaries.
555	///
556	/// If successful, returns the number of pages in the outbound queue after enqueuing the new
557	/// fragment.
558	fn send_fragment<Fragment: Encode>(
559		recipient: ParaId,
560		format: XcmpMessageFormat,
561		fragment: Fragment,
562	) -> Result<u32, MessageSendError> {
563		let mut encoded_fragment = fragment.encode();
564		let encoded_fragment_len = encoded_fragment.len();
565
566		// Optimization note: `max_message_size` could potentially be stored in
567		// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
568
569		let channel_info =
570			T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
571		// Max message size refers to aggregates, or pages. Not to individual fragments.
572		let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
573		let format_size = format.encoded_size();
574		// We check the encoded fragment length plus the format size against the max message size
575		// because the format is concatenated if a new page is needed.
576		let size_to_check = encoded_fragment
577			.len()
578			.checked_add(format_size)
579			.ok_or(MessageSendError::TooBig)?;
580		if size_to_check > max_message_size {
581			return Err(MessageSendError::TooBig);
582		}
583
584		let mut all_channels = <OutboundXcmpStatus<T>>::get();
585		let channel_details =
586			Self::try_get_or_insert_outbound_channel(&mut all_channels, recipient)
587				.ok_or(MessageSendError::TooManyChannels)?;
588		if let XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm = format {
589			channel_details
590				.flags
591				.notice_concatenated_opaque_versioned_xcm_notification_sent();
592		}
593
594		let mut existing_page = None;
595		'existing_page_check: {
596			if channel_details.last_index > channel_details.first_index {
597				let page =
598					OutboundXcmpMessages::<T>::get(recipient, channel_details.last_index - 1);
599				if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
600					break 'existing_page_check;
601				}
602				if page.len() + encoded_fragment.len() > max_message_size {
603					break 'existing_page_check;
604				}
605				existing_page = Some(page.into_inner());
606			}
607		}
608		let mut current_page = existing_page.unwrap_or_else(|| {
609			// We need to add a new page.
610			channel_details.last_index += 1;
611			format.encode()
612		});
613
614		current_page.append(&mut encoded_fragment);
615		let current_page = WeakBoundedVec::try_from(current_page).map_err(|error| {
616			tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
617			MessageSendError::TooBig
618		})?;
619		let page_count =
620			channel_details.last_index.saturating_sub(channel_details.first_index) as u32;
621		let last_page_size = current_page.len();
622		<OutboundXcmpMessages<T>>::insert(recipient, channel_details.last_index - 1, current_page);
623		<OutboundXcmpStatus<T>>::put(all_channels);
624
625		// We have to count the total size here since `channel_info.total_size` is not updated at
626		// this point in time. We assume all previous pages are filled, which, in practice, is not
627		// always the case.
628		let total_size =
629			page_count.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
630		let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
631		if total_size > threshold {
632			Self::increase_fee_factor(recipient, encoded_fragment_len as u128);
633		}
634
635		Ok(page_count)
636	}
637
638	/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
639	/// block.
640	fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
641		let mut s = <OutboundXcmpStatus<T>>::get();
642		if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
643			details.signals_exist = true;
644		} else {
645			s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
646				tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
647				Error::<T>::TooManyActiveOutboundChannels
648			})?;
649		}
650
651		let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
652			(XcmpMessageFormat::Signals, signal).encode(),
653		)
654		.map_err(|error| {
655			tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
656			Error::<T>::TooBig
657		})?;
658		let page = WeakBoundedVec::force_from(page.into_inner(), None);
659
660		<SignalMessages<T>>::insert(dest, page);
661		<OutboundXcmpStatus<T>>::put(s);
662		Ok(())
663	}
664
665	fn suspend_channel(target: ParaId) {
666		<OutboundXcmpStatus<T>>::mutate(|s| {
667			if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
668				let ok = details.state == OutboundState::Ok;
669				defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
670				details.state = OutboundState::Suspended;
671			} else {
672				if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
673					defensive!("Cannot pause channel; too many outbound channels");
674				}
675			}
676		});
677	}
678
679	fn resume_channel(target: ParaId) {
680		<OutboundXcmpStatus<T>>::mutate(|s| {
681			if let Some(index) = s.iter().position(|item| item.recipient == target) {
682				let suspended = s[index].state == OutboundState::Suspended;
683				defensive_assert!(
684					suspended,
685					"WARNING: Attempt to resume channel that was not suspended."
686				);
687				if s[index].first_index == s[index].last_index {
688					s.remove(index);
689				} else {
690					s[index].state = OutboundState::Ok;
691				}
692			} else {
693				defensive!("WARNING: Attempt to resume channel that was not suspended.");
694			}
695		});
696	}
697
698	fn enqueue_xcmp_messages<'a>(
699		sender: ParaId,
700		xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
701		is_first_sender_batch: bool,
702		meter: &mut WeightMeter,
703	) -> Result<(), ()> {
704		let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
705		let batches_footprints =
706			T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
707
708		let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
709			let required_weight = T::WeightInfo::enqueue_xcmp_messages(
710				batches_footprints.first_page_pos.saturated_into(),
711				batch_info,
712				is_first_sender_batch,
713			);
714
715			match meter.can_consume(required_weight) {
716				true => core::cmp::Ordering::Less,
717				false => core::cmp::Ordering::Greater,
718			}
719		});
720
721		meter.consume(T::WeightInfo::enqueue_xcmp_messages(
722			batches_footprints.first_page_pos.saturated_into(),
723			best_batch_footprint,
724			is_first_sender_batch,
725		));
726		T::XcmpQueue::enqueue_messages(
727			xcms.iter().take(best_batch_footprint.msgs_count).copied(),
728			sender,
729		);
730
731		if best_batch_footprint.msgs_count < xcms.len() {
732			tracing::error!(
733				target: LOG_TARGET,
734				used_weight=?meter.consumed_ratio(),
735				"Out of weight: cannot enqueue entire XCMP messages batch; \
736				dropped some or all messages in batch."
737			);
738			return Err(());
739		}
740		Ok(())
741	}
742
743	/// Split concatenated encoded `VersionedXcm`s into individual items.
744	///
745	/// We directly encode them again since that is needed later on.
746	///
747	/// On error returns a partial batch with all the XCMs processed before the failure.
748	/// This can happen in case of a decoding/re-encoding failure.
749	pub(crate) fn take_first_concatenated_xcm<'a>(
750		data: &mut &'a [u8],
751		meter: &mut WeightMeter,
752	) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
753		// Let's make sure that we can decode at least an empty xcm message.
754		let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
755		if meter.try_consume(base_weight).is_err() {
756			defensive!("Out of weight; could not decode all; dropping");
757			return Err(());
758		}
759
760		let input_data = &mut &data[..];
761		let mut input = codec::CountedInput::new(input_data);
762		VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
763			|error| {
764				tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
765				()
766			},
767		)?;
768		let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
769		*data = remaining_data;
770
771		// Consume the extra weight that it took to decode this message.
772		// This depends on the message len in bytes.
773		// Saturates if it's over the limit.
774		let extra_weight =
775			T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
776		meter.consume(extra_weight);
777
778		let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
779			tracing::error!(
780				target: LOG_TARGET,
781				?error,
782				"Failed to take XCM after decoding: message is too long"
783			);
784			()
785		})?;
786
787		Ok(xcm)
788	}
789
790	/// Split concatenated opaque `VersionedXcm`s into individual items.
791	///
792	/// This method is not benchmarked because it's almost free.
793	pub(crate) fn take_first_concatenated_opaque_xcm<'a>(
794		data: &mut &'a [u8],
795	) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
796		let xcm_len = Compact::<u32>::decode(data).map_err(|error| {
797			tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode opaque XCM length");
798			()
799		})?;
800		let (xcm_data, remaining_data) = match data.split_at_checked(xcm_len.0 as usize) {
801			Some((xcm_data, remaining_data)) => (xcm_data, remaining_data),
802			None => {
803				tracing::debug!(target: LOG_TARGET, ?xcm_len, "Wrong opaque XCM length");
804				return Err(());
805			},
806		};
807		*data = remaining_data;
808
809		let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
810			tracing::error!(
811				target: LOG_TARGET,
812				?error,
813				"Failed to take opaque XCM after decoding: message is too long"
814			);
815			()
816		})?;
817
818		Ok(xcm)
819	}
820
821	/// Split concatenated encoded `VersionedXcm`s into batches.
822	///
823	/// We directly encode them again since that is needed later on.
824	pub(crate) fn take_first_concatenated_xcms<'a>(
825		data: &mut &'a [u8],
826		encoding: XcmEncoding,
827		batch_size: usize,
828		meter: &mut WeightMeter,
829	) -> Result<
830		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
831		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
832	> {
833		let mut batch = vec![];
834		loop {
835			if data.is_empty() {
836				return Ok(batch);
837			}
838
839			let maybe_xcm = match encoding {
840				XcmEncoding::Simple => Self::take_first_concatenated_xcm(data, meter),
841				XcmEncoding::Double => Self::take_first_concatenated_opaque_xcm(data),
842			};
843			match maybe_xcm {
844				Ok(xcm) => {
845					batch.push(xcm);
846					if batch.len() >= batch_size {
847						return Ok(batch);
848					}
849				},
850				Err(_) => return Err(batch),
851			}
852		}
853	}
854
855	/// The worst-case weight of `on_idle`.
856	pub fn on_idle_weight() -> Weight {
857		<T as crate::Config>::WeightInfo::on_idle_good_msg()
858			.max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
859	}
860
861	#[cfg(feature = "bridging")]
862	fn is_inbound_channel_suspended(sender: ParaId) -> bool {
863		<InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
864	}
865
866	#[cfg(feature = "bridging")]
867	/// Returns tuple of `OutboundState` and number of queued pages.
868	fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
869		<OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
870			let queued_pages = c.last_index.saturating_sub(c.first_index);
871			(c.state, queued_pages)
872		})
873	}
874}
875
876impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
877	// Suspends/Resumes the queue when certain thresholds are reached.
878	fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
879		let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
880
881		let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
882		let suspended = suspended_channels.contains(&para);
883
884		if suspended && fp.ready_pages <= resume_threshold {
885			if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
886				tracing::error!(
887					target: LOG_TARGET,
888					error=?err,
889					sibling=?para,
890					"defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
891				);
892			} else {
893				suspended_channels.remove(&para);
894				<InboundXcmpSuspended<T>>::put(suspended_channels);
895			}
896		} else if !suspended && fp.ready_pages >= suspend_threshold {
897			tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");
898
899			if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
900				// It will retry if `drop_threshold` is not reached, but it could be too late.
901				tracing::error!(
902					target: LOG_TARGET, error=?err,
903					"defensive: Could not send suspension signal; future messages may be dropped."
904				);
905			} else if let Err(err) = suspended_channels.try_insert(para) {
906				tracing::error!(
907					target: LOG_TARGET,
908					error=?err,
909					sibling=?para,
910					"Too many channels suspended; cannot suspend sibling; further messages may be dropped."
911				);
912			} else {
913				<InboundXcmpSuspended<T>>::put(suspended_channels);
914			}
915		}
916	}
917}
918
919impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
920	fn is_paused(para: &ParaId) -> bool {
921		if !QueueSuspended::<T>::get() {
922			return false;
923		}
924
925		// Make an exception for the superuser queue:
926		let sender_origin = T::ControllerOriginConverter::convert_origin(
927			(Parent, Parachain((*para).into())),
928			OriginKind::Superuser,
929		);
930		let is_controller =
931			sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
932
933		!is_controller
934	}
935}
936
/// The encoding of the XCM messages in an XCMP page.
#[derive(Copy, Clone)]
enum XcmEncoding {
	/// Simple encoded (`xcm.encode()`)
	///
	/// When we receive this kind of message, we have to decode and then re-encode it before
	/// enqueueing it for later processing.
	Simple,
	/// Double encoded (`xcm.encode().encode()`)
	///
	/// The XCM message is encoded first, resulting in a vector of bytes. And then the vector of
	/// bytes is encoded again. This has 2 advantages:
	/// 1. We can just decode them before enqueueing them for later processing. They don't need to
	///    be re-encoded.
	/// 2. Decoding a `Vec<u8>` is much more efficient than decoding XCM messages.
	Double,
}
954
impl<T: Config> XcmpMessageHandler for Pallet<T> {
	/// Handles a set of incoming XCMP pages, one `(sender, sent_at, data)` tuple per page.
	///
	/// Every page starts with an encoded `XcmpMessageFormat`:
	/// * `Signals`: up to `MAX_SIGNALS_PER_PAGE` channel signals are decoded and applied via
	///   `suspend_channel` / `resume_channel`.
	/// * `ConcatenatedVersionedXcm` / `ConcatenatedOpaqueVersionedXcm`: the XCMs are taken off
	///   the page in batches of `XCM_BATCH_SIZE` and passed to `enqueue_xcmp_messages`.
	/// * `ConcatenatedEncodedBlob`: not handled; the page is dropped.
	///
	/// Weight is tracked by a `WeightMeter` limited to `max_weight`; the consumed weight is
	/// returned.
	fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
		iter: I,
		max_weight: Weight,
	) -> Weight {
		let mut meter = WeightMeter::with_limit(max_weight);

		// Senders for which an XCM page was already encountered during this call. Used to
		// charge `uncached_enqueue_xcmp_messages` only once per sender.
		let mut known_xcm_senders = BTreeSet::new();
		for (sender, _sent_at, mut data) in iter {
			// Decoding the format advances `data` past the format prefix.
			let format = match XcmpMessageFormat::decode(&mut data) {
				Ok(f) => f,
				Err(_) => {
					defensive!("Unknown XCMP message format - dropping");
					continue;
				},
			};

			match format {
				XcmpMessageFormat::Signals => {
					// Bound the number of signals processed from a single page.
					let mut signal_count = 0;
					while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
						signal_count += 1;
						match ChannelSignal::decode(&mut data) {
							Ok(ChannelSignal::Suspend) => {
								// Charge the weight up front; stop processing this page if the
								// meter cannot afford it.
								if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
									defensive!(
										"Not enough weight to process suspend signal - dropping"
									);
									break;
								}
								Self::suspend_channel(sender)
							},
							Ok(ChannelSignal::Resume) => {
								if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
									defensive!(
										"Not enough weight to process resume signal - dropping"
									);
									break;
								}
								Self::resume_channel(sender)
							},
							Err(_) => {
								defensive!("Undecodable channel signal - dropping");
								break;
							},
						}
					}
				},
				XcmpMessageFormat::ConcatenatedVersionedXcm |
				XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
					// Map the page format onto the per-message encoding used for extraction.
					let encoding = match format {
						XcmpMessageFormat::ConcatenatedVersionedXcm => XcmEncoding::Simple,
						XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
							// Receiving this format is recorded in the flags of the outbound
							// channel towards the sender (the channel entry is looked up or
							// inserted). The status vector is written back unconditionally.
							let mut all_channels = <OutboundXcmpStatus<T>>::get();
							if let Some(channel_details) =
								Self::try_get_or_insert_outbound_channel(&mut all_channels, sender)
							{
								channel_details
									.flags
									.notice_concatenated_opaque_versioned_xcm_support();
							}
							<OutboundXcmpStatus<T>>::put(all_channels);

							XcmEncoding::Double
						},
						_ => {
							// This branch is unreachable.
							continue;
						},
					};

					// `insert` returns `true` only the first time this sender is seen.
					let mut is_first_sender_batch = known_xcm_senders.insert(sender);
					if is_first_sender_batch {
						if meter
							.try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
							.is_err()
						{
							defensive!(
								"Out of weight: cannot enqueue XCMP messages; dropping page; \
                                    Used weight: ",
								meter.consumed_ratio()
							);
							continue;
						}
					}

					let mut can_process_next_batch = true;
					while can_process_next_batch {
						let batch = match Self::take_first_concatenated_xcms(
							&mut data,
							encoding,
							XCM_BATCH_SIZE,
							&mut meter,
						) {
							Ok(batch) => batch,
							Err(batch) => {
								// Extraction failed part-way: no further batch is attempted, but
								// the messages taken before the failure are still passed on
								// below.
								can_process_next_batch = false;
								defensive!(
									"HRMP inbound decode stream broke; page will be dropped."
								);
								batch
							},
						};
						// An empty batch means there is nothing (more) to enqueue for this page.
						if batch.is_empty() {
							break;
						}

						// Stop processing this page as soon as enqueueing fails.
						if let Err(()) = Self::enqueue_xcmp_messages(
							sender,
							&batch,
							is_first_sender_batch,
							&mut meter,
						) {
							break;
						}
						is_first_sender_batch = false;
					}
				},
				XcmpMessageFormat::ConcatenatedEncodedBlob => {
					defensive!("Blob messages are unhandled - dropping");
					continue;
				},
			}
		}

		meter.consumed()
	}
}
1083
1084impl<T: Config> XcmpMessageSource for Pallet<T> {
1085	fn take_outbound_messages(
1086		maximum_channels: usize,
1087		excluded_recipients: &[ParaId],
1088	) -> Vec<(ParaId, Vec<u8>)> {
1089		let mut statuses = <OutboundXcmpStatus<T>>::get().into_inner();
1090		let old_statuses_len = statuses.len();
1091		let max_message_count = statuses.len().min(maximum_channels);
1092		let mut result = Vec::with_capacity(max_message_count);
1093
1094		statuses.retain_mut(|status| {
1095			let OutboundChannelDetails {
1096				recipient: para_id,
1097				state: outbound_state,
1098				signals_exist,
1099				first_index,
1100				last_index,
1101				flags,
1102			} = status;
1103
1104			let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(*para_id) {
1105				ChannelStatus::Closed => {
1106					// This means that there is no such channel anymore. Nothing to be done but
1107					// swallow the messages and discard the status.
1108					for i in *first_index..*last_index {
1109						<OutboundXcmpMessages<T>>::remove(*para_id, i);
1110					}
1111					if *signals_exist {
1112						<SignalMessages<T>>::remove(*para_id);
1113					}
1114					return false;
1115				},
1116				ChannelStatus::Full => return true,
1117				ChannelStatus::Ready(max_size_now, max_size_ever) => (max_size_now, max_size_ever),
1118			};
1119
1120			// Check if we should omit the recipient.
1121			if excluded_recipients.contains(para_id) {
1122				return true;
1123			}
1124
1125			// This is a hard limit from the host config; not even signals can bypass it.
1126			if result.len() == max_message_count {
1127				// We check this condition in the beginning of the loop so that we don't include
1128				// a message where the limit is 0.
1129				return true;
1130			}
1131
1132			let page = 'page_fetch: {
1133				if *signals_exist {
1134					let page = <SignalMessages<T>>::get(*para_id);
1135					defensive_assert!(!page.is_empty(), "Signals must exist");
1136
1137					if page.len() < max_size_now {
1138						<SignalMessages<T>>::remove(*para_id);
1139						*signals_exist = false;
1140						break 'page_fetch page;
1141					}
1142
1143					defensive!("Signals should fit into a single page");
1144					return true;
1145				}
1146
1147				if *outbound_state == OutboundState::Suspended {
1148					// Only signals are exempt from suspension.
1149					return true;
1150				}
1151
1152				if last_index > first_index {
1153					let page = <OutboundXcmpMessages<T>>::get(*para_id, *first_index);
1154					if page.len() < max_size_now {
1155						<OutboundXcmpMessages<T>>::remove(*para_id, *first_index);
1156						*first_index += 1;
1157						break 'page_fetch page;
1158					}
1159				}
1160
1161				// Send a notification to the recipient advertising that we support
1162				// `XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm` if needed.
1163				// We do this only once during the entire lifetime of the channel.
1164				if flags.should_send_concatenated_opaque_versioned_xcm_notification() {
1165					match WeakBoundedVec::try_from(XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm.encode()) {
1166						Ok(page) => {
1167							flags.notice_concatenated_opaque_versioned_xcm_notification_sent();
1168							break 'page_fetch page;
1169						},
1170						Err(_) => {
1171							defensive!("XcmpMessageFormat should fit into a single page");
1172							return true;
1173						}
1174					};
1175				}
1176
1177				return true;
1178			};
1179
1180			if first_index == last_index {
1181				*first_index = 0;
1182				*last_index = 0;
1183			}
1184
1185			if page.len() > max_size_ever {
1186				// TODO: #274 This means that the channel's max message size has changed since
1187				//   the message was sent. We should parse it and split into smaller messages but
1188				//   since it's so unlikely then for now we just drop it.
1189				defensive!("WARNING: oversize message in queue - dropping");
1190			} else {
1191				result.push((*para_id, page.into_inner()));
1192			}
1193
1194			let max_total_size = match T::ChannelInfo::get_channel_info(*para_id) {
1195				Some(channel_info) => channel_info.max_total_size,
1196				None => {
1197					tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1198					MAX_POSSIBLE_ALLOCATION // We use this as a fallback in case the messaging state is not present
1199				},
1200			};
1201			let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1202			let remaining_total_size: usize = (*first_index..*last_index)
1203				.map(|index| OutboundXcmpMessages::<T>::decode_len(*para_id, index).unwrap())
1204				.sum();
1205			if remaining_total_size <= threshold as usize {
1206				Self::decrease_fee_factor(*para_id);
1207			}
1208
1209			true
1210		});
1211		debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1212		let mut statuses = BoundedVec::defensive_truncate_from(statuses);
1213
1214		// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
1215		// criteria requirement.
1216		result.sort_by_key(|(recipient, _msg)| *recipient);
1217
1218		// old_status_len must be >= status.len() since we never add anything to status.
1219		let pruned = old_statuses_len - statuses.len();
1220		// Because it may so happen that we only gave attention to some channels it's important
1221		// to change the order. Otherwise, the next `on_finalize` we will again give attention
1222		// only to those channels that happen to be in the beginning, until they are emptied.
1223		// This leads to "starvation" of the channels near to the end.
1224		let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1225			"Could not store HRMP channels config. Some HRMP channels may be broken.",
1226		);
1227
1228		<OutboundXcmpStatus<T>>::put(statuses);
1229
1230		result
1231	}
1232}
1233
1234/// Xcm sender for sending to a sibling parachain.
1235impl<T: Config> SendXcm for Pallet<T> {
1236	type Ticket = (ParaId, VersionedXcm<()>);
1237
1238	fn validate(
1239		dest: &mut Option<Location>,
1240		msg: &mut Option<Xcm<()>>,
1241	) -> SendResult<(ParaId, VersionedXcm<()>)> {
1242		let d = dest.take().ok_or(SendError::MissingArgument)?;
1243
1244		match d.unpack() {
1245			// An HRMP message for a sibling parachain.
1246			(1, [Parachain(id)]) => {
1247				let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1248				let id = ParaId::from(*id);
1249				let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1250				let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1251					.map_err(|()| SendError::DestinationUnsupported)?;
1252				versioned_xcm
1253					.check_is_decodable()
1254					.map_err(|()| SendError::ExceedsMaxMessageSize)?;
1255
1256				Ok(((id, versioned_xcm), price))
1257			},
1258			_ => {
1259				// Anything else is unhandled. This includes a message that is not meant for us.
1260				// We need to make sure that dest/msg is not consumed here.
1261				*dest = Some(d);
1262				Err(SendError::NotApplicable)
1263			},
1264		}
1265	}
1266
1267	fn deliver((recipient, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1268		let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1269
1270		let mut encoding = XcmEncoding::Simple;
1271		let mut all_channels = <OutboundXcmpStatus<T>>::get();
1272		if let Some(channel_details) = Self::try_get_outbound_channel(&mut all_channels, recipient)
1273		{
1274			if channel_details.flags.has_concatenated_opaque_versioned_xcm_support() {
1275				encoding = XcmEncoding::Double;
1276			}
1277		}
1278
1279		let result = match encoding {
1280			XcmEncoding::Simple => {
1281				Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm)
1282			},
1283			XcmEncoding::Double => Self::send_fragment(
1284				recipient,
1285				XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm,
1286				xcm.encode(),
1287			),
1288		};
1289		match result {
1290			Ok(_) => {
1291				Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1292				Ok(hash)
1293			},
1294			Err(e) => {
1295				tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1296				Err(SendError::Transport(e.into()))
1297			},
1298		}
1299	}
1300}
1301
1302impl<T: Config> InspectMessageQueues for Pallet<T> {
1303	fn clear_messages() {
1304		// Best effort.
1305		let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1306		OutboundXcmpStatus::<T>::mutate(|details_vec| {
1307			for details in details_vec {
1308				details.first_index = 0;
1309				details.last_index = 0;
1310			}
1311		});
1312	}
1313
1314	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1315		use xcm::prelude::*;
1316
1317		OutboundXcmpMessages::<T>::iter()
1318			.map(|(para_id, _, messages)| {
1319				let data = &mut &messages[..];
1320
1321				let decoded_format = XcmpMessageFormat::decode(data).unwrap();
1322				let mut decoded_messages = Vec::new();
1323				while !data.is_empty() {
1324					let message_bytes = match decoded_format {
1325						XcmpMessageFormat::ConcatenatedVersionedXcm => {
1326							Self::take_first_concatenated_xcm(data, &mut WeightMeter::new())
1327						},
1328						XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1329							Self::take_first_concatenated_opaque_xcm(data)
1330						},
1331						unexpected_format => {
1332							panic!("Unexpected XCMP format: {unexpected_format:?}!")
1333						},
1334					}
1335					.unwrap();
1336					let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1337						MAX_XCM_DECODE_DEPTH,
1338						&mut &message_bytes[..],
1339					)
1340					.unwrap();
1341					decoded_messages.push(decoded_message);
1342				}
1343
1344				(
1345					VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1346					decoded_messages,
1347				)
1348			})
1349			.collect()
1350	}
1351}
1352
1353impl<T: Config> FeeTracker for Pallet<T> {
1354	type Id = ParaId;
1355
1356	fn get_fee_factor(id: Self::Id) -> FixedU128 {
1357		<DeliveryFeeFactor<T>>::get(id)
1358	}
1359
1360	fn set_fee_factor(id: Self::Id, val: FixedU128) {
1361		<DeliveryFeeFactor<T>>::set(id, val);
1362	}
1363}