referrerpolicy=no-referrer-when-downgrade

cumulus_pallet_xcmp_queue/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17//! A pallet which uses the XCMP transport layer to handle both incoming and outgoing XCM message
18//! sending and dispatch, queuing, signalling and backpressure. To do so, it implements:
19//! * `XcmpMessageHandler`
20//! * `XcmpMessageSource`
21//!
22//! Also provides an implementation of `SendXcm` which can be placed in a router tuple for relaying
23//! XCM over XCMP if the destination is `Parent/Parachain`. It requires an implementation of
24//! `XcmExecutor` for dispatching incoming XCM messages.
25//!
26//! To prevent out of memory errors on the `OutboundXcmpMessages` queue, an exponential fee factor
27//! (`DeliveryFeeFactor`) is set, much like the one used in DMP.
28//! The fee factor increases whenever the total size of messages in a particular channel passes a
29//! threshold. This threshold is defined as a percentage of the maximum total size the channel can
30//! have. More concretely, the threshold is `max_total_size` / `THRESHOLD_FACTOR`, where:
31//! - `max_total_size` is the maximum size, in bytes, of the channel, not number of messages.
32//! It is defined in the channel configuration.
33//! - `THRESHOLD_FACTOR` just declares which percentage of the max size is the actual threshold.
34//! If it's 2, then the threshold is half of the max size, if it's 4, it's a quarter, and so on.
35
36#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
60use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
61use cumulus_primitives_core::{
62	relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
63	ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
64};
65
66use frame_support::{
67	defensive, defensive_assert,
68	traits::{
69		Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueueFootprintQuery,
70		QueuePausedQuery,
71	},
72	weights::{Weight, WeightMeter},
73};
74use pallet_message_queue::OnQueueChanged;
75use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
76use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
77use scale_info::TypeInfo;
78use sp_core::MAX_POSSIBLE_ALLOCATION;
79use sp_runtime::{FixedU128, RuntimeDebug, SaturatedConversion, WeakBoundedVec};
80use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
81use xcm_builder::InspectMessageQueues;
82use xcm_executor::traits::ConvertOrigin;
83
84pub use pallet::*;
85
86/// Index used to identify overweight XCMs.
87pub type OverweightIndex = u64;
88/// The max length of an XCMP message.
89pub type MaxXcmpMessageLenOf<T> =
90	<<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
91
92const LOG_TARGET: &str = "xcmp_queue";
93const DEFAULT_POV_SIZE: u64 = 64 * 1024; // 64 KB
94/// The size of an XCM messages batch.
95pub const XCM_BATCH_SIZE: usize = 250;
96
/// Constants that drive the delivery fee calculation.
pub mod delivery_fee_constants {
	/// Divisor applied to a channel's `max_total_size` to obtain the size threshold above which
	/// the delivery fee factor is increased. With a value of 2, fees start increasing once the
	/// queue is half full.
	pub const THRESHOLD_FACTOR: u32 = 2;
}
102
103#[frame_support::pallet]
104pub mod pallet {
105	use super::*;
106	use frame_support::{pallet_prelude::*, Twox64Concat};
107	use frame_system::pallet_prelude::*;
108
109	#[pallet::pallet]
110	#[pallet::storage_version(migration::STORAGE_VERSION)]
111	pub struct Pallet<T>(_);
112
113	#[pallet::config]
114	pub trait Config: frame_system::Config {
115		#[allow(deprecated)]
116		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
117
118		/// Information on the available XCMP channels.
119		type ChannelInfo: GetChannelInfo;
120
121		/// Means of converting an `Xcm` into a `VersionedXcm`.
122		type VersionWrapper: WrapVersion;
123
124		/// Enqueue an inbound horizontal message for later processing.
125		///
126		/// This defines the maximal message length via [`crate::MaxXcmpMessageLenOf`]. The pallet
127		/// assumes that this hook will eventually process all the pushed messages.
128		type XcmpQueue: EnqueueMessage<ParaId>
129			+ QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;
130
131		/// The maximum number of inbound XCMP channels that can be suspended simultaneously.
132		///
133		/// Any further channel suspensions will fail and messages may get dropped without further
134		/// notice. Choosing a high value (1000) is okay; the trade-off that is described in
135		/// [`InboundXcmpSuspended`] still applies at that scale.
136		#[pallet::constant]
137		type MaxInboundSuspended: Get<u32>;
138
139		/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
140		///
141		/// If this is reached, then no further messages can be sent to channels that do not yet
142		/// have a message queued. This should be set to the expected maximum of outbound channels
143		/// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough,
144		/// since otherwise the congestion control protocol will not work as intended and messages
145		/// may be dropped. This value increases the PoV and should therefore not be picked too
146		/// high. Governance needs to pay attention to not open more channels than this value.
147		#[pallet::constant]
148		type MaxActiveOutboundChannels: Get<u32>;
149
150		/// The maximal page size for HRMP message pages.
151		///
152		/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
153		/// benchmarking. The limit for the size of a message is slightly below this, since some
154		/// overhead is incurred for encoding the format.
155		#[pallet::constant]
156		type MaxPageSize: Get<u32>;
157
158		/// The origin that is allowed to resume or suspend the XCMP queue.
159		type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
160
161		/// The conversion function used to attempt to convert an XCM `Location` origin to a
162		/// superuser origin.
163		type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
164
165		/// The price for delivering an XCM to a sibling parachain destination.
166		type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
167
168		/// The weight information of this pallet.
169		type WeightInfo: WeightInfoExt;
170	}
171
172	#[pallet::call]
173	impl<T: Config> Pallet<T> {
174		/// Suspends all XCM executions for the XCMP queue, regardless of the sender's origin.
175		///
176		/// - `origin`: Must pass `ControllerOrigin`.
177		#[pallet::call_index(1)]
178		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
179		pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
180			T::ControllerOrigin::ensure_origin(origin)?;
181
182			QueueSuspended::<T>::try_mutate(|suspended| {
183				if *suspended {
184					Err(Error::<T>::AlreadySuspended.into())
185				} else {
186					*suspended = true;
187					Ok(())
188				}
189			})
190		}
191
192		/// Resumes all XCM executions for the XCMP queue.
193		///
194		/// Note that this function doesn't change the status of the in/out bound channels.
195		///
196		/// - `origin`: Must pass `ControllerOrigin`.
197		#[pallet::call_index(2)]
198		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
199		pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
200			T::ControllerOrigin::ensure_origin(origin)?;
201
202			QueueSuspended::<T>::try_mutate(|suspended| {
203				if !*suspended {
204					Err(Error::<T>::AlreadyResumed.into())
205				} else {
206					*suspended = false;
207					Ok(())
208				}
209			})
210		}
211
212		/// Overwrites the number of pages which must be in the queue for the other side to be
213		/// told to suspend their sending.
214		///
215		/// - `origin`: Must pass `Root`.
216		/// - `new`: Desired value for `QueueConfigData.suspend_value`
217		#[pallet::call_index(3)]
218		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
219		pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
220			ensure_root(origin)?;
221
222			QueueConfig::<T>::try_mutate(|data| {
223				data.suspend_threshold = new;
224				data.validate::<T>()
225			})
226		}
227
228		/// Overwrites the number of pages which must be in the queue after which we drop any
229		/// further messages from the channel.
230		///
231		/// - `origin`: Must pass `Root`.
232		/// - `new`: Desired value for `QueueConfigData.drop_threshold`
233		#[pallet::call_index(4)]
234		#[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
235		pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
236			ensure_root(origin)?;
237
238			QueueConfig::<T>::try_mutate(|data| {
239				data.drop_threshold = new;
240				data.validate::<T>()
241			})
242		}
243
244		/// Overwrites the number of pages which the queue must be reduced to before it signals
245		/// that message sending may recommence after it has been suspended.
246		///
247		/// - `origin`: Must pass `Root`.
248		/// - `new`: Desired value for `QueueConfigData.resume_threshold`
249		#[pallet::call_index(5)]
250		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
251		pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
252			ensure_root(origin)?;
253
254			QueueConfig::<T>::try_mutate(|data| {
255				data.resume_threshold = new;
256				data.validate::<T>()
257			})
258		}
259	}
260
261	#[pallet::hooks]
262	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
263		fn integrity_test() {
264			assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
265
266			let w = Self::on_idle_weight();
267			assert!(w != Weight::zero());
268			assert!(w.all_lte(T::BlockWeights::get().max_block));
269
270			<T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
271		}
272
273		fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
274			let mut meter = WeightMeter::with_limit(limit);
275
276			if meter.try_consume(Self::on_idle_weight()).is_err() {
277				tracing::debug!(
278					target: LOG_TARGET,
279					"Not enough weight for on_idle. {} < {}",
280					Self::on_idle_weight(), limit
281				);
282				return meter.consumed()
283			}
284
285			migration::v3::lazy_migrate_inbound_queue::<T>();
286
287			meter.consumed()
288		}
289	}
290
291	#[pallet::event]
292	#[pallet::generate_deposit(pub(super) fn deposit_event)]
293	pub enum Event<T: Config> {
294		/// An HRMP message was sent to a sibling parachain.
295		XcmpMessageSent { message_hash: XcmHash },
296	}
297
298	#[pallet::error]
299	pub enum Error<T> {
300		/// Setting the queue config failed since one of its values was invalid.
301		BadQueueConfig,
302		/// The execution is already suspended.
303		AlreadySuspended,
304		/// The execution is already resumed.
305		AlreadyResumed,
306		/// There are too many active outbound channels.
307		TooManyActiveOutboundChannels,
308		/// The message is too big.
309		TooBig,
310	}
311
312	/// The suspended inbound XCMP channels. All others are not suspended.
313	///
314	/// This is a `StorageValue` instead of a `StorageMap` since we expect multiple reads per block
315	/// to different keys with a one byte payload. The access to `BoundedBTreeSet` will be cached
316	/// within the block and therefore only included once in the proof size.
317	///
318	/// NOTE: The PoV benchmarking cannot know this and will over-estimate, but the actual proof
319	/// will be smaller.
320	#[pallet::storage]
321	pub type InboundXcmpSuspended<T: Config> =
322		StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
323
324	/// The non-empty XCMP channels in order of becoming non-empty, and the index of the first
325	/// and last outbound message. If the two indices are equal, then it indicates an empty
326	/// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater
327	/// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in
328	/// case of the need to send a high-priority signal message this block.
329	/// The bool is true if there is a signal message waiting to be sent.
330	#[pallet::storage]
331	pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
332		_,
333		BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
334		ValueQuery,
335	>;
336
337	/// The messages outbound in a given XCMP channel.
338	#[pallet::storage]
339	pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
340		_,
341		Blake2_128Concat,
342		ParaId,
343		Twox64Concat,
344		u16,
345		WeakBoundedVec<u8, T::MaxPageSize>,
346		ValueQuery,
347	>;
348
349	/// Any signal messages waiting to be sent.
350	#[pallet::storage]
351	pub(super) type SignalMessages<T: Config> =
352		StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
353
354	/// The configuration which controls the dynamics of the outbound queue.
355	#[pallet::storage]
356	pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
357
358	/// Whether or not the XCMP queue is suspended from executing incoming XCMs or not.
359	#[pallet::storage]
360	pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
361
362	/// The factor to multiply the base delivery fee by.
363	#[pallet::storage]
364	pub(super) type DeliveryFeeFactor<T: Config> =
365		StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
366}
367
368#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
369pub enum OutboundState {
370	Ok,
371	Suspended,
372}
373
374/// Struct containing detailed information about the outbound channel.
375#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
376pub struct OutboundChannelDetails {
377	/// The `ParaId` of the parachain that this channel is connected with.
378	recipient: ParaId,
379	/// The state of the channel.
380	state: OutboundState,
381	/// Whether or not any signals exist in this channel.
382	signals_exist: bool,
383	/// The index of the first outbound message.
384	first_index: u16,
385	/// The index of the last outbound message.
386	last_index: u16,
387}
388
389impl OutboundChannelDetails {
390	pub fn new(recipient: ParaId) -> OutboundChannelDetails {
391		OutboundChannelDetails {
392			recipient,
393			state: OutboundState::Ok,
394			signals_exist: false,
395			first_index: 0,
396			last_index: 0,
397		}
398	}
399
400	pub fn with_signals(mut self) -> OutboundChannelDetails {
401		self.signals_exist = true;
402		self
403	}
404
405	pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
406		self.state = OutboundState::Suspended;
407		self
408	}
409}
410
411#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
412pub struct QueueConfigData {
413	/// The number of pages which must be in the queue for the other side to be told to suspend
414	/// their sending.
415	suspend_threshold: u32,
416	/// The number of pages which must be in the queue after which we drop any further messages
417	/// from the channel. This should normally not happen since the `suspend_threshold` can be used
418	/// to suspend the channel.
419	drop_threshold: u32,
420	/// The number of pages which the queue must be reduced to before it signals that
421	/// message sending may recommence after it has been suspended.
422	resume_threshold: u32,
423}
424
425impl Default for QueueConfigData {
426	fn default() -> Self {
427		// NOTE that these default values are only used on genesis. They should give a rough idea of
428		// what to set these values to, but is in no way a requirement.
429		Self {
430			drop_threshold: 48,    // 64KiB * 48 = 3MiB
431			suspend_threshold: 32, // 64KiB * 32 = 2MiB
432			resume_threshold: 8,   // 64KiB * 8 = 512KiB
433		}
434	}
435}
436
437impl QueueConfigData {
438	/// Validate all assumptions about `Self`.
439	///
440	/// Should be called prior to accepting this as new config.
441	pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
442		if self.resume_threshold < self.suspend_threshold &&
443			self.suspend_threshold <= self.drop_threshold &&
444			self.resume_threshold > 0
445		{
446			Ok(())
447		} else {
448			Err(Error::<T>::BadQueueConfig.into())
449		}
450	}
451}
452
453#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
454pub enum ChannelSignal {
455	Suspend,
456	Resume,
457}
458
459impl<T: Config> Pallet<T> {
460	/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
461	///
462	/// Format is the type of aggregate message that the `fragment` may be safely encoded and
463	/// appended onto. Whether earlier unused space is used for the fragment at the risk of sending
464	/// it out of order is determined with `qos`. NOTE: For any two messages to be guaranteed to be
465	/// dispatched in order, then both must be sent with `ServiceQuality::Ordered`.
466	///
467	/// ## Background
468	///
469	/// For our purposes, one HRMP "message" is actually an aggregated block of XCM "messages".
470	///
471	/// For the sake of clarity, we distinguish between them as message AGGREGATEs versus
472	/// message FRAGMENTs.
473	///
474	/// So each AGGREGATE is comprised of one or more concatenated SCALE-encoded `Vec<u8>`
475	/// FRAGMENTs. Though each fragment is already probably a SCALE-encoded Xcm, we can't be
476	/// certain, so we SCALE encode each `Vec<u8>` fragment in order to ensure we have the
477	/// length prefixed and can thus decode each fragment from the aggregate stream. With this,
478	/// we can concatenate them into a single aggregate blob without needing to be concerned
479	/// about encoding fragment boundaries.
480	///
481	/// If successful, returns the number of pages in the outbound queue after enqueuing the new
482	/// fragment.
483	fn send_fragment<Fragment: Encode>(
484		recipient: ParaId,
485		format: XcmpMessageFormat,
486		fragment: Fragment,
487	) -> Result<u32, MessageSendError> {
488		let encoded_fragment = fragment.encode();
489
490		// Optimization note: `max_message_size` could potentially be stored in
491		// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
492
493		let channel_info =
494			T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
495		// Max message size refers to aggregates, or pages. Not to individual fragments.
496		let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
497		let format_size = format.encoded_size();
498		// We check the encoded fragment length plus the format size against the max message size
499		// because the format is concatenated if a new page is needed.
500		let size_to_check = encoded_fragment
501			.len()
502			.checked_add(format_size)
503			.ok_or(MessageSendError::TooBig)?;
504		if size_to_check > max_message_size {
505			return Err(MessageSendError::TooBig)
506		}
507
508		let mut all_channels = <OutboundXcmpStatus<T>>::get();
509		let channel_details = if let Some(details) =
510			all_channels.iter_mut().find(|channel| channel.recipient == recipient)
511		{
512			details
513		} else {
514			all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
515				tracing::error!(target: LOG_TARGET, error=?e, "Failed to activate HRMP channel");
516				MessageSendError::TooManyChannels
517			})?;
518			all_channels
519				.last_mut()
520				.expect("can't be empty; a new element was just pushed; qed")
521		};
522		let have_active = channel_details.last_index > channel_details.first_index;
523		// Try to append fragment to the last page, if there is enough space.
524		// We return the size of the last page inside of the option, to not calculate it again.
525		let appended_to_last_page = have_active
526			.then(|| {
527				<OutboundXcmpMessages<T>>::try_mutate(
528					recipient,
529					channel_details.last_index - 1,
530					|page| {
531						if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
532							defensive!("Bad format in outbound queue; dropping message");
533							return Err(())
534						}
535						if page.len() + encoded_fragment.len() > max_message_size {
536							return Err(())
537						}
538						for frag in encoded_fragment.iter() {
539							page.try_push(*frag)?;
540						}
541						Ok(page.len())
542					},
543				)
544				.ok()
545			})
546			.flatten();
547
548		let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
549			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
550			(number_of_pages, size)
551		} else {
552			// Need to add a new page.
553			let page_index = channel_details.last_index;
554			channel_details.last_index += 1;
555			let mut new_page = format.encode();
556			new_page.extend_from_slice(&encoded_fragment[..]);
557			let last_page_size = new_page.len();
558			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
559			let bounded_page =
560				BoundedVec::<u8, T::MaxPageSize>::try_from(new_page).map_err(|error| {
561					tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
562					MessageSendError::TooBig
563				})?;
564			let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
565			<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
566			<OutboundXcmpStatus<T>>::put(all_channels);
567			(number_of_pages, last_page_size)
568		};
569
570		// We have to count the total size here since `channel_info.total_size` is not updated at
571		// this point in time. We assume all previous pages are filled, which, in practice, is not
572		// always the case.
573		let total_size =
574			number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
575		let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
576		if total_size > threshold {
577			Self::increase_fee_factor(recipient, encoded_fragment.len() as u128);
578		}
579
580		Ok(number_of_pages)
581	}
582
583	/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
584	/// block.
585	fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
586		let mut s = <OutboundXcmpStatus<T>>::get();
587		if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
588			details.signals_exist = true;
589		} else {
590			s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
591				tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
592				Error::<T>::TooManyActiveOutboundChannels
593			})?;
594		}
595
596		let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
597			(XcmpMessageFormat::Signals, signal).encode(),
598		)
599		.map_err(|error| {
600			tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
601			Error::<T>::TooBig
602		})?;
603		let page = WeakBoundedVec::force_from(page.into_inner(), None);
604
605		<SignalMessages<T>>::insert(dest, page);
606		<OutboundXcmpStatus<T>>::put(s);
607		Ok(())
608	}
609
610	fn suspend_channel(target: ParaId) {
611		<OutboundXcmpStatus<T>>::mutate(|s| {
612			if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
613				let ok = details.state == OutboundState::Ok;
614				defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
615				details.state = OutboundState::Suspended;
616			} else {
617				if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
618					defensive!("Cannot pause channel; too many outbound channels");
619				}
620			}
621		});
622	}
623
624	fn resume_channel(target: ParaId) {
625		<OutboundXcmpStatus<T>>::mutate(|s| {
626			if let Some(index) = s.iter().position(|item| item.recipient == target) {
627				let suspended = s[index].state == OutboundState::Suspended;
628				defensive_assert!(
629					suspended,
630					"WARNING: Attempt to resume channel that was not suspended."
631				);
632				if s[index].first_index == s[index].last_index {
633					s.remove(index);
634				} else {
635					s[index].state = OutboundState::Ok;
636				}
637			} else {
638				defensive!("WARNING: Attempt to resume channel that was not suspended.");
639			}
640		});
641	}
642
643	fn enqueue_xcmp_messages<'a>(
644		sender: ParaId,
645		xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
646		meter: &mut WeightMeter,
647	) -> Result<(), ()> {
648		let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
649		let batches_footprints =
650			T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
651
652		let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
653			let required_weight = T::WeightInfo::enqueue_xcmp_messages(
654				batches_footprints.first_page_pos.saturated_into(),
655				batch_info,
656			);
657
658			match meter.can_consume(required_weight) {
659				true => core::cmp::Ordering::Less,
660				false => core::cmp::Ordering::Greater,
661			}
662		});
663
664		meter.consume(T::WeightInfo::enqueue_xcmp_messages(
665			batches_footprints.first_page_pos.saturated_into(),
666			best_batch_footprint,
667		));
668		T::XcmpQueue::enqueue_messages(
669			xcms.iter().take(best_batch_footprint.msgs_count).copied(),
670			sender,
671		);
672
673		if best_batch_footprint.msgs_count < xcms.len() {
674			tracing::error!(
675				target: LOG_TARGET,
676				used_weight=?meter.consumed_ratio(),
677				"Out of weight: cannot enqueue entire XCMP messages batch; \
678				dropped some or all messages in batch."
679			);
680			return Err(());
681		}
682		Ok(())
683	}
684
685	/// Split concatenated encoded `VersionedXcm`s or `MaybeDoubleEncodedVersionedXcm`s into
686	/// individual items.
687	///
688	/// We directly encode them again since that is needed later on.
689	///
690	/// On error returns a partial batch with all the XCMs processed before the failure.
691	/// This can happen in case of a decoding/re-encoding failure.
692	pub(crate) fn take_first_concatenated_xcm<'a>(
693		data: &mut &'a [u8],
694		meter: &mut WeightMeter,
695	) -> Result<Option<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>, ()> {
696		if data.is_empty() {
697			return Ok(None)
698		}
699
700		// Let's make sure that we can decode at least an empty xcm message.
701		let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
702		if meter.try_consume(base_weight).is_err() {
703			defensive!("Out of weight; could not decode all; dropping");
704			return Err(())
705		}
706
707		let input_data = &mut &data[..];
708		let mut input = codec::CountedInput::new(input_data);
709		VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
710			|error| {
711				tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
712				()
713			},
714		)?;
715		let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
716		*data = remaining_data;
717
718		// Consume the extra weight that it took to decode this message.
719		// This depends on the message len in bytes.
720		// Saturates if it's over the limit.
721		let extra_weight =
722			T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
723		meter.consume(extra_weight);
724
725		let xcm = Some(BoundedSlice::try_from(xcm_data).map_err(|error| {
726			tracing::error!(
727				target: LOG_TARGET,
728				?error,
729				"Failed to take XCM after decoding: message is too long"
730			);
731			()
732		})?);
733
734		Ok(xcm)
735	}
736
737	/// Split concatenated encoded `VersionedXcm`s or `MaybeDoubleEncodedVersionedXcm`s into
738	/// batches.
739	///
740	/// We directly encode them again since that is needed later on.
741	pub(crate) fn take_first_concatenated_xcms<'a>(
742		data: &mut &'a [u8],
743		batch_size: usize,
744		meter: &mut WeightMeter,
745	) -> Result<
746		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
747		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
748	> {
749		let mut batch = vec![];
750		loop {
751			match Self::take_first_concatenated_xcm(data, meter) {
752				Ok(Some(xcm)) => {
753					batch.push(xcm);
754					if batch.len() >= batch_size {
755						return Ok(batch);
756					}
757				},
758				Ok(None) => return Ok(batch),
759				Err(_) => return Err(batch),
760			}
761		}
762	}
763
764	/// The worst-case weight of `on_idle`.
765	pub fn on_idle_weight() -> Weight {
766		<T as crate::Config>::WeightInfo::on_idle_good_msg()
767			.max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
768	}
769
770	#[cfg(feature = "bridging")]
771	fn is_inbound_channel_suspended(sender: ParaId) -> bool {
772		<InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
773	}
774
775	#[cfg(feature = "bridging")]
776	/// Returns tuple of `OutboundState` and number of queued pages.
777	fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
778		<OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
779			let queued_pages = c.last_index.saturating_sub(c.first_index);
780			(c.state, queued_pages)
781		})
782	}
783}
784
785impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
786	// Suspends/Resumes the queue when certain thresholds are reached.
787	fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
788		let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
789
790		let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
791		let suspended = suspended_channels.contains(&para);
792
793		if suspended && fp.ready_pages <= resume_threshold {
794			if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
795				tracing::error!(
796					target: LOG_TARGET,
797					error=?err,
798					sibling=?para,
799					"defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
800				);
801			} else {
802				suspended_channels.remove(&para);
803				<InboundXcmpSuspended<T>>::put(suspended_channels);
804			}
805		} else if !suspended && fp.ready_pages >= suspend_threshold {
806			tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");
807
808			if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
809				// It will retry if `drop_threshold` is not reached, but it could be too late.
810				tracing::error!(
811					target: LOG_TARGET, error=?err,
812					"defensive: Could not send suspension signal; future messages may be dropped."
813				);
814			} else if let Err(err) = suspended_channels.try_insert(para) {
815				tracing::error!(
816					target: LOG_TARGET,
817					error=?err,
818					sibling=?para,
819					"Too many channels suspended; cannot suspend sibling; further messages may be dropped."
820				);
821			} else {
822				<InboundXcmpSuspended<T>>::put(suspended_channels);
823			}
824		}
825	}
826}
827
828impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
829	fn is_paused(para: &ParaId) -> bool {
830		if !QueueSuspended::<T>::get() {
831			return false
832		}
833
834		// Make an exception for the superuser queue:
835		let sender_origin = T::ControllerOriginConverter::convert_origin(
836			(Parent, Parachain((*para).into())),
837			OriginKind::Superuser,
838		);
839		let is_controller =
840			sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
841
842		!is_controller
843	}
844}
845
846impl<T: Config> XcmpMessageHandler for Pallet<T> {
847	fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
848		iter: I,
849		max_weight: Weight,
850	) -> Weight {
851		let mut meter = WeightMeter::with_limit(max_weight);
852
853		let mut known_xcm_senders = BTreeSet::new();
854		for (sender, _sent_at, mut data) in iter {
855			let format = match XcmpMessageFormat::decode(&mut data) {
856				Ok(f) => f,
857				Err(_) => {
858					defensive!("Unknown XCMP message format - dropping");
859					continue
860				},
861			};
862
863			match format {
864				XcmpMessageFormat::Signals =>
865					while !data.is_empty() {
866						if meter
867							.try_consume(
868								T::WeightInfo::suspend_channel()
869									.max(T::WeightInfo::resume_channel()),
870							)
871							.is_err()
872						{
873							defensive!("Not enough weight to process signals - dropping");
874							break
875						}
876
877						match ChannelSignal::decode(&mut data) {
878							Ok(ChannelSignal::Suspend) => Self::suspend_channel(sender),
879							Ok(ChannelSignal::Resume) => Self::resume_channel(sender),
880							Err(_) => {
881								defensive!("Undecodable channel signal - dropping");
882								break
883							},
884						}
885					},
886				XcmpMessageFormat::ConcatenatedVersionedXcm => {
887					if known_xcm_senders.insert(sender) {
888						if meter
889							.try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
890							.is_err()
891						{
892							defensive!(
893								"Out of weight: cannot enqueue XCMP messages; dropping page; \
894                                    Used weight: ",
895								meter.consumed_ratio()
896							);
897							continue;
898						}
899					}
900
901					let mut can_process_next_batch = true;
902					while can_process_next_batch {
903						let batch = match Self::take_first_concatenated_xcms(
904							&mut data,
905							XCM_BATCH_SIZE,
906							&mut meter,
907						) {
908							Ok(batch) => batch,
909							Err(batch) => {
910								can_process_next_batch = false;
911								defensive!(
912									"HRMP inbound decode stream broke; page will be dropped."
913								);
914								batch
915							},
916						};
917						if batch.is_empty() {
918							break;
919						}
920
921						if let Err(()) = Self::enqueue_xcmp_messages(sender, &batch, &mut meter) {
922							break
923						}
924					}
925				},
926				XcmpMessageFormat::ConcatenatedEncodedBlob => {
927					defensive!("Blob messages are unhandled - dropping");
928					continue
929				},
930			}
931		}
932
933		meter.consumed()
934	}
935}
936
937impl<T: Config> XcmpMessageSource for Pallet<T> {
938	fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
939		let mut statuses = <OutboundXcmpStatus<T>>::get();
940		let old_statuses_len = statuses.len();
941		let max_message_count = statuses.len().min(maximum_channels);
942		let mut result = Vec::with_capacity(max_message_count);
943
944		for status in statuses.iter_mut() {
945			let OutboundChannelDetails {
946				recipient: para_id,
947				state: outbound_state,
948				mut signals_exist,
949				mut first_index,
950				mut last_index,
951			} = *status;
952
953			let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
954				ChannelStatus::Closed => {
955					// This means that there is no such channel anymore. Nothing to be done but
956					// swallow the messages and discard the status.
957					for i in first_index..last_index {
958						<OutboundXcmpMessages<T>>::remove(para_id, i);
959					}
960					if signals_exist {
961						<SignalMessages<T>>::remove(para_id);
962					}
963					*status = OutboundChannelDetails::new(para_id);
964					continue
965				},
966				ChannelStatus::Full => continue,
967				ChannelStatus::Ready(n, e) => (n, e),
968			};
969
970			// This is a hard limit from the host config; not even signals can bypass it.
971			if result.len() == max_message_count {
972				// We check this condition in the beginning of the loop so that we don't include
973				// a message where the limit is 0.
974				break
975			}
976
977			let page = if signals_exist {
978				let page = <SignalMessages<T>>::get(para_id);
979				defensive_assert!(!page.is_empty(), "Signals must exist");
980
981				if page.len() < max_size_now {
982					<SignalMessages<T>>::remove(para_id);
983					signals_exist = false;
984					page
985				} else {
986					defensive!("Signals should fit into a single page");
987					continue
988				}
989			} else if outbound_state == OutboundState::Suspended {
990				// Signals are exempt from suspension.
991				continue
992			} else if last_index > first_index {
993				let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
994				if page.len() < max_size_now {
995					<OutboundXcmpMessages<T>>::remove(para_id, first_index);
996					first_index += 1;
997					page
998				} else {
999					continue
1000				}
1001			} else {
1002				continue
1003			};
1004			if first_index == last_index {
1005				first_index = 0;
1006				last_index = 0;
1007			}
1008
1009			if page.len() > max_size_ever {
1010				// TODO: #274 This means that the channel's max message size has changed since
1011				//   the message was sent. We should parse it and split into smaller messages but
1012				//   since it's so unlikely then for now we just drop it.
1013				defensive!("WARNING: oversize message in queue - dropping");
1014			} else {
1015				result.push((para_id, page.into_inner()));
1016			}
1017
1018			let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
1019				Some(channel_info) => channel_info.max_total_size,
1020				None => {
1021					tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1022					MAX_POSSIBLE_ALLOCATION // We use this as a fallback in case the messaging state is not present
1023				},
1024			};
1025			let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1026			let remaining_total_size: usize = (first_index..last_index)
1027				.map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
1028				.sum();
1029			if remaining_total_size <= threshold as usize {
1030				Self::decrease_fee_factor(para_id);
1031			}
1032
1033			*status = OutboundChannelDetails {
1034				recipient: para_id,
1035				state: outbound_state,
1036				signals_exist,
1037				first_index,
1038				last_index,
1039			};
1040		}
1041		debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1042
1043		// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
1044		// criteria requirement.
1045		result.sort_by_key(|m| m.0);
1046
1047		// Prune hrmp channels that became empty. Additionally, because it may so happen that we
1048		// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
1049		// change the order. Otherwise, the next `on_finalize` we will again give attention
1050		// only to those channels that happen to be in the beginning, until they are emptied.
1051		// This leads to "starvation" of the channels near to the end.
1052		//
1053		// To mitigate this we shift all processed elements towards the end of the vector using
1054		// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
1055		statuses.retain(|x| {
1056			x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
1057		});
1058
1059		// old_status_len must be >= status.len() since we never add anything to status.
1060		let pruned = old_statuses_len - statuses.len();
1061		// removing an item from status implies a message being sent, so the result messages must
1062		// be no less than the pruned channels.
1063		let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1064			"Could not store HRMP channels config. Some HRMP channels may be broken.",
1065		);
1066
1067		<OutboundXcmpStatus<T>>::put(statuses);
1068
1069		result
1070	}
1071}
1072
1073/// Xcm sender for sending to a sibling parachain.
1074impl<T: Config> SendXcm for Pallet<T> {
1075	type Ticket = (ParaId, VersionedXcm<()>);
1076
1077	fn validate(
1078		dest: &mut Option<Location>,
1079		msg: &mut Option<Xcm<()>>,
1080	) -> SendResult<(ParaId, VersionedXcm<()>)> {
1081		let d = dest.take().ok_or(SendError::MissingArgument)?;
1082
1083		match d.unpack() {
1084			// An HRMP message for a sibling parachain.
1085			(1, [Parachain(id)]) => {
1086				let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1087				let id = ParaId::from(*id);
1088				let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1089				let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1090					.map_err(|()| SendError::DestinationUnsupported)?;
1091				versioned_xcm
1092					.check_is_decodable()
1093					.map_err(|()| SendError::ExceedsMaxMessageSize)?;
1094
1095				Ok(((id, versioned_xcm), price))
1096			},
1097			_ => {
1098				// Anything else is unhandled. This includes a message that is not meant for us.
1099				// We need to make sure that dest/msg is not consumed here.
1100				*dest = Some(d);
1101				Err(SendError::NotApplicable)
1102			},
1103		}
1104	}
1105
1106	fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1107		let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1108
1109		match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
1110			Ok(_) => {
1111				Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1112				Ok(hash)
1113			},
1114			Err(e) => {
1115				tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1116				Err(SendError::Transport(e.into()))
1117			},
1118		}
1119	}
1120}
1121
1122impl<T: Config> InspectMessageQueues for Pallet<T> {
1123	fn clear_messages() {
1124		// Best effort.
1125		let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1126		OutboundXcmpStatus::<T>::mutate(|details_vec| {
1127			for details in details_vec {
1128				details.first_index = 0;
1129				details.last_index = 0;
1130			}
1131		});
1132	}
1133
1134	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1135		use xcm::prelude::*;
1136
1137		OutboundXcmpMessages::<T>::iter()
1138			.map(|(para_id, _, messages)| {
1139				let mut data = &messages[..];
1140				let decoded_format = XcmpMessageFormat::decode(&mut data).unwrap();
1141				if decoded_format != XcmpMessageFormat::ConcatenatedVersionedXcm {
1142					panic!("Unexpected format.")
1143				}
1144				let mut decoded_messages = Vec::new();
1145				while !data.is_empty() {
1146					let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1147						MAX_XCM_DECODE_DEPTH,
1148						&mut data,
1149					)
1150					.unwrap();
1151					decoded_messages.push(decoded_message);
1152				}
1153
1154				(
1155					VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1156					decoded_messages,
1157				)
1158			})
1159			.collect()
1160	}
1161}
1162
1163impl<T: Config> FeeTracker for Pallet<T> {
1164	type Id = ParaId;
1165
1166	fn get_fee_factor(id: Self::Id) -> FixedU128 {
1167		<DeliveryFeeFactor<T>>::get(id)
1168	}
1169
1170	fn set_fee_factor(id: Self::Id, val: FixedU128) {
1171		<DeliveryFeeFactor<T>>::set(id, val);
1172	}
1173}