referrerpolicy=no-referrer-when-downgrade

cumulus_pallet_xcmp_queue/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17//! A pallet which uses the XCMP transport layer to handle both incoming and outgoing XCM message
18//! sending and dispatch, queuing, signalling and backpressure. To do so, it implements:
19//! * `XcmpMessageHandler`
20//! * `XcmpMessageSource`
21//!
22//! Also provides an implementation of `SendXcm` which can be placed in a router tuple for relaying
23//! XCM over XCMP if the destination is `Parent/Parachain`. It requires an implementation of
24//! `XcmExecutor` for dispatching incoming XCM messages.
25//!
26//! To prevent out of memory errors on the `OutboundXcmpMessages` queue, an exponential fee factor
27//! (`DeliveryFeeFactor`) is set, much like the one used in DMP.
28//! The fee factor increases whenever the total size of messages in a particular channel passes a
29//! threshold. This threshold is defined as a percentage of the maximum total size the channel can
30//! have. More concretely, the threshold is `max_total_size` / `THRESHOLD_FACTOR`, where:
31//! - `max_total_size` is the maximum size, in bytes, of the channel, not number of messages.
32//! It is defined in the channel configuration.
33//! - `THRESHOLD_FACTOR` just declares which percentage of the max size is the actual threshold.
34//! If it's 2, then the threshold is half of the max size, if it's 4, it's a quarter, and so on.
35
36#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bitflags::bitflags;
60use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
61use codec::{Compact, Decode, DecodeLimit, Encode, MaxEncodedLen};
62use cumulus_primitives_core::{
63	relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
64	ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
65};
66
67use frame_support::{
68	defensive, defensive_assert,
69	traits::{
70		Defensive, DefensiveTruncateFrom, EnqueueMessage, EnsureOrigin, Get, Len, QueueFootprint,
71		QueueFootprintQuery, QueuePausedQuery,
72	},
73	weights::{Weight, WeightMeter},
74};
75use pallet_message_queue::OnQueueChanged;
76use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
77use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
78use scale_info::TypeInfo;
79use sp_core::MAX_POSSIBLE_ALLOCATION;
80use sp_runtime::{FixedU128, SaturatedConversion, WeakBoundedVec};
81use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
82use xcm_builder::InspectMessageQueues;
83use xcm_executor::traits::ConvertOrigin;
84
85pub use pallet::*;
86
87/// Index used to identify overweight XCMs.
88pub type OverweightIndex = u64;
89/// The max length of an XCMP message.
90pub type MaxXcmpMessageLenOf<T> =
91	<<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
92
93const LOG_TARGET: &str = "xcmp_queue";
94const DEFAULT_POV_SIZE: u64 = 64 * 1024; // 64 KB
95/// The size of an XCM messages batch.
96pub const XCM_BATCH_SIZE: usize = 250;
97/// The maximum number of signals that we can have in an XCMP page.
98pub const MAX_SIGNALS_PER_PAGE: usize = 3;
99
100/// Constants related to delivery fee calculation
101pub mod delivery_fee_constants {
102	/// Fees will start increasing when queue is half full
103	pub const THRESHOLD_FACTOR: u32 = 2;
104}
105
106#[frame_support::pallet]
107pub mod pallet {
108	use super::*;
109	use frame_support::{pallet_prelude::*, Twox64Concat};
110	use frame_system::pallet_prelude::*;
111
112	#[pallet::pallet]
113	#[pallet::storage_version(migration::STORAGE_VERSION)]
114	pub struct Pallet<T>(_);
115
116	#[pallet::config]
117	pub trait Config: frame_system::Config {
118		#[allow(deprecated)]
119		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
120
121		/// Information on the available XCMP channels.
122		type ChannelInfo: GetChannelInfo;
123
124		/// Means of converting an `Xcm` into a `VersionedXcm`.
125		type VersionWrapper: WrapVersion;
126
127		/// Enqueue an inbound horizontal message for later processing.
128		///
129		/// This defines the maximal message length via [`crate::MaxXcmpMessageLenOf`]. The pallet
130		/// assumes that this hook will eventually process all the pushed messages.
131		type XcmpQueue: EnqueueMessage<ParaId>
132			+ QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;
133
134		/// The maximum number of inbound XCMP channels that can be suspended simultaneously.
135		///
136		/// Any further channel suspensions will fail and messages may get dropped without further
137		/// notice. Choosing a high value (1000) is okay; the trade-off that is described in
138		/// [`InboundXcmpSuspended`] still applies at that scale.
139		#[pallet::constant]
140		type MaxInboundSuspended: Get<u32>;
141
142		/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
143		///
144		/// If this is reached, then no further messages can be sent to channels that do not yet
145		/// have a message queued. This should be set to the expected maximum of outbound channels
146		/// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough,
147		/// since otherwise the congestion control protocol will not work as intended and messages
148		/// may be dropped. This value increases the PoV and should therefore not be picked too
149		/// high. Governance needs to pay attention to not open more channels than this value.
150		#[pallet::constant]
151		type MaxActiveOutboundChannels: Get<u32>;
152
153		/// The maximal page size for HRMP message pages.
154		///
155		/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
156		/// benchmarking. The limit for the size of a message is slightly below this, since some
157		/// overhead is incurred for encoding the format.
158		#[pallet::constant]
159		type MaxPageSize: Get<u32>;
160
161		/// The origin that is allowed to resume or suspend the XCMP queue.
162		type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
163
164		/// The conversion function used to attempt to convert an XCM `Location` origin to a
165		/// superuser origin.
166		type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
167
168		/// The price for delivering an XCM to a sibling parachain destination.
169		type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
170
171		/// The weight information of this pallet.
172		type WeightInfo: WeightInfoExt;
173	}
174
175	#[pallet::call]
176	impl<T: Config> Pallet<T> {
177		/// Suspends all XCM executions for the XCMP queue, regardless of the sender's origin.
178		///
179		/// - `origin`: Must pass `ControllerOrigin`.
180		#[pallet::call_index(1)]
181		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
182		pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
183			T::ControllerOrigin::ensure_origin(origin)?;
184
185			QueueSuspended::<T>::try_mutate(|suspended| {
186				if *suspended {
187					Err(Error::<T>::AlreadySuspended.into())
188				} else {
189					*suspended = true;
190					Ok(())
191				}
192			})
193		}
194
195		/// Resumes all XCM executions for the XCMP queue.
196		///
197		/// Note that this function doesn't change the status of the in/out bound channels.
198		///
199		/// - `origin`: Must pass `ControllerOrigin`.
200		#[pallet::call_index(2)]
201		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
202		pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
203			T::ControllerOrigin::ensure_origin(origin)?;
204
205			QueueSuspended::<T>::try_mutate(|suspended| {
206				if !*suspended {
207					Err(Error::<T>::AlreadyResumed.into())
208				} else {
209					*suspended = false;
210					Ok(())
211				}
212			})
213		}
214
215		/// Overwrites the number of pages which must be in the queue for the other side to be
216		/// told to suspend their sending.
217		///
218		/// - `origin`: Must pass `Root`.
219		/// - `new`: Desired value for `QueueConfigData.suspend_value`
220		#[pallet::call_index(3)]
221		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
222		pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
223			ensure_root(origin)?;
224
225			QueueConfig::<T>::try_mutate(|data| {
226				data.suspend_threshold = new;
227				data.validate::<T>()
228			})
229		}
230
231		/// Overwrites the number of pages which must be in the queue after which we drop any
232		/// further messages from the channel.
233		///
234		/// - `origin`: Must pass `Root`.
235		/// - `new`: Desired value for `QueueConfigData.drop_threshold`
236		#[pallet::call_index(4)]
237		#[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
238		pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
239			ensure_root(origin)?;
240
241			QueueConfig::<T>::try_mutate(|data| {
242				data.drop_threshold = new;
243				data.validate::<T>()
244			})
245		}
246
247		/// Overwrites the number of pages which the queue must be reduced to before it signals
248		/// that message sending may recommence after it has been suspended.
249		///
250		/// - `origin`: Must pass `Root`.
251		/// - `new`: Desired value for `QueueConfigData.resume_threshold`
252		#[pallet::call_index(5)]
253		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
254		pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
255			ensure_root(origin)?;
256
257			QueueConfig::<T>::try_mutate(|data| {
258				data.resume_threshold = new;
259				data.validate::<T>()
260			})
261		}
262	}
263
264	#[pallet::hooks]
265	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
266		fn integrity_test() {
267			assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
268
269			let w = Self::on_idle_weight();
270			assert!(w != Weight::zero());
271			assert!(w.all_lte(T::BlockWeights::get().max_block));
272
273			<T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
274		}
275
276		fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
277			let mut meter = WeightMeter::with_limit(limit);
278
279			if meter.try_consume(Self::on_idle_weight()).is_err() {
280				tracing::debug!(
281					target: LOG_TARGET,
282					"Not enough weight for on_idle. {} < {}",
283					Self::on_idle_weight(), limit
284				);
285				return meter.consumed();
286			}
287
288			migration::v3::lazy_migrate_inbound_queue::<T>();
289
290			meter.consumed()
291		}
292	}
293
294	#[pallet::event]
295	#[pallet::generate_deposit(pub(super) fn deposit_event)]
296	pub enum Event<T: Config> {
297		/// An HRMP message was sent to a sibling parachain.
298		XcmpMessageSent { message_hash: XcmHash },
299	}
300
301	#[pallet::error]
302	pub enum Error<T> {
303		/// Setting the queue config failed since one of its values was invalid.
304		BadQueueConfig,
305		/// The execution is already suspended.
306		AlreadySuspended,
307		/// The execution is already resumed.
308		AlreadyResumed,
309		/// There are too many active outbound channels.
310		TooManyActiveOutboundChannels,
311		/// The message is too big.
312		TooBig,
313	}
314
315	/// The suspended inbound XCMP channels. All others are not suspended.
316	///
317	/// This is a `StorageValue` instead of a `StorageMap` since we expect multiple reads per block
318	/// to different keys with a one byte payload. The access to `BoundedBTreeSet` will be cached
319	/// within the block and therefore only included once in the proof size.
320	///
321	/// NOTE: The PoV benchmarking cannot know this and will over-estimate, but the actual proof
322	/// will be smaller.
323	#[pallet::storage]
324	pub type InboundXcmpSuspended<T: Config> =
325		StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
326
327	/// The non-empty XCMP channels in order of becoming non-empty, and the index of the first
328	/// and last outbound message. If the two indices are equal, then it indicates an empty
329	/// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater
330	/// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in
331	/// case of the need to send a high-priority signal message this block.
332	/// The bool is true if there is a signal message waiting to be sent.
333	#[pallet::storage]
334	pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
335		_,
336		BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
337		ValueQuery,
338	>;
339
340	/// The messages outbound in a given XCMP channel.
341	#[pallet::storage]
342	pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
343		_,
344		Blake2_128Concat,
345		ParaId,
346		Twox64Concat,
347		u16,
348		WeakBoundedVec<u8, T::MaxPageSize>,
349		ValueQuery,
350	>;
351
352	/// Any signal messages waiting to be sent.
353	#[pallet::storage]
354	pub(super) type SignalMessages<T: Config> =
355		StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
356
357	/// The configuration which controls the dynamics of the outbound queue.
358	#[pallet::storage]
359	pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
360
361	/// Whether or not the XCMP queue is suspended from executing incoming XCMs or not.
362	#[pallet::storage]
363	pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
364
365	/// The factor to multiply the base delivery fee by.
366	#[pallet::storage]
367	pub(super) type DeliveryFeeFactor<T: Config> =
368		StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
369}
370
371#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, Debug, TypeInfo, MaxEncodedLen)]
372pub enum OutboundState {
373	Ok,
374	Suspended,
375}
376
377bitflags! {
378	#[derive(Encode, Decode, TypeInfo, MaxEncodedLen)]
379	struct OutboundChannelFlags: u32 {
380		const CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT = 1;
381		const CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT = 1 << 1;
382	}
383}
384
385impl OutboundChannelFlags {
386	// Check whether the recipient supports `ConcatenatedOpaqueVersionedXcm`.
387	fn has_concatenated_opaque_versioned_xcm_support(&self) -> bool {
388		*self & Self::CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT != Self::empty()
389	}
390
391	// Check whether we should send a notification to the recipient, advertising that we support
392	// `ConcatenatedOpaqueVersionedXcm`.
393	fn should_send_concatenated_opaque_versioned_xcm_notification(&self) -> bool {
394		if self.has_concatenated_opaque_versioned_xcm_support() {
395			return false;
396		}
397
398		if *self & Self::CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT != Self::empty() {
399			return false;
400		}
401
402		true
403	}
404
405	// Remember that the recipient supports `ConcatenatedOpaqueVersionedXcm`.
406	fn notice_concatenated_opaque_versioned_xcm_support(&mut self) {
407		*self = *self | Self::CONCATENATED_OPAQUE_VERSIONED_XCM_SUPPORT;
408	}
409
410	// Remember that we advertised the `ConcatenatedOpaqueVersionedXcm` support to the recipient.
411	fn notice_concatenated_opaque_versioned_xcm_notification_sent(&mut self) {
412		*self = *self | Self::CONCATENATED_OPAQUE_VERSIONED_XCM_NOTIFICATION_SENT;
413	}
414}
415
416/// Struct containing detailed information about the outbound channel.
417#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, Debug, MaxEncodedLen)]
418pub struct OutboundChannelDetails {
419	/// The `ParaId` of the parachain that this channel is connected with.
420	recipient: ParaId,
421	/// The state of the channel.
422	state: OutboundState,
423	/// Whether any signals exist in this channel.
424	signals_exist: bool,
425	/// The index of the first outbound message.
426	first_index: u16,
427	/// The index of the last outbound message.
428	last_index: u16,
429	/// Flags
430	flags: OutboundChannelFlags,
431	/// Cached total byte size of the pages currently queued in this channel.
432	queued_bytes: u32,
433}
434
435impl OutboundChannelDetails {
436	pub fn new(recipient: ParaId) -> OutboundChannelDetails {
437		OutboundChannelDetails {
438			recipient,
439			state: OutboundState::Ok,
440			signals_exist: false,
441			first_index: 0,
442			last_index: 0,
443			flags: OutboundChannelFlags::empty(),
444			queued_bytes: 0,
445		}
446	}
447
448	pub fn with_signals(mut self) -> OutboundChannelDetails {
449		self.signals_exist = true;
450		self
451	}
452
453	pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
454		self.state = OutboundState::Suspended;
455		self
456	}
457}
458
459#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, Debug, TypeInfo, MaxEncodedLen)]
460pub struct QueueConfigData {
461	/// The number of pages which must be in the queue for the other side to be told to suspend
462	/// their sending.
463	suspend_threshold: u32,
464	/// The number of pages which must be in the queue after which we drop any further messages
465	/// from the channel. This should normally not happen since the `suspend_threshold` can be used
466	/// to suspend the channel.
467	drop_threshold: u32,
468	/// The number of pages which the queue must be reduced to before it signals that
469	/// message sending may recommence after it has been suspended.
470	resume_threshold: u32,
471}
472
473impl Default for QueueConfigData {
474	fn default() -> Self {
475		// NOTE that these default values are only used on genesis. They should give a rough idea of
476		// what to set these values to, but is in no way a requirement.
477		Self {
478			drop_threshold: 48,    // 64KiB * 48 = 3MiB
479			suspend_threshold: 32, // 64KiB * 32 = 2MiB
480			resume_threshold: 8,   // 64KiB * 8 = 512KiB
481		}
482	}
483}
484
485impl QueueConfigData {
486	/// Validate all assumptions about `Self`.
487	///
488	/// Should be called prior to accepting this as new config.
489	pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
490		if self.resume_threshold < self.suspend_threshold &&
491			self.suspend_threshold <= self.drop_threshold &&
492			self.resume_threshold > 0
493		{
494			Ok(())
495		} else {
496			Err(Error::<T>::BadQueueConfig.into())
497		}
498	}
499}
500
501#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
502pub enum ChannelSignal {
503	Suspend,
504	Resume,
505}
506
507impl<T: Config> Pallet<T> {
508	fn try_get_outbound_channel(
509		all_channels: &BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
510		recipient: ParaId,
511	) -> Option<&OutboundChannelDetails> {
512		for channel_idx in 0..all_channels.len() {
513			if all_channels[channel_idx].recipient == recipient {
514				return Some(&all_channels[channel_idx]);
515			}
516		}
517
518		None
519	}
520
521	fn try_get_or_insert_outbound_channel(
522		all_channels: &mut BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
523		recipient: ParaId,
524	) -> Option<&mut OutboundChannelDetails> {
525		for channel_idx in 0..all_channels.len() {
526			if all_channels[channel_idx].recipient == recipient {
527				return Some(&mut all_channels[channel_idx]);
528			}
529		}
530
531		all_channels
532			.try_push(OutboundChannelDetails::new(recipient))
533			.inspect_err(|e| {
534				tracing::error!(target: LOG_TARGET, error=?e, "Failed to insert outbound HRMP channel");
535			})
536			.ok()?;
537		all_channels.last_mut()
538	}
539
540	/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
541	///
542	/// Format is the type of aggregate message that the `fragment` may be safely encoded and
543	/// appended onto.
544	///
545	/// ## Background
546	///
547	/// For our purposes, one HRMP "message" is actually an aggregated block of XCM "messages".
548	///
549	/// For the sake of clarity, we distinguish between them as message AGGREGATEs versus
550	/// message FRAGMENTs.
551	///
552	/// So each AGGREGATE is comprised of one or more concatenated SCALE-encoded `Vec<u8>`
553	/// FRAGMENTs. Though each fragment is already probably a SCALE-encoded Xcm, we can't be
554	/// certain, so we SCALE encode each `Vec<u8>` fragment in order to ensure we have the
555	/// length prefixed and can thus decode each fragment from the aggregate stream. With this,
556	/// we can concatenate them into a single aggregate blob without needing to be concerned
557	/// about encoding fragment boundaries.
558	///
559	/// If successful, returns the number of pages in the outbound queue after enqueuing the new
560	/// fragment.
561	fn send_fragment<Fragment: Encode>(
562		recipient: ParaId,
563		format: XcmpMessageFormat,
564		fragment: Fragment,
565	) -> Result<u32, MessageSendError> {
566		let mut encoded_fragment = fragment.encode();
567		let encoded_fragment_len = encoded_fragment.len();
568
569		// Optimization note: `max_message_size` could potentially be stored in
570		// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
571
572		let channel_info =
573			T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
574		// Max message size refers to aggregates, or pages. Not to individual fragments.
575		let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
576		let format_size = format.encoded_size();
577		// We check the encoded fragment length plus the format size against the max message size
578		// because the format is concatenated if a new page is needed.
579		let size_to_check = encoded_fragment
580			.len()
581			.checked_add(format_size)
582			.ok_or(MessageSendError::TooBig)?;
583		if size_to_check > max_message_size {
584			return Err(MessageSendError::TooBig);
585		}
586
587		let mut all_channels = <OutboundXcmpStatus<T>>::get();
588		let channel_details =
589			Self::try_get_or_insert_outbound_channel(&mut all_channels, recipient)
590				.ok_or(MessageSendError::TooManyChannels)?;
591		if let XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm = format {
592			channel_details
593				.flags
594				.notice_concatenated_opaque_versioned_xcm_notification_sent();
595		}
596
597		let mut existing_page = None;
598		'existing_page_check: {
599			if channel_details.last_index > channel_details.first_index {
600				let page =
601					OutboundXcmpMessages::<T>::get(recipient, channel_details.last_index - 1);
602				if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
603					break 'existing_page_check;
604				}
605				if page.len() + encoded_fragment.len() > max_message_size {
606					break 'existing_page_check;
607				}
608				existing_page = Some(page.into_inner());
609			}
610		}
611		let mut current_page = existing_page.unwrap_or_else(|| {
612			// We need to add a new page.
613			channel_details.last_index += 1;
614			let new_page = format.encode();
615			channel_details.queued_bytes =
616				channel_details.queued_bytes.saturating_add(new_page.len() as u32);
617			new_page
618		});
619
620		current_page.append(&mut encoded_fragment);
621		channel_details.queued_bytes =
622			channel_details.queued_bytes.saturating_add(encoded_fragment_len as u32);
623		let current_page = WeakBoundedVec::try_from(current_page).map_err(|error| {
624			tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
625			MessageSendError::TooBig
626		})?;
627		let page_count =
628			channel_details.last_index.saturating_sub(channel_details.first_index) as u32;
629		<OutboundXcmpMessages<T>>::insert(recipient, channel_details.last_index - 1, current_page);
630
631		let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
632		if channel_details.queued_bytes > threshold {
633			Self::increase_fee_factor(recipient, encoded_fragment_len as u128);
634		}
635
636		<OutboundXcmpStatus<T>>::put(all_channels);
637
638		Ok(page_count)
639	}
640
641	/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
642	/// block.
643	fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
644		let mut s = <OutboundXcmpStatus<T>>::get();
645		if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
646			details.signals_exist = true;
647		} else {
648			s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
649				tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
650				Error::<T>::TooManyActiveOutboundChannels
651			})?;
652		}
653
654		let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
655			(XcmpMessageFormat::Signals, signal).encode(),
656		)
657		.map_err(|error| {
658			tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
659			Error::<T>::TooBig
660		})?;
661		let page = WeakBoundedVec::force_from(page.into_inner(), None);
662
663		<SignalMessages<T>>::insert(dest, page);
664		<OutboundXcmpStatus<T>>::put(s);
665		Ok(())
666	}
667
668	fn suspend_channel(target: ParaId) {
669		<OutboundXcmpStatus<T>>::mutate(|s| {
670			if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
671				let ok = details.state == OutboundState::Ok;
672				defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
673				details.state = OutboundState::Suspended;
674			} else {
675				if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
676					defensive!("Cannot pause channel; too many outbound channels");
677				}
678			}
679		});
680	}
681
682	fn resume_channel(target: ParaId) {
683		<OutboundXcmpStatus<T>>::mutate(|s| {
684			if let Some(index) = s.iter().position(|item| item.recipient == target) {
685				let suspended = s[index].state == OutboundState::Suspended;
686				defensive_assert!(
687					suspended,
688					"WARNING: Attempt to resume channel that was not suspended."
689				);
690				if s[index].first_index == s[index].last_index {
691					s.remove(index);
692				} else {
693					s[index].state = OutboundState::Ok;
694				}
695			} else {
696				defensive!("WARNING: Attempt to resume channel that was not suspended.");
697			}
698		});
699	}
700
701	fn enqueue_xcmp_messages<'a>(
702		sender: ParaId,
703		xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
704		is_first_sender_batch: bool,
705		meter: &mut WeightMeter,
706	) -> Result<(), ()> {
707		let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
708		let batches_footprints =
709			T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
710
711		let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
712			let required_weight = T::WeightInfo::enqueue_xcmp_messages(
713				batches_footprints.first_page_pos.saturated_into(),
714				batch_info,
715				is_first_sender_batch,
716			);
717
718			match meter.can_consume(required_weight) {
719				true => core::cmp::Ordering::Less,
720				false => core::cmp::Ordering::Greater,
721			}
722		});
723
724		meter.consume(T::WeightInfo::enqueue_xcmp_messages(
725			batches_footprints.first_page_pos.saturated_into(),
726			best_batch_footprint,
727			is_first_sender_batch,
728		));
729		T::XcmpQueue::enqueue_messages(
730			xcms.iter().take(best_batch_footprint.msgs_count).copied(),
731			sender,
732		);
733
734		if best_batch_footprint.msgs_count < xcms.len() {
735			tracing::error!(
736				target: LOG_TARGET,
737				used_weight=?meter.consumed_ratio(),
738				"Out of weight: cannot enqueue entire XCMP messages batch; \
739				dropped some or all messages in batch."
740			);
741			return Err(());
742		}
743		Ok(())
744	}
745
746	/// Split concatenated encoded `VersionedXcm`s into individual items.
747	///
748	/// We directly encode them again since that is needed later on.
749	///
750	/// On error returns a partial batch with all the XCMs processed before the failure.
751	/// This can happen in case of a decoding/re-encoding failure.
752	pub(crate) fn take_first_concatenated_xcm<'a>(
753		data: &mut &'a [u8],
754		meter: &mut WeightMeter,
755	) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
756		// Let's make sure that we can decode at least an empty xcm message.
757		let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
758		if meter.try_consume(base_weight).is_err() {
759			defensive!("Out of weight; could not decode all; dropping");
760			return Err(());
761		}
762
763		let input_data = &mut &data[..];
764		let mut input = codec::CountedInput::new(input_data);
765		VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
766			|error| {
767				tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
768				()
769			},
770		)?;
771		let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
772		*data = remaining_data;
773
774		// Consume the extra weight that it took to decode this message.
775		// This depends on the message len in bytes.
776		// Saturates if it's over the limit.
777		let extra_weight =
778			T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
779		meter.consume(extra_weight);
780
781		let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
782			tracing::error!(
783				target: LOG_TARGET,
784				?error,
785				"Failed to take XCM after decoding: message is too long"
786			);
787			()
788		})?;
789
790		Ok(xcm)
791	}
792
793	/// Split concatenated opaque `VersionedXcm`s into individual items.
794	///
795	/// This method is not benchmarked because it's almost free.
796	pub(crate) fn take_first_concatenated_opaque_xcm<'a>(
797		data: &mut &'a [u8],
798	) -> Result<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>, ()> {
799		let xcm_len = Compact::<u32>::decode(data).map_err(|error| {
800			tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode opaque XCM length");
801			()
802		})?;
803		let (xcm_data, remaining_data) = match data.split_at_checked(xcm_len.0 as usize) {
804			Some((xcm_data, remaining_data)) => (xcm_data, remaining_data),
805			None => {
806				tracing::debug!(target: LOG_TARGET, ?xcm_len, "Wrong opaque XCM length");
807				return Err(());
808			},
809		};
810		*data = remaining_data;
811
812		let xcm = BoundedSlice::try_from(xcm_data).map_err(|error| {
813			tracing::error!(
814				target: LOG_TARGET,
815				?error,
816				"Failed to take opaque XCM after decoding: message is too long"
817			);
818			()
819		})?;
820
821		Ok(xcm)
822	}
823
824	/// Split concatenated encoded `VersionedXcm`s into batches.
825	///
826	/// We directly encode them again since that is needed later on.
827	pub(crate) fn take_first_concatenated_xcms<'a>(
828		data: &mut &'a [u8],
829		encoding: XcmEncoding,
830		batch_size: usize,
831		meter: &mut WeightMeter,
832	) -> Result<
833		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
834		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
835	> {
836		let mut batch = vec![];
837		loop {
838			if data.is_empty() {
839				return Ok(batch);
840			}
841
842			let maybe_xcm = match encoding {
843				XcmEncoding::Simple => Self::take_first_concatenated_xcm(data, meter),
844				XcmEncoding::Double => Self::take_first_concatenated_opaque_xcm(data),
845			};
846			match maybe_xcm {
847				Ok(xcm) => {
848					batch.push(xcm);
849					if batch.len() >= batch_size {
850						return Ok(batch);
851					}
852				},
853				Err(_) => return Err(batch),
854			}
855		}
856	}
857
858	/// The worst-case weight of `on_idle`.
859	pub fn on_idle_weight() -> Weight {
860		<T as crate::Config>::WeightInfo::on_idle_good_msg()
861			.max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
862	}
863
864	#[cfg(feature = "bridging")]
865	fn is_inbound_channel_suspended(sender: ParaId) -> bool {
866		<InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
867	}
868
869	#[cfg(feature = "bridging")]
870	/// Returns tuple of `OutboundState` and number of queued pages.
871	fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
872		<OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
873			let queued_pages = c.last_index.saturating_sub(c.first_index);
874			(c.state, queued_pages)
875		})
876	}
877}
878
879impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
880	// Suspends/Resumes the queue when certain thresholds are reached.
881	fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
882		let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
883
884		let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
885		let suspended = suspended_channels.contains(&para);
886
887		if suspended && fp.ready_pages <= resume_threshold {
888			if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
889				tracing::error!(
890					target: LOG_TARGET,
891					error=?err,
892					sibling=?para,
893					"defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
894				);
895			} else {
896				suspended_channels.remove(&para);
897				<InboundXcmpSuspended<T>>::put(suspended_channels);
898			}
899		} else if !suspended && fp.ready_pages >= suspend_threshold {
900			tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");
901
902			if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
903				// It will retry if `drop_threshold` is not reached, but it could be too late.
904				tracing::error!(
905					target: LOG_TARGET, error=?err,
906					"defensive: Could not send suspension signal; future messages may be dropped."
907				);
908			} else if let Err(err) = suspended_channels.try_insert(para) {
909				tracing::error!(
910					target: LOG_TARGET,
911					error=?err,
912					sibling=?para,
913					"Too many channels suspended; cannot suspend sibling; further messages may be dropped."
914				);
915			} else {
916				<InboundXcmpSuspended<T>>::put(suspended_channels);
917			}
918		}
919	}
920}
921
922impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
923	fn is_paused(para: &ParaId) -> bool {
924		if !QueueSuspended::<T>::get() {
925			return false;
926		}
927
928		// Make an exception for the superuser queue:
929		let sender_origin = T::ControllerOriginConverter::convert_origin(
930			(Parent, Parachain((*para).into())),
931			OriginKind::Superuser,
932		);
933		let is_controller =
934			sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
935
936		!is_controller
937	}
938}
939
940/// The encoding of the XCM messages in an XCMP page.
941#[derive(Copy, Clone)]
942enum XcmEncoding {
943	/// Simple encoded (`xcm.encode()`)
944	///
945	/// When we receive this king of messages, we have to decode and then re-encoded them before
946	/// enqueueing them for later processing.
947	Simple,
948	/// Double encoded (`xcm.encode().encode()`)
949	///
950	/// The XCM message is encoded first, resulting a vector of bytes. And then the vector of bytes
951	/// is encoded again. This has 2 advantages:
952	/// 1. We can just decode them before enqueueing them for later processing. They don't need to
953	///    be re-encoded.
954	/// 2. Decoding a `Vec<u8>` is much more efficient than decoding XCM messages.
955	Double,
956}
957
958impl<T: Config> XcmpMessageHandler for Pallet<T> {
959	fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
960		iter: I,
961		max_weight: Weight,
962	) -> Weight {
963		let mut meter = WeightMeter::with_limit(max_weight);
964
965		let mut known_xcm_senders = BTreeSet::new();
966		for (sender, _sent_at, mut data) in iter {
967			let format = match XcmpMessageFormat::decode(&mut data) {
968				Ok(f) => f,
969				Err(_) => {
970					defensive!("Unknown XCMP message format - dropping");
971					continue;
972				},
973			};
974
975			match format {
976				XcmpMessageFormat::Signals => {
977					let mut signal_count = 0;
978					while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
979						signal_count += 1;
980						match ChannelSignal::decode(&mut data) {
981							Ok(ChannelSignal::Suspend) => {
982								if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
983									defensive!(
984										"Not enough weight to process suspend signal - dropping"
985									);
986									break;
987								}
988								Self::suspend_channel(sender)
989							},
990							Ok(ChannelSignal::Resume) => {
991								if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
992									defensive!(
993										"Not enough weight to process resume signal - dropping"
994									);
995									break;
996								}
997								Self::resume_channel(sender)
998							},
999							Err(_) => {
1000								defensive!("Undecodable channel signal - dropping");
1001								break;
1002							},
1003						}
1004					}
1005				},
1006				XcmpMessageFormat::ConcatenatedVersionedXcm |
1007				XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1008					let encoding = match format {
1009						XcmpMessageFormat::ConcatenatedVersionedXcm => XcmEncoding::Simple,
1010						XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1011							let mut all_channels = <OutboundXcmpStatus<T>>::get();
1012							if let Some(channel_details) =
1013								Self::try_get_or_insert_outbound_channel(&mut all_channels, sender)
1014							{
1015								channel_details
1016									.flags
1017									.notice_concatenated_opaque_versioned_xcm_support();
1018							}
1019							<OutboundXcmpStatus<T>>::put(all_channels);
1020
1021							XcmEncoding::Double
1022						},
1023						_ => {
1024							// This branch is unreachable.
1025							continue;
1026						},
1027					};
1028
1029					let mut is_first_sender_batch = known_xcm_senders.insert(sender);
1030					if is_first_sender_batch {
1031						if meter
1032							.try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
1033							.is_err()
1034						{
1035							defensive!(
1036								"Out of weight: cannot enqueue XCMP messages; dropping page; \
1037                                    Used weight: ",
1038								meter.consumed_ratio()
1039							);
1040							continue;
1041						}
1042					}
1043
1044					let mut can_process_next_batch = true;
1045					while can_process_next_batch {
1046						let batch = match Self::take_first_concatenated_xcms(
1047							&mut data,
1048							encoding,
1049							XCM_BATCH_SIZE,
1050							&mut meter,
1051						) {
1052							Ok(batch) => batch,
1053							Err(batch) => {
1054								can_process_next_batch = false;
1055								defensive!(
1056									"HRMP inbound decode stream broke; page will be dropped."
1057								);
1058								batch
1059							},
1060						};
1061						if batch.is_empty() {
1062							break;
1063						}
1064
1065						if let Err(()) = Self::enqueue_xcmp_messages(
1066							sender,
1067							&batch,
1068							is_first_sender_batch,
1069							&mut meter,
1070						) {
1071							break;
1072						}
1073						is_first_sender_batch = false;
1074					}
1075				},
1076				XcmpMessageFormat::ConcatenatedEncodedBlob => {
1077					defensive!("Blob messages are unhandled - dropping");
1078					continue;
1079				},
1080			}
1081		}
1082
1083		meter.consumed()
1084	}
1085}
1086
1087impl<T: Config> XcmpMessageSource for Pallet<T> {
1088	fn take_outbound_messages(
1089		maximum_channels: usize,
1090		excluded_recipients: &[ParaId],
1091	) -> Vec<(ParaId, Vec<u8>)> {
1092		let mut statuses = <OutboundXcmpStatus<T>>::get().into_inner();
1093		let old_statuses_len = statuses.len();
1094		let max_message_count = statuses.len().min(maximum_channels);
1095		let mut result = Vec::with_capacity(max_message_count);
1096
1097		statuses.retain_mut(|status| {
1098			let OutboundChannelDetails {
1099				recipient: para_id,
1100				state: outbound_state,
1101				signals_exist,
1102				first_index,
1103				last_index,
1104				flags,
1105				queued_bytes,
1106			} = status;
1107
1108			let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(*para_id) {
1109				ChannelStatus::Closed => {
1110					// This means that there is no such channel anymore. Nothing to be done but
1111					// swallow the messages and discard the status.
1112					for i in *first_index..*last_index {
1113						<OutboundXcmpMessages<T>>::remove(*para_id, i);
1114					}
1115					if *signals_exist {
1116						<SignalMessages<T>>::remove(*para_id);
1117					}
1118					return false;
1119				},
1120				ChannelStatus::Full => return true,
1121				ChannelStatus::Ready(max_size_now, max_size_ever) => (max_size_now, max_size_ever),
1122			};
1123
1124			// Check if we should omit the recipient.
1125			if excluded_recipients.contains(para_id) {
1126				return true;
1127			}
1128
1129			// This is a hard limit from the host config; not even signals can bypass it.
1130			if result.len() == max_message_count {
1131				// We check this condition in the beginning of the loop so that we don't include
1132				// a message where the limit is 0.
1133				return true;
1134			}
1135
1136			let page = 'page_fetch: {
1137				if *signals_exist {
1138					let page = <SignalMessages<T>>::get(*para_id);
1139					defensive_assert!(!page.is_empty(), "Signals must exist");
1140
1141					if page.len() < max_size_now {
1142						<SignalMessages<T>>::remove(*para_id);
1143						*signals_exist = false;
1144						break 'page_fetch page;
1145					}
1146
1147					defensive!("Signals should fit into a single page");
1148					return true;
1149				}
1150
1151				if *outbound_state == OutboundState::Suspended {
1152					// Only signals are exempt from suspension.
1153					return true;
1154				}
1155
1156				if last_index > first_index {
1157					let page = <OutboundXcmpMessages<T>>::get(*para_id, *first_index);
1158					if page.len() < max_size_now {
1159						<OutboundXcmpMessages<T>>::remove(*para_id, *first_index);
1160						*first_index += 1;
1161						*queued_bytes = queued_bytes.saturating_sub(page.len() as u32);
1162						break 'page_fetch page;
1163					}
1164				}
1165
1166				// Send a notification to the recipient advertising that we support
1167				// `XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm` if needed.
1168				// We do this only once during the entire lifetime of the channel.
1169				if flags.should_send_concatenated_opaque_versioned_xcm_notification() {
1170					match WeakBoundedVec::try_from(XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm.encode()) {
1171						Ok(page) => {
1172							flags.notice_concatenated_opaque_versioned_xcm_notification_sent();
1173							break 'page_fetch page;
1174						},
1175						Err(_) => {
1176							defensive!("XcmpMessageFormat should fit into a single page");
1177							return true;
1178						}
1179					};
1180				}
1181
1182				return true;
1183			};
1184
1185			if first_index == last_index {
1186				*first_index = 0;
1187				*last_index = 0;
1188				*queued_bytes = 0;
1189			}
1190
1191			if page.len() > max_size_ever {
1192				// TODO: #274 This means that the channel's max message size has changed since
1193				//   the message was sent. We should parse it and split into smaller messages but
1194				//   since it's so unlikely then for now we just drop it.
1195				defensive!("WARNING: oversize message in queue - dropping");
1196			} else {
1197				result.push((*para_id, page.into_inner()));
1198			}
1199
1200			let max_total_size = match T::ChannelInfo::get_channel_info(*para_id) {
1201				Some(channel_info) => channel_info.max_total_size,
1202				None => {
1203					tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1204					// We use this as a fallback in case the messaging state is not present
1205					MAX_POSSIBLE_ALLOCATION
1206				},
1207			};
1208			let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1209			if *queued_bytes <= threshold {
1210				Self::decrease_fee_factor(*para_id);
1211			}
1212
1213			true
1214		});
1215		debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1216		let mut statuses = BoundedVec::defensive_truncate_from(statuses);
1217
1218		// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
1219		// criteria requirement.
1220		result.sort_by_key(|(recipient, _msg)| *recipient);
1221
1222		// old_status_len must be >= status.len() since we never add anything to status.
1223		let pruned = old_statuses_len - statuses.len();
1224		// Because it may so happen that we only gave attention to some channels it's important
1225		// to change the order. Otherwise, the next `on_finalize` we will again give attention
1226		// only to those channels that happen to be in the beginning, until they are emptied.
1227		// This leads to "starvation" of the channels near to the end.
1228		let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1229			"Could not store HRMP channels config. Some HRMP channels may be broken.",
1230		);
1231
1232		<OutboundXcmpStatus<T>>::put(statuses);
1233
1234		result
1235	}
1236}
1237
1238/// Xcm sender for sending to a sibling parachain.
1239impl<T: Config> SendXcm for Pallet<T> {
1240	type Ticket = (ParaId, VersionedXcm<()>);
1241
1242	fn validate(
1243		dest: &mut Option<Location>,
1244		msg: &mut Option<Xcm<()>>,
1245	) -> SendResult<(ParaId, VersionedXcm<()>)> {
1246		let d = dest.take().ok_or(SendError::MissingArgument)?;
1247
1248		match d.unpack() {
1249			// An HRMP message for a sibling parachain.
1250			(1, [Parachain(id)]) => {
1251				let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1252				let id = ParaId::from(*id);
1253				let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1254				let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1255					.map_err(|()| SendError::DestinationUnsupported)?;
1256				versioned_xcm
1257					.check_is_decodable()
1258					.map_err(|()| SendError::ExceedsMaxMessageSize)?;
1259
1260				Ok(((id, versioned_xcm), price))
1261			},
1262			_ => {
1263				// Anything else is unhandled. This includes a message that is not meant for us.
1264				// We need to make sure that dest/msg is not consumed here.
1265				*dest = Some(d);
1266				Err(SendError::NotApplicable)
1267			},
1268		}
1269	}
1270
1271	fn deliver((recipient, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1272		let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1273
1274		let mut encoding = XcmEncoding::Simple;
1275		let mut all_channels = <OutboundXcmpStatus<T>>::get();
1276		if let Some(channel_details) = Self::try_get_outbound_channel(&mut all_channels, recipient)
1277		{
1278			if channel_details.flags.has_concatenated_opaque_versioned_xcm_support() {
1279				encoding = XcmEncoding::Double;
1280			}
1281		}
1282
1283		let result = match encoding {
1284			XcmEncoding::Simple => {
1285				Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm)
1286			},
1287			XcmEncoding::Double => Self::send_fragment(
1288				recipient,
1289				XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm,
1290				xcm.encode(),
1291			),
1292		};
1293		match result {
1294			Ok(_) => {
1295				Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1296				Ok(hash)
1297			},
1298			Err(e) => {
1299				tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1300				Err(SendError::Transport(e.into()))
1301			},
1302		}
1303	}
1304}
1305
1306impl<T: Config> InspectMessageQueues for Pallet<T> {
1307	fn clear_messages() {
1308		// Best effort.
1309		let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1310		OutboundXcmpStatus::<T>::mutate(|details_vec| {
1311			for details in details_vec {
1312				details.first_index = 0;
1313				details.last_index = 0;
1314				details.queued_bytes = 0;
1315			}
1316		});
1317	}
1318
1319	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1320		use xcm::prelude::*;
1321
1322		OutboundXcmpMessages::<T>::iter()
1323			.map(|(para_id, _, messages)| {
1324				let data = &mut &messages[..];
1325
1326				let decoded_format = XcmpMessageFormat::decode(data).unwrap();
1327				let mut decoded_messages = Vec::new();
1328				while !data.is_empty() {
1329					let message_bytes = match decoded_format {
1330						XcmpMessageFormat::ConcatenatedVersionedXcm => {
1331							Self::take_first_concatenated_xcm(data, &mut WeightMeter::new())
1332						},
1333						XcmpMessageFormat::ConcatenatedOpaqueVersionedXcm => {
1334							Self::take_first_concatenated_opaque_xcm(data)
1335						},
1336						unexpected_format => {
1337							panic!("Unexpected XCMP format: {unexpected_format:?}!")
1338						},
1339					}
1340					.unwrap();
1341					let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1342						MAX_XCM_DECODE_DEPTH,
1343						&mut &message_bytes[..],
1344					)
1345					.unwrap();
1346					decoded_messages.push(decoded_message);
1347				}
1348
1349				(
1350					VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1351					decoded_messages,
1352				)
1353			})
1354			.collect()
1355	}
1356}
1357
1358impl<T: Config> FeeTracker for Pallet<T> {
1359	type Id = ParaId;
1360
1361	fn get_fee_factor(id: Self::Id) -> FixedU128 {
1362		<DeliveryFeeFactor<T>>::get(id)
1363	}
1364
1365	fn set_fee_factor(id: Self::Id, val: FixedU128) {
1366		<DeliveryFeeFactor<T>>::set(id, val);
1367	}
1368}