referrerpolicy=no-referrer-when-downgrade

polkadot_runtime_parachains/
hrmp.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	configuration::{self, HostConfiguration},
19	dmp, ensure_parachain, initializer, paras,
20};
21use alloc::{
22	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23	vec,
24	vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use frame_system::pallet_prelude::*;
30use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
31use polkadot_primitives::{
32	Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
33	SessionIndex,
34};
35use scale_info::TypeInfo;
36use sp_runtime::{
37	traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
38	ArithmeticError,
39};
40
41pub use pallet::*;
42
/// Maximum bound that can be set for inbound channels.
///
/// If inaccurate, the weighing of this pallet might become inaccurate. The `configuration`
/// pallet is expected to check values against this bound before setting them.
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
/// Same as [`HRMP_MAX_INBOUND_CHANNELS_BOUND`], but for outbound channels.
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
/// Weight functions for the dispatchables of this pallet.
///
/// Each function is named after the operation it weighs; the `u32` arguments are the
/// complexity parameters passed by the corresponding `#[pallet::weight]` expression.
pub trait WeightInfo {
	/// Weight of `hrmp_init_open_channel`.
	fn hrmp_init_open_channel() -> Weight;
	/// Weight of `hrmp_accept_open_channel`.
	fn hrmp_accept_open_channel() -> Weight;
	/// Weight of `hrmp_close_channel`.
	fn hrmp_close_channel() -> Weight;
	/// Weight of `force_clean_hrmp`; invoked with the inbound (`i`) and outbound (`e`) channel
	/// counts supplied as witness data.
	fn force_clean_hrmp(i: u32, e: u32) -> Weight;
	/// Weight of `force_process_hrmp_open`; `c` is the witness number of open requests.
	fn force_process_hrmp_open(c: u32) -> Weight;
	/// Weight of `force_process_hrmp_close`; `c` is the witness number of close requests.
	fn force_process_hrmp_close(c: u32) -> Weight;
	/// Weight of `hrmp_cancel_open_request`; `c` is the witness number of open requests.
	fn hrmp_cancel_open_request(c: u32) -> Weight;
	/// NOTE(review): no call site is visible in this part of the file — confirm what `c` counts.
	fn clean_open_channel_requests(c: u32) -> Weight;
	/// Weight of `force_open_hrmp_channel`; the extrinsic passes `1` when a pending open request
	/// had to be cancelled first and `0` otherwise.
	fn force_open_hrmp_channel(c: u32) -> Weight;
	/// Weight of `establish_system_channel`.
	fn establish_system_channel() -> Weight;
	/// Weight of `poke_channel_deposits`.
	fn poke_channel_deposits() -> Weight;
	/// NOTE(review): presumably the weight of an `establish_channel_with_system` dispatchable;
	/// it is not visible in this part of the file — confirm.
	fn establish_channel_with_system() -> Weight;
}
71
72/// A weight info that is only suitable for testing.
73pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76	fn hrmp_accept_open_channel() -> Weight {
77		Weight::MAX
78	}
79	fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80		Weight::MAX
81	}
82	fn force_process_hrmp_close(_: u32) -> Weight {
83		Weight::MAX
84	}
85	fn force_process_hrmp_open(_: u32) -> Weight {
86		Weight::MAX
87	}
88	fn hrmp_cancel_open_request(_: u32) -> Weight {
89		Weight::MAX
90	}
91	fn hrmp_close_channel() -> Weight {
92		Weight::MAX
93	}
94	fn hrmp_init_open_channel() -> Weight {
95		Weight::MAX
96	}
97	fn clean_open_channel_requests(_: u32) -> Weight {
98		Weight::MAX
99	}
100	fn force_open_hrmp_channel(_: u32) -> Weight {
101		Weight::MAX
102	}
103	fn establish_system_channel() -> Weight {
104		Weight::MAX
105	}
106	fn poke_channel_deposits() -> Weight {
107		Weight::MAX
108	}
109	fn establish_channel_with_system() -> Weight {
110		Weight::MAX
111	}
112}
113
/// A description of a request to open an HRMP channel.
///
/// Values of this type are stored in `HrmpOpenChannelRequests`, keyed by the channel id.
// NOTE: the type derives `Encode`/`Decode`, so the field order is part of the encoded layout.
#[derive(Encode, Decode, TypeInfo)]
pub struct HrmpOpenChannelRequest {
	/// Indicates if this request was confirmed by the recipient.
	pub confirmed: bool,
	/// NOTE: this field is deprecated. Channel open requests became non-expiring and this value
	/// became unused.
	pub _age: SessionIndex,
	/// The amount that the sender supplied at the time of creation of this request.
	pub sender_deposit: Balance,
	/// The maximum message size that could be put into the channel.
	pub max_message_size: u32,
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
}
131
/// The metadata of an HRMP channel.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct HrmpChannel {
	// NOTE: This structure is used by parachains via merkle proofs. Therefore, this struct
	// requires special treatment.
	//
	// A parachain that requested this struct can only depend on a subset of it.
	// Specifically, only the first few fields can be depended upon (see `AbridgedHrmpChannel`).
	// These fields cannot be changed without a corresponding migration of parachains.
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
	/// The maximum message size that could be put into the channel.
	pub max_message_size: u32,
	/// The current number of messages pending in the channel.
	/// Invariant: should be less or equal to `max_capacity`.
	pub msg_count: u32,
	/// The total size in bytes of all message payloads in the channel.
	/// Invariant: should be less or equal to `max_total_size`.
	pub total_size: u32,
	/// A head of the Message Queue Chain for this channel. Each link in this chain has a form:
	/// `(prev_head, B, H(M))`, where
	/// - `prev_head`: is the previous value of `mqc_head` or zero if none.
	/// - `B`: is the [relay-chain] block number in which a message was appended
	/// - `H(M)`: is the hash of the message being appended.
	/// This value is initialized to a special value that consists of all zeroes which indicates
	/// that no messages were previously added.
	// NOTE(review): the field is an `Option`, so "no messages yet" is presumably represented
	// by `None` rather than by an all-zero hash — confirm and update the text above.
	pub mqc_head: Option<Hash>,
	/// The amount that the sender supplied as a deposit when opening this channel.
	pub sender_deposit: Balance,
	/// The amount that the recipient supplied as a deposit when accepting opening this channel.
	pub recipient_deposit: Balance,
}
167
/// An error returned by [`Pallet::check_hrmp_watermark`] that indicates an acceptance criteria
/// check didn't pass.
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
	/// The new watermark is not advanced relative to the last watermark.
	AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
	/// The new watermark is ahead of the relay-parent block number.
	AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
	/// The new watermark doesn't land on a block with messages received.
	LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
}
175
/// An error returned by [`Pallet::check_outbound_hrmp`] that indicates an acceptance criteria check
/// didn't pass.
pub(crate) enum OutboundHrmpAcceptanceErr {
	/// More messages were sent than the configuration permits.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// The messages are not sorted; `idx` is the index of the first unsorted message.
	NotSorted { idx: u32 },
	/// The message at `idx` is addressed to a channel that does not exist.
	NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
	/// The message at `idx` exceeds the negotiated maximum message size of its channel.
	MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
	/// Sending the message at `idx` would exceed the negotiated total size of its channel.
	TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
	/// Sending the message at `idx` would exceed the negotiated capacity of its channel.
	CapacityExceeded { idx: u32, count: u32, limit: u32 },
}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189	BlockNumber: fmt::Debug,
190{
191	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192		use HrmpWatermarkAcceptanceErr::*;
193		match self {
194			AdvancementRule { new_watermark, last_watermark } => write!(
195				fmt,
196				"the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197				new_watermark, last_watermark,
198			),
199			AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200				fmt,
201				"the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202				new_watermark, relay_chain_parent_number
203			),
204			LandsOnBlockWithNoMessages { new_watermark } => write!(
205				fmt,
206				"the HRMP watermark ({:?}) doesn't land on a block with messages received",
207				new_watermark
208			),
209		}
210	}
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215		use OutboundHrmpAcceptanceErr::*;
216		match self {
217			MoreMessagesThanPermitted { sent, permitted } => write!(
218				fmt,
219				"more HRMP messages than permitted by config ({} > {})",
220				sent, permitted,
221			),
222			NotSorted { idx } => {
223				write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224			},
225			NoSuchChannel { idx, channel_id } => write!(
226				fmt,
227				"the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228				idx, channel_id.sender, channel_id.recipient,
229			),
230			MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231				fmt,
232				"the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233				idx, msg_size, max_size,
234			),
235			TotalSizeExceeded { idx, total_size, limit } => write!(
236				fmt,
237				"sending the HRMP message at index {} would exceed the negotiated channel total size  ({} > {})",
238				idx, total_size, limit,
239			),
240			CapacityExceeded { idx, count, limit } => write!(
241				fmt,
242				"sending the HRMP message at index {} would exceed the negotiated channel capacity  ({} > {})",
243				idx, count, limit,
244			),
245		}
246	}
247}
248
249#[frame_support::pallet]
250pub mod pallet {
251	use super::*;
252
	// The pallet's marker type, generic over the runtime configuration `T`.
	// NOTE(review): `without_storage_info` is presumably needed because several storage items
	// of this pallet hold unbounded `Vec`s (see the NOTE on `HrmpOpenChannelRequestsList`).
	#[pallet::pallet]
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);
256
	/// Configuration of the HRMP pallet. Requires the `frame_system`, `configuration`, `paras`
	/// and `dmp` pallets to be configured for the same runtime.
	#[pallet::config]
	pub trait Config:
		frame_system::Config + configuration::Config + paras::Config + dmp::Config
	{
		/// The outer event type.
		// NOTE(review): presumably `allow(deprecated)` silences a deprecation warning tied to
		// declaring `RuntimeEvent` on a pallet `Config` — confirm against the FRAME version.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// The outer origin type. It must be constructible from both the parachains origin
		/// (`crate::Origin`) and the system origin, and convertible back into a parachains
		/// origin; the parachain-only dispatchables rely on that conversion.
		type RuntimeOrigin: From<crate::Origin>
			+ From<<Self as frame_system::Config>::RuntimeOrigin>
			+ Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;

		/// The origin that can perform "force" actions on channels.
		type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;

		/// An interface for reserving deposits for opening channels.
		///
		/// NOTE that this Currency instance will be charged with the amounts defined in the
		/// `Configuration` pallet. Specifically, that means that the `Balance` of the `Currency`
		/// implementation should be the same as `Balance` as used in the `Configuration`.
		type Currency: ReservableCurrency<Self::AccountId>;

		/// The default channel size and capacity to use when opening a channel to a system
		/// parachain.
		type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;

		/// Means of converting an `Xcm` into a `VersionedXcm`. This pallet sends HRMP XCM
		/// notifications to the channel-related parachains, while the `WrapVersion` implementation
		/// attempts to wrap them into the most suitable XCM version for the destination parachain.
		///
		/// NOTE: For example, `pallet_xcm` provides an accurate implementation (recommended), or
		/// the default `()` implementation uses the latest XCM version for all parachains.
		type VersionWrapper: xcm::WrapVersion;

		/// Something that provides the weight of this pallet.
		type WeightInfo: WeightInfo;
	}
294
	/// Events emitted by the HRMP pallet.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// Open HRMP channel requested.
		OpenChannelRequested {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// A pending HRMP open channel request was canceled by either the sender or the
		/// recipient of that request.
		OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// Open HRMP channel accepted.
		OpenChannelAccepted { sender: ParaId, recipient: ParaId },
		/// HRMP channel closed.
		ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// An HRMP channel was force-opened through `force_open_hrmp_channel`, which is gated by
		/// the `ChannelManager` origin.
		HrmpChannelForceOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// An HRMP channel was opened with a system chain.
		HrmpSystemChannelOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// An HRMP channel's deposits were updated.
		OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
	}
328
	/// Errors returned by the dispatchables of the HRMP pallet.
	#[pallet::error]
	pub enum Error<T> {
		/// The sender tried to open a channel to themselves.
		OpenHrmpChannelToSelf,
		/// The recipient is not a valid para.
		OpenHrmpChannelInvalidRecipient,
		/// The requested capacity is zero.
		OpenHrmpChannelZeroCapacity,
		/// The requested capacity exceeds the global limit.
		OpenHrmpChannelCapacityExceedsLimit,
		/// The requested maximum message size is 0.
		OpenHrmpChannelZeroMessageSize,
		/// The open request requested the message size that exceeds the global limit.
		OpenHrmpChannelMessageSizeExceedsLimit,
		/// The channel already exists.
		OpenHrmpChannelAlreadyExists,
		/// There is already a request to open the same channel.
		OpenHrmpChannelAlreadyRequested,
		/// The sender already has the maximum number of allowed outbound channels.
		OpenHrmpChannelLimitExceeded,
		/// The channel from the sender to the origin doesn't exist.
		AcceptHrmpChannelDoesntExist,
		/// The channel is already confirmed.
		AcceptHrmpChannelAlreadyConfirmed,
		/// The recipient already has the maximum number of allowed inbound channels.
		AcceptHrmpChannelLimitExceeded,
		/// The origin tries to close a channel where it is neither the sender nor the recipient.
		CloseHrmpChannelUnauthorized,
		/// The channel to be closed doesn't exist.
		CloseHrmpChannelDoesntExist,
		/// Closing of the channel has already been requested.
		CloseHrmpChannelAlreadyUnderway,
		/// Canceling is requested by neither the sender nor recipient of the open channel request.
		CancelHrmpOpenChannelUnauthorized,
		/// The open request doesn't exist.
		OpenHrmpChannelDoesntExist,
		/// Cannot cancel an HRMP open channel request because it is already confirmed.
		OpenHrmpChannelAlreadyConfirmed,
		/// The provided witness data is wrong.
		WrongWitness,
		/// The channel between these two chains cannot be authorized.
		ChannelCreationNotAuthorized,
	}
372
	/// The set of pending HRMP open channel requests.
	///
	/// The set is accompanied by a list for iteration.
	///
	/// Invariant:
	/// - There are no channels that exist in the list but not in the set and vice versa.
	#[pallet::storage]
	pub type HrmpOpenChannelRequests<T: Config> =
		StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;

	/// The list of pending HRMP open channel requests, kept for iteration alongside
	/// `HrmpOpenChannelRequests`.
	// NOTE: could become bounded, but we don't have a global maximum for this.
	// `HRMP_MAX_INBOUND_CHANNELS_BOUND` are per parachain, while this storage tracks the
	// global state.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;

	/// This mapping tracks how many open channel requests are initiated by a given sender para.
	/// Invariant: the number of entries in `HrmpOpenChannelRequests` keyed by `(X, _)` should be
	/// equal to the value of `HrmpOpenChannelRequestCount` for `X`.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;

	/// This mapping tracks how many open channel requests were accepted by a given recipient para.
	/// Invariant: the number of entries in `HrmpOpenChannelRequests` keyed by `(_, X)` with
	/// `confirmed` set to true should be equal to the value of `HrmpAcceptedChannelRequestCount`
	/// for `X`.
	#[pallet::storage]
	pub type HrmpAcceptedChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
403
	/// A set of pending HRMP close channel requests that are going to be closed during the session
	/// change. Used for checking if a given channel is registered for closure.
	///
	/// The set is accompanied by a list for iteration.
	///
	/// Invariant:
	/// - There are no channels that exist in the list but not in the set and vice versa.
	#[pallet::storage]
	pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;

	/// The list of pending HRMP close channel requests, kept for iteration alongside
	/// `HrmpCloseChannelRequests`.
	#[pallet::storage]
	pub type HrmpCloseChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
417
	/// The HRMP watermark (a relay-chain block number) associated with each para.
	/// Invariant:
	/// - each para `P` used here as a key should satisfy `Paras::is_valid_para(P)` within a
	///   session.
	#[pallet::storage]
	pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;

	/// HRMP channel metadata, keyed by channel id (sender and recipient).
	/// Invariant:
	/// - each participant in the channel should satisfy `Paras::is_valid_para(P)` within a session.
	#[pallet::storage]
	pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;
430
	/// Ingress/egress indexes allow finding all the senders and receivers given the opposite
	/// side. I.e.
	///
	/// (a) the ingress index allows finding all the senders for a given recipient.
	/// (b) the egress index allows finding all the recipients for a given sender.
	///
	/// Invariants:
	/// - for each ingress index entry for `P` each item `I` in the index should be present in
	///   `HrmpChannels` as `(I, P)`.
	/// - for each egress index entry for `P` each item `E` in the index should be present in
	///   `HrmpChannels` as `(P, E)`.
	/// - there should be no other dangling channels in `HrmpChannels`.
	/// - the vectors are sorted.
	#[pallet::storage]
	pub type HrmpIngressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;

	/// The egress index: all the recipients for a given sender. The invariants are listed on
	/// `HrmpIngressChannelsIndex`.
	// NOTE that this field is used by parachains via merkle storage proofs, therefore changing
	// the format will require migration of parachains.
	#[pallet::storage]
	pub type HrmpEgressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
453
	/// Storage for the messages for each channel.
	/// Invariant: cannot be non-empty if the corresponding channel in `HrmpChannels` is `None`.
	#[pallet::storage]
	pub type HrmpChannelContents<T: Config> = StorageMap<
		_,
		Twox64Concat,
		HrmpChannelId,
		Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
		ValueQuery,
	>;

	/// Maintains a mapping that can be used to answer the question: which paras sent a message
	/// at the given block number for a given receiver? Invariants:
	/// - The inner `Vec<ParaId>` is never empty.
	/// - The inner `Vec<ParaId>` cannot store two same `ParaId`.
	/// - The outer vector is sorted ascending by block number and cannot store two items with the
	///   same block number.
	#[pallet::storage]
	pub type HrmpChannelDigests<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
474
	/// Preopen the given HRMP channels.
	///
	/// The values in the tuple correspond to
	/// `(sender, recipient, max_capacity, max_message_size)`, i.e. similar to `init_open_channel`.
	/// In fact, the initialization is performed as if `init_open_channel` and
	/// `accept_open_channel` were called with the respective parameters and a session change
	/// took place.
	///
	/// As such, each channel initializer should satisfy the same constraints, namely:
	///
	/// 1. `max_capacity` and `max_message_size` should be within the limits set by the
	///    configuration pallet.
	/// 2. `sender` and `recipient` must be valid paras.
	#[pallet::genesis_config]
	#[derive(DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		// Only ties the genesis config to the runtime type `T`; skipped by serde.
		#[serde(skip)]
		_config: core::marker::PhantomData<T>,
		// Channels to preopen, as `(sender, recipient, max_capacity, max_message_size)`.
		preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
	}

	#[pallet::genesis_build]
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
		/// Hands the configured channel list over to `initialize_storage`.
		fn build(&self) {
			initialize_storage::<T>(&self.preopen_hrmp_channels);
		}
	}
502
503	#[pallet::call]
504	impl<T: Config> Pallet<T> {
505		/// Initiate opening a channel from a parachain to a given recipient with given channel
506		/// parameters.
507		///
508		/// - `proposed_max_capacity` - specifies how many messages can be in the channel at once.
509		/// - `proposed_max_message_size` - specifies the maximum size of the messages.
510		///
511		/// These numbers are a subject to the relay-chain configuration limits.
512		///
513		/// The channel can be opened only after the recipient confirms it and only on a session
514		/// change.
515		#[pallet::call_index(0)]
516		#[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
517		pub fn hrmp_init_open_channel(
518			origin: OriginFor<T>,
519			recipient: ParaId,
520			proposed_max_capacity: u32,
521			proposed_max_message_size: u32,
522		) -> DispatchResult {
523			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
524			Self::init_open_channel(
525				origin,
526				recipient,
527				proposed_max_capacity,
528				proposed_max_message_size,
529			)?;
530			Self::deposit_event(Event::OpenChannelRequested {
531				sender: origin,
532				recipient,
533				proposed_max_capacity,
534				proposed_max_message_size,
535			});
536			Ok(())
537		}
538
539		/// Accept a pending open channel request from the given sender.
540		///
541		/// The channel will be opened only on the next session boundary.
542		#[pallet::call_index(1)]
543		#[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
544		pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
545			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
546			Self::accept_open_channel(origin, sender)?;
547			Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
548			Ok(())
549		}
550
551		/// Initiate unilateral closing of a channel. The origin must be either the sender or the
552		/// recipient in the channel being closed.
553		///
554		/// The closure can only happen on a session change.
555		#[pallet::call_index(2)]
556		#[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
557		pub fn hrmp_close_channel(
558			origin: OriginFor<T>,
559			channel_id: HrmpChannelId,
560		) -> DispatchResult {
561			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
562			Self::close_channel(origin, channel_id.clone())?;
563			Self::deposit_event(Event::ChannelClosed { by_parachain: origin, channel_id });
564			Ok(())
565		}
566
567		/// This extrinsic triggers the cleanup of all the HRMP storage items that a para may have.
568		/// Normally this happens once per session, but this allows you to trigger the cleanup
569		/// immediately for a specific parachain.
570		///
571		/// Number of inbound and outbound channels for `para` must be provided as witness data.
572		///
573		/// Origin must be the `ChannelManager`.
574		#[pallet::call_index(3)]
575		#[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
576		pub fn force_clean_hrmp(
577			origin: OriginFor<T>,
578			para: ParaId,
579			num_inbound: u32,
580			num_outbound: u32,
581		) -> DispatchResult {
582			T::ChannelManager::ensure_origin(origin)?;
583
584			ensure!(
585				HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
586					num_inbound as usize,
587				Error::<T>::WrongWitness
588			);
589			ensure!(
590				HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
591					num_outbound as usize,
592				Error::<T>::WrongWitness
593			);
594
595			Self::clean_hrmp_after_outgoing(&para);
596			Ok(())
597		}
598
599		/// Force process HRMP open channel requests.
600		///
601		/// If there are pending HRMP open channel requests, you can use this function to process
602		/// all of those requests immediately.
603		///
604		/// Total number of opening channels must be provided as witness data.
605		///
606		/// Origin must be the `ChannelManager`.
607		#[pallet::call_index(4)]
608		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
609		pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
610			T::ChannelManager::ensure_origin(origin)?;
611
612			ensure!(
613				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
614					channels,
615				Error::<T>::WrongWitness
616			);
617
618			let host_config = configuration::ActiveConfig::<T>::get();
619			Self::process_hrmp_open_channel_requests(&host_config);
620			Ok(())
621		}
622
623		/// Force process HRMP close channel requests.
624		///
625		/// If there are pending HRMP close channel requests, you can use this function to process
626		/// all of those requests immediately.
627		///
628		/// Total number of closing channels must be provided as witness data.
629		///
630		/// Origin must be the `ChannelManager`.
631		#[pallet::call_index(5)]
632		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
633		pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
634			T::ChannelManager::ensure_origin(origin)?;
635
636			ensure!(
637				HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
638					channels,
639				Error::<T>::WrongWitness
640			);
641
642			Self::process_hrmp_close_channel_requests();
643			Ok(())
644		}
645
646		/// This cancels a pending open channel request. It can be canceled by either of the sender
647		/// or the recipient for that request. The origin must be either of those.
648		///
649		/// The cancellation happens immediately. It is not possible to cancel the request if it is
650		/// already accepted.
651		///
652		/// Total number of open requests (i.e. `HrmpOpenChannelRequestsList`) must be provided as
653		/// witness data.
654		#[pallet::call_index(6)]
655		#[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
656		pub fn hrmp_cancel_open_request(
657			origin: OriginFor<T>,
658			channel_id: HrmpChannelId,
659			open_requests: u32,
660		) -> DispatchResult {
661			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
662			ensure!(
663				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
664					open_requests,
665				Error::<T>::WrongWitness
666			);
667			Self::cancel_open_request(origin, channel_id.clone())?;
668			Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
669			Ok(())
670		}
671
672		/// Open a channel from a `sender` to a `recipient` `ParaId`. Although opened by governance,
673		/// the `max_capacity` and `max_message_size` are still subject to the Relay Chain's
674		/// configured limits.
675		///
676		/// Expected use is when one (and only one) of the `ParaId`s involved in the channel is
677		/// governed by the system, e.g. a system parachain.
678		///
679		/// Origin must be the `ChannelManager`.
680		#[pallet::call_index(7)]
681		#[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
682		pub fn force_open_hrmp_channel(
683			origin: OriginFor<T>,
684			sender: ParaId,
685			recipient: ParaId,
686			max_capacity: u32,
687			max_message_size: u32,
688		) -> DispatchResultWithPostInfo {
689			T::ChannelManager::ensure_origin(origin)?;
690
691			// Guard against a common footgun where someone makes a channel request to a system
692			// parachain and then makes a proposal to open the channel via governance, which fails
693			// because `init_open_channel` fails if there is an existing request. This check will
694			// clear an existing request such that `init_open_channel` should otherwise succeed.
695			let channel_id = HrmpChannelId { sender, recipient };
696			let cancel_request: u32 =
697				if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
698					Self::cancel_open_request(sender, channel_id)?;
699					1
700				} else {
701					0
702				};
703
704			// Now we proceed with normal init/accept, except that we set `no_deposit` to true such
705			// that it will not require deposits from either member.
706			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
707			Self::accept_open_channel(recipient, sender)?;
708			Self::deposit_event(Event::HrmpChannelForceOpened {
709				sender,
710				recipient,
711				proposed_max_capacity: max_capacity,
712				proposed_max_message_size: max_message_size,
713			});
714
715			Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
716		}
717
718		/// Establish an HRMP channel between two system chains. If the channel does not already
719		/// exist, the transaction fees will be refunded to the caller. The system does not take
720		/// deposits for channels between system chains, and automatically sets the message number
721		/// and size limits to the maximum allowed by the network's configuration.
722		///
723		/// Arguments:
724		///
725		/// - `sender`: A system chain, `ParaId`.
726		/// - `recipient`: A system chain, `ParaId`.
727		///
728		/// Any signed origin can call this function, but _both_ inputs MUST be system chains. If
729		/// the channel does not exist yet, there is no fee.
730		#[pallet::call_index(8)]
731		#[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
732		pub fn establish_system_channel(
733			origin: OriginFor<T>,
734			sender: ParaId,
735			recipient: ParaId,
736		) -> DispatchResultWithPostInfo {
737			let _caller = ensure_signed(origin)?;
738
739			// both chains must be system
740			ensure!(
741				sender.is_system() && recipient.is_system(),
742				Error::<T>::ChannelCreationNotAuthorized
743			);
744
745			let config = configuration::ActiveConfig::<T>::get();
746			let max_message_size = config.hrmp_channel_max_message_size;
747			let max_capacity = config.hrmp_channel_max_capacity;
748
749			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
750			Self::accept_open_channel(recipient, sender)?;
751
752			Self::deposit_event(Event::HrmpSystemChannelOpened {
753				sender,
754				recipient,
755				proposed_max_capacity: max_capacity,
756				proposed_max_message_size: max_message_size,
757			});
758
759			Ok(Pays::No.into())
760		}
761
762		/// Update the deposits held for an HRMP channel to the latest `Configuration`. Channels
763		/// with system chains do not require a deposit.
764		///
765		/// Arguments:
766		///
767		/// - `sender`: A chain, `ParaId`.
768		/// - `recipient`: A chain, `ParaId`.
769		///
770		/// Any signed origin can call this function.
771		#[pallet::call_index(9)]
772		#[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
773		pub fn poke_channel_deposits(
774			origin: OriginFor<T>,
775			sender: ParaId,
776			recipient: ParaId,
777		) -> DispatchResult {
778			let _caller = ensure_signed(origin)?;
779			let channel_id = HrmpChannelId { sender, recipient };
780			let is_system = sender.is_system() || recipient.is_system();
781
782			let config = configuration::ActiveConfig::<T>::get();
783
784			// Channels with and amongst the system do not require a deposit.
785			let (new_sender_deposit, new_recipient_deposit) = if is_system {
786				(0, 0)
787			} else {
788				(config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
789			};
790
791			HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
792				if let Some(ref mut channel) = channel {
793					let current_sender_deposit = channel.sender_deposit;
794					let current_recipient_deposit = channel.recipient_deposit;
795
796					// nothing to update
797					if current_sender_deposit == new_sender_deposit &&
798						current_recipient_deposit == new_recipient_deposit
799					{
800						return Ok(());
801					}
802
803					// sender
804					if current_sender_deposit > new_sender_deposit {
805						// Can never underflow, but be paranoid.
806						let amount = current_sender_deposit
807							.checked_sub(new_sender_deposit)
808							.ok_or(ArithmeticError::Underflow)?;
809						T::Currency::unreserve(
810							&channel_id.sender.into_account_truncating(),
811							// The difference should always be convertible into `Balance`, but be
812							// paranoid and do nothing in case.
813							amount.try_into().unwrap_or(Zero::zero()),
814						);
815					} else if current_sender_deposit < new_sender_deposit {
816						let amount = new_sender_deposit
817							.checked_sub(current_sender_deposit)
818							.ok_or(ArithmeticError::Underflow)?;
819						T::Currency::reserve(
820							&channel_id.sender.into_account_truncating(),
821							amount.try_into().unwrap_or(Zero::zero()),
822						)?;
823					}
824
825					// recipient
826					if current_recipient_deposit > new_recipient_deposit {
827						let amount = current_recipient_deposit
828							.checked_sub(new_recipient_deposit)
829							.ok_or(ArithmeticError::Underflow)?;
830						T::Currency::unreserve(
831							&channel_id.recipient.into_account_truncating(),
832							amount.try_into().unwrap_or(Zero::zero()),
833						);
834					} else if current_recipient_deposit < new_recipient_deposit {
835						let amount = new_recipient_deposit
836							.checked_sub(current_recipient_deposit)
837							.ok_or(ArithmeticError::Underflow)?;
838						T::Currency::reserve(
839							&channel_id.recipient.into_account_truncating(),
840							amount.try_into().unwrap_or(Zero::zero()),
841						)?;
842					}
843
844					// update storage
845					channel.sender_deposit = new_sender_deposit;
846					channel.recipient_deposit = new_recipient_deposit;
847				} else {
848					return Err(Error::<T>::OpenHrmpChannelDoesntExist.into());
849				}
850				Ok(())
851			})?;
852
853			Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
854
855			Ok(())
856		}
857
858		/// Establish a bidirectional HRMP channel between a parachain and a system chain.
859		///
860		/// Arguments:
861		///
862		/// - `target_system_chain`: A system chain, `ParaId`.
863		///
864		/// The origin needs to be the parachain origin.
865		#[pallet::call_index(10)]
866		#[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
867		pub fn establish_channel_with_system(
868			origin: OriginFor<T>,
869			target_system_chain: ParaId,
870		) -> DispatchResultWithPostInfo {
871			let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
872
873			ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
874
875			let (max_message_size, max_capacity) =
876				T::DefaultChannelSizeAndCapacityWithSystem::get();
877
878			// create bidirectional channel
879			Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
880			Self::accept_open_channel(target_system_chain, sender)?;
881
882			Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
883			Self::accept_open_channel(sender, target_system_chain)?;
884
885			Self::deposit_event(Event::HrmpSystemChannelOpened {
886				sender,
887				recipient: target_system_chain,
888				proposed_max_capacity: max_capacity,
889				proposed_max_message_size: max_message_size,
890			});
891
892			Self::deposit_event(Event::HrmpSystemChannelOpened {
893				sender: target_system_chain,
894				recipient: sender,
895				proposed_max_capacity: max_capacity,
896				proposed_max_message_size: max_message_size,
897			});
898
899			Ok(Pays::No.into())
900		}
901	}
902}
903
904fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
905	let host_config = configuration::ActiveConfig::<T>::get();
906	for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
907		if let Err(err) =
908			preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
909		{
910			panic!("failed to initialize the genesis storage: {:?}", err);
911		}
912	}
913	Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
914}
915
916fn preopen_hrmp_channel<T: Config>(
917	sender: ParaId,
918	recipient: ParaId,
919	max_capacity: u32,
920	max_message_size: u32,
921) -> DispatchResult {
922	Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
923	Pallet::<T>::accept_open_channel(recipient, sender)?;
924	Ok(())
925}
926
927/// Routines and getters related to HRMP.
928impl<T: Config> Pallet<T> {
929	/// Block initialization logic, called by initializer.
930	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
931		Weight::zero()
932	}
933
934	/// Block finalization logic, called by initializer.
935	pub(crate) fn initializer_finalize() {}
936
937	/// Called by the initializer to note that a new session has started.
938	pub(crate) fn initializer_on_new_session(
939		notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
940		outgoing_paras: &[ParaId],
941	) -> Weight {
942		let w1 = Self::perform_outgoing_para_cleanup(&notification.prev_config, outgoing_paras);
943		Self::process_hrmp_open_channel_requests(&notification.prev_config);
944		Self::process_hrmp_close_channel_requests();
945		w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
946			outgoing_paras.len() as u32
947		))
948		.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
949			outgoing_paras.len() as u32,
950		))
951	}
952
953	/// Iterate over all paras that were noted for offboarding and remove all the data
954	/// associated with them.
955	fn perform_outgoing_para_cleanup(
956		config: &HostConfiguration<BlockNumberFor<T>>,
957		outgoing: &[ParaId],
958	) -> Weight {
959		let mut w = Self::clean_open_channel_requests(config, outgoing);
960		for outgoing_para in outgoing {
961			Self::clean_hrmp_after_outgoing(outgoing_para);
962
963			// we need a few extra bits of data to weigh this -- all of this is read internally
964			// anyways, so no overhead.
965			let ingress_count =
966				HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
967			let egress_count =
968				HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
969			w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
970				ingress_count,
971				egress_count,
972			));
973		}
974		w
975	}
976
977	// Go over the HRMP open channel requests and remove all in which offboarding paras participate.
978	//
979	// This will also perform the refunds for the counterparty if it doesn't offboard.
980	pub(crate) fn clean_open_channel_requests(
981		config: &HostConfiguration<BlockNumberFor<T>>,
982		outgoing: &[ParaId],
983	) -> Weight {
984		// First collect all the channel ids of the open requests in which there is at least one
985		// party presents in the outgoing list.
986		//
987		// Both the open channel request list and outgoing list are expected to be small enough.
988		// In the most common case there will be only single outgoing para.
989		let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
990		let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
991			.into_iter()
992			.partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
993		HrmpOpenChannelRequestsList::<T>::put(stay);
994
995		// Then iterate over all open requests to be removed, pull them out of the set and perform
996		// the refunds if applicable.
997		for req_id in go {
998			let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
999				Some(req_data) => req_data,
1000				None => {
1001					// Can't normally happen but no need to panic.
1002					continue;
1003				},
1004			};
1005
1006			// Return the deposit of the sender, but only if it is not the para being offboarded.
1007			if !outgoing.contains(&req_id.sender) {
1008				T::Currency::unreserve(
1009					&req_id.sender.into_account_truncating(),
1010					req_data.sender_deposit.unique_saturated_into(),
1011				);
1012			}
1013
1014			// If the request was confirmed, then it means it was confirmed in the finished session.
1015			// Therefore, the config's hrmp_recipient_deposit represents the actual value of the
1016			// deposit.
1017			//
1018			// We still want to refund the deposit only if the para is not being offboarded.
1019			if req_data.confirmed {
1020				if !outgoing.contains(&req_id.recipient) {
1021					T::Currency::unreserve(
1022						&req_id.recipient.into_account_truncating(),
1023						config.hrmp_recipient_deposit.unique_saturated_into(),
1024					);
1025				}
1026				Self::decrease_accepted_channel_request_count(req_id.recipient);
1027			}
1028		}
1029
1030		<T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1031	}
1032
1033	/// Remove all storage entries associated with the given para.
1034	fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1035		HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1036		HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1037
1038		let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1039			.into_iter()
1040			.map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1041		let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1042			.into_iter()
1043			.map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1044		let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1045		to_close.sort();
1046		to_close.dedup();
1047
1048		for channel in to_close {
1049			Self::close_hrmp_channel(&channel);
1050		}
1051	}
1052
1053	/// Iterate over all open channel requests and:
1054	///
1055	/// - prune the stale requests
1056	/// - enact the confirmed requests
1057	fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1058		let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1059		if open_req_channels.is_empty() {
1060			return;
1061		}
1062
1063		// iterate the vector starting from the end making our way to the beginning. This way we
1064		// can leverage `swap_remove` to efficiently remove an item during iteration.
1065		let mut idx = open_req_channels.len();
1066		loop {
1067			// bail if we've iterated over all items.
1068			if idx == 0 {
1069				break;
1070			}
1071
1072			idx -= 1;
1073			let channel_id = open_req_channels[idx].clone();
1074			let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1075				"can't be `None` due to the invariant that the list contains the same items as the set; qed",
1076			);
1077
1078			let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1079			let sender_deposit = request.sender_deposit;
1080			let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1081
1082			if request.confirmed {
1083				if paras::Pallet::<T>::is_valid_para(channel_id.sender) &&
1084					paras::Pallet::<T>::is_valid_para(channel_id.recipient)
1085				{
1086					HrmpChannels::<T>::insert(
1087						&channel_id,
1088						HrmpChannel {
1089							sender_deposit,
1090							recipient_deposit,
1091							max_capacity: request.max_capacity,
1092							max_total_size: request.max_total_size,
1093							max_message_size: request.max_message_size,
1094							msg_count: 0,
1095							total_size: 0,
1096							mqc_head: None,
1097						},
1098					);
1099
1100					HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1101						if let Err(i) = v.binary_search(&channel_id.sender) {
1102							v.insert(i, channel_id.sender);
1103						}
1104					});
1105					HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1106						if let Err(i) = v.binary_search(&channel_id.recipient) {
1107							v.insert(i, channel_id.recipient);
1108						}
1109					});
1110				}
1111
1112				Self::decrease_open_channel_request_count(channel_id.sender);
1113				Self::decrease_accepted_channel_request_count(channel_id.recipient);
1114
1115				let _ = open_req_channels.swap_remove(idx);
1116				HrmpOpenChannelRequests::<T>::remove(&channel_id);
1117			}
1118		}
1119
1120		HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1121	}
1122
1123	/// Iterate over all close channel requests unconditionally closing the channels.
1124	fn process_hrmp_close_channel_requests() {
1125		let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1126		for condemned_ch_id in close_reqs {
1127			HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1128			Self::close_hrmp_channel(&condemned_ch_id);
1129		}
1130	}
1131
1132	/// Close and remove the designated HRMP channel.
1133	///
1134	/// This includes returning the deposits.
1135	///
1136	/// This function is idempotent, meaning that after the first application it should have no
1137	/// effect (i.e. it won't return the deposits twice).
1138	fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1139		if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1140			HrmpChannels::<T>::take(channel_id)
1141		{
1142			T::Currency::unreserve(
1143				&channel_id.sender.into_account_truncating(),
1144				sender_deposit.unique_saturated_into(),
1145			);
1146			T::Currency::unreserve(
1147				&channel_id.recipient.into_account_truncating(),
1148				recipient_deposit.unique_saturated_into(),
1149			);
1150		}
1151
1152		HrmpChannelContents::<T>::remove(channel_id);
1153
1154		HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1155			if let Ok(i) = v.binary_search(&channel_id.recipient) {
1156				v.remove(i);
1157			}
1158		});
1159		HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1160			if let Ok(i) = v.binary_search(&channel_id.sender) {
1161				v.remove(i);
1162			}
1163		});
1164	}
1165
1166	/// Check that the candidate of the given recipient controls the HRMP watermark properly.
1167	pub(crate) fn check_hrmp_watermark(
1168		recipient: ParaId,
1169		relay_chain_parent_number: BlockNumberFor<T>,
1170		new_hrmp_watermark: BlockNumberFor<T>,
1171	) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1172		// First, check where the watermark CANNOT legally land.
1173		//
1174		// (a) For ensuring that messages are eventually processed, we require each parablock's
1175		//     watermark to be greater than the last one. The exception to this is if the previous
1176		//     watermark was already equal to the current relay-parent number.
1177		//
1178		// (b) However, a parachain cannot read into "the future", therefore the watermark should
1179		//     not be greater than the relay-chain context block which the parablock refers to.
1180		if new_hrmp_watermark == relay_chain_parent_number {
1181			return Ok(());
1182		}
1183
1184		if new_hrmp_watermark > relay_chain_parent_number {
1185			return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1186				new_watermark: new_hrmp_watermark,
1187				relay_chain_parent_number,
1188			});
1189		}
1190
1191		if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1192			if new_hrmp_watermark < last_watermark {
1193				return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1194					new_watermark: new_hrmp_watermark,
1195					last_watermark,
1196				});
1197			}
1198
1199			if new_hrmp_watermark == last_watermark {
1200				return Ok(());
1201			}
1202		}
1203
1204		// Second, check where the watermark CAN land. It's one of the following:
1205		//
1206		// (a) The relay parent block number (checked above).
1207		// (b) A relay-chain block in which this para received at least one message (checked here)
1208		let digest = HrmpChannelDigests::<T>::get(&recipient);
1209		if !digest
1210			.binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1211			.is_ok()
1212		{
1213			return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1214				new_watermark: new_hrmp_watermark,
1215			});
1216		}
1217		Ok(())
1218	}
1219
1220	/// Returns HRMP watermarks of previously sent messages to a given para.
1221	pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1222		let mut valid_watermarks: Vec<_> = HrmpChannelDigests::<T>::get(&recipient)
1223			.into_iter()
1224			.map(|(block_no, _)| block_no)
1225			.collect();
1226
1227		// The current watermark will remain valid until updated.
1228		if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1229			if valid_watermarks.first().map_or(false, |w| w > &last_watermark) {
1230				valid_watermarks.insert(0, last_watermark);
1231			}
1232		}
1233
1234		valid_watermarks
1235	}
1236
1237	pub(crate) fn check_outbound_hrmp(
1238		config: &HostConfiguration<BlockNumberFor<T>>,
1239		sender: ParaId,
1240		out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1241	) -> Result<(), OutboundHrmpAcceptanceErr> {
1242		if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1243			return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1244				sent: out_hrmp_msgs.len() as u32,
1245				permitted: config.hrmp_max_message_num_per_candidate,
1246			});
1247		}
1248
1249		let mut last_recipient = None::<ParaId>;
1250
1251		for (idx, out_msg) in
1252			out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1253		{
1254			match last_recipient {
1255				// the messages must be sorted in ascending order and there must be no two messages
1256				// sent to the same recipient. Thus we can check that every recipient is strictly
1257				// greater than the previous one.
1258				Some(last_recipient) if out_msg.recipient <= last_recipient => {
1259					return Err(OutboundHrmpAcceptanceErr::NotSorted { idx })
1260				},
1261				_ => last_recipient = Some(out_msg.recipient),
1262			}
1263
1264			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1265
1266			let channel = match HrmpChannels::<T>::get(&channel_id) {
1267				Some(channel) => channel,
1268				None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1269			};
1270
1271			let msg_size = out_msg.data.len() as u32;
1272			if msg_size > channel.max_message_size {
1273				return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1274					idx,
1275					msg_size,
1276					max_size: channel.max_message_size,
1277				});
1278			}
1279
1280			let new_total_size = channel.total_size + out_msg.data.len() as u32;
1281			if new_total_size > channel.max_total_size {
1282				return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1283					idx,
1284					total_size: new_total_size,
1285					limit: channel.max_total_size,
1286				});
1287			}
1288
1289			let new_msg_count = channel.msg_count + 1;
1290			if new_msg_count > channel.max_capacity {
1291				return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1292					idx,
1293					count: new_msg_count,
1294					limit: channel.max_capacity,
1295				});
1296			}
1297		}
1298
1299		Ok(())
1300	}
1301
1302	/// Returns remaining outbound channels capacity in messages and in bytes per recipient para.
1303	pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1304		let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1305		let mut remaining = Vec::with_capacity(recipients.len());
1306
1307		for recipient in recipients {
1308			let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1309				continue;
1310			};
1311			remaining.push((
1312				recipient,
1313				(
1314					channel.max_capacity - channel.msg_count,
1315					channel.max_total_size - channel.total_size,
1316				),
1317			));
1318		}
1319
1320		remaining
1321	}
1322
1323	pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1324		// sift through the incoming messages digest to collect the paras that sent at least one
1325		// message to this parachain between the old and new watermarks.
1326		let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1327			let mut senders = BTreeSet::new();
1328			let mut leftover = Vec::with_capacity(digest.len());
1329			for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1330				if block_no <= new_hrmp_watermark {
1331					senders.extend(paras_sent_msg);
1332				} else {
1333					leftover.push((block_no, paras_sent_msg));
1334				}
1335			}
1336			*digest = leftover;
1337			senders
1338		});
1339
1340		// having all senders we can trivially find out the channels which we need to prune.
1341		let channels_to_prune =
1342			senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1343		for channel_id in channels_to_prune {
1344			// prune each channel up to the new watermark keeping track how many messages we removed
1345			// and what is the total byte size of them.
1346			let (mut pruned_cnt, mut pruned_size) = (0, 0);
1347
1348			let contents = HrmpChannelContents::<T>::get(&channel_id);
1349			let mut leftover = Vec::with_capacity(contents.len());
1350			for msg in contents {
1351				if msg.sent_at <= new_hrmp_watermark {
1352					pruned_cnt += 1;
1353					pruned_size += msg.data.len();
1354				} else {
1355					leftover.push(msg);
1356				}
1357			}
1358			if !leftover.is_empty() {
1359				HrmpChannelContents::<T>::insert(&channel_id, leftover);
1360			} else {
1361				HrmpChannelContents::<T>::remove(&channel_id);
1362			}
1363
1364			// update the channel metadata.
1365			HrmpChannels::<T>::mutate(&channel_id, |channel| {
1366				if let Some(ref mut channel) = channel {
1367					channel.msg_count -= pruned_cnt as u32;
1368					channel.total_size -= pruned_size as u32;
1369				}
1370			});
1371		}
1372
1373		HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1374	}
1375
1376	/// Process the outbound HRMP messages by putting them into the appropriate recipient queues.
1377	pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1378		let now = frame_system::Pallet::<T>::block_number();
1379
1380		for out_msg in out_hrmp_msgs {
1381			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1382
1383			let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1384				Some(channel) => channel,
1385				None => {
1386					// apparently, that since acceptance of this candidate the recipient was
1387					// offboarded and the channel no longer exists.
1388					continue;
1389				},
1390			};
1391
1392			let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1393
1394			// book keeping
1395			channel.msg_count += 1;
1396			channel.total_size += inbound.data.len() as u32;
1397
1398			// compute the new MQC head of the channel
1399			let prev_head = channel.mqc_head.unwrap_or(Default::default());
1400			let new_head = BlakeTwo256::hash_of(&(
1401				prev_head,
1402				inbound.sent_at,
1403				T::Hashing::hash_of(&inbound.data),
1404			));
1405			channel.mqc_head = Some(new_head);
1406
1407			HrmpChannels::<T>::insert(&channel_id, channel);
1408			HrmpChannelContents::<T>::append(&channel_id, inbound);
1409
1410			// The digests are sorted in ascending by block number order. There are only two
1411			// possible scenarios here ("the current" is the block of candidate's inclusion):
1412			//
1413			// (a) It's the first time anybody sends a message to this recipient within this block.
1414			//     In this case, the digest vector would be empty or the block number of the latest
1415			//     entry is smaller than the current.
1416			//
1417			// (b) Somebody has already sent a message within the current block. That means that
1418			//     the block number of the latest entry is equal to the current.
1419			//
1420			// Note that having the latest entry greater than the current block number is a logical
1421			// error.
1422			let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1423			if let Some(cur_block_digest) = recipient_digest
1424				.last_mut()
1425				.filter(|(block_no, _)| *block_no == now)
1426				.map(|(_, ref mut d)| d)
1427			{
1428				cur_block_digest.push(sender);
1429			} else {
1430				recipient_digest.push((now, vec![sender]));
1431			}
1432			HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1433		}
1434	}
1435
1436	/// Initiate opening a channel from a parachain to a given recipient with given channel
1437	/// parameters. If neither chain is part of the system, then a deposit from the `Configuration`
1438	/// will be required for `origin` (the sender) upon opening the request and the `recipient` upon
1439	/// accepting it.
1440	///
1441	/// Basically the same as [`hrmp_init_open_channel`](Pallet::hrmp_init_open_channel) but
1442	/// intended for calling directly from other pallets rather than dispatched.
1443	pub fn init_open_channel(
1444		origin: ParaId,
1445		recipient: ParaId,
1446		proposed_max_capacity: u32,
1447		proposed_max_message_size: u32,
1448	) -> DispatchResult {
1449		ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
1450		ensure!(
1451			paras::Pallet::<T>::is_valid_para(recipient),
1452			Error::<T>::OpenHrmpChannelInvalidRecipient,
1453		);
1454
1455		let config = configuration::ActiveConfig::<T>::get();
1456		ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
1457		ensure!(
1458			proposed_max_capacity <= config.hrmp_channel_max_capacity,
1459			Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
1460		);
1461		ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
1462		ensure!(
1463			proposed_max_message_size <= config.hrmp_channel_max_message_size,
1464			Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
1465		);
1466
1467		let channel_id = HrmpChannelId { sender: origin, recipient };
1468		ensure!(
1469			HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
1470			Error::<T>::OpenHrmpChannelAlreadyRequested,
1471		);
1472		ensure!(
1473			HrmpChannels::<T>::get(&channel_id).is_none(),
1474			Error::<T>::OpenHrmpChannelAlreadyExists,
1475		);
1476
1477		let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1478		let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
1479		let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
1480		ensure!(
1481			egress_cnt + open_req_cnt < channel_num_limit,
1482			Error::<T>::OpenHrmpChannelLimitExceeded,
1483		);
1484
1485		// Do not require deposits for channels with or amongst the system.
1486		let is_system = origin.is_system() || recipient.is_system();
1487		let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
1488		if !deposit.is_zero() {
1489			T::Currency::reserve(
1490				&origin.into_account_truncating(),
1491				deposit.unique_saturated_into(),
1492			)?;
1493		}
1494
1495		// mutating storage directly now -- shall not bail henceforth.
1496
1497		HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
1498		HrmpOpenChannelRequests::<T>::insert(
1499			&channel_id,
1500			HrmpOpenChannelRequest {
1501				confirmed: false,
1502				_age: 0,
1503				sender_deposit: deposit,
1504				max_capacity: proposed_max_capacity,
1505				max_message_size: proposed_max_message_size,
1506				max_total_size: config.hrmp_channel_max_total_size,
1507			},
1508		);
1509		HrmpOpenChannelRequestsList::<T>::append(channel_id);
1510
1511		Self::send_to_para(
1512			"init_open_channel",
1513			&config,
1514			recipient,
1515			Self::wrap_notification(|| {
1516				use xcm::opaque::latest::{prelude::*, Xcm};
1517				Xcm(vec![HrmpNewChannelOpenRequest {
1518					sender: origin.into(),
1519					max_capacity: proposed_max_capacity,
1520					max_message_size: proposed_max_message_size,
1521				}])
1522			}),
1523		);
1524
1525		Ok(())
1526	}
1527
1528	/// Accept a pending open channel request from the given sender.
1529	///
1530	/// Basically the same as [`hrmp_accept_open_channel`](Pallet::hrmp_accept_open_channel) but
1531	/// intended for calling directly from other pallets rather than dispatched.
1532	pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1533		let channel_id = HrmpChannelId { sender, recipient: origin };
1534		let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1535			.ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1536		ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1537
1538		// check if by accepting this open channel request, this parachain would exceed the
1539		// number of inbound channels.
1540		let config = configuration::ActiveConfig::<T>::get();
1541		let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
1542		let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1543		let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1544		ensure!(
1545			ingress_cnt + accepted_cnt < channel_num_limit,
1546			Error::<T>::AcceptHrmpChannelLimitExceeded,
1547		);
1548
1549		// Do not require deposits for channels with or amongst the system.
1550		let is_system = origin.is_system() || sender.is_system();
1551		let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1552		if !deposit.is_zero() {
1553			T::Currency::reserve(
1554				&origin.into_account_truncating(),
1555				deposit.unique_saturated_into(),
1556			)?;
1557		}
1558
1559		// persist the updated open channel request and then increment the number of accepted
1560		// channels.
1561		channel_req.confirmed = true;
1562		HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1563		HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1564
1565		Self::send_to_para(
1566			"accept_open_channel",
1567			&config,
1568			sender,
1569			Self::wrap_notification(|| {
1570				use xcm::opaque::latest::{prelude::*, Xcm};
1571				Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1572			}),
1573		);
1574
1575		Ok(())
1576	}
1577
1578	fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1579		// check if the origin is allowed to close the channel.
1580		ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1581
1582		let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1583			.ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1584		ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1585
1586		// Remove the request by the channel id and sync the accompanying list with the set.
1587		HrmpOpenChannelRequests::<T>::remove(&channel_id);
1588		HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1589			if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1590				open_req_channels.swap_remove(pos);
1591			}
1592		});
1593
1594		Self::decrease_open_channel_request_count(channel_id.sender);
1595		// Don't decrease `HrmpAcceptedChannelRequestCount` because we don't consider confirmed
1596		// requests here.
1597
1598		// Unreserve the sender's deposit. The recipient could not have left their deposit because
1599		// we ensured that the request is not confirmed.
1600		T::Currency::unreserve(
1601			&channel_id.sender.into_account_truncating(),
1602			open_channel_req.sender_deposit.unique_saturated_into(),
1603		);
1604
1605		Ok(())
1606	}
1607
1608	fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1609		// check if the origin is allowed to close the channel.
1610		ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1611
1612		// check if the channel requested to close does exist.
1613		ensure!(
1614			HrmpChannels::<T>::get(&channel_id).is_some(),
1615			Error::<T>::CloseHrmpChannelDoesntExist,
1616		);
1617
1618		// check that there is no outstanding close request for this channel
1619		ensure!(
1620			HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1621			Error::<T>::CloseHrmpChannelAlreadyUnderway,
1622		);
1623
1624		HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1625		HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1626
1627		let config = configuration::ActiveConfig::<T>::get();
1628		let opposite_party =
1629			if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1630
1631		Self::send_to_para(
1632			"close_channel",
1633			&config,
1634			opposite_party,
1635			Self::wrap_notification(|| {
1636				use xcm::opaque::latest::{prelude::*, Xcm};
1637				Xcm(vec![HrmpChannelClosing {
1638					initiator: origin.into(),
1639					sender: channel_id.sender.into(),
1640					recipient: channel_id.recipient.into(),
1641				}])
1642			}),
1643		);
1644
1645		Ok(())
1646	}
1647
1648	/// Returns the list of MQC heads for the inbound channels of the given recipient para paired
1649	/// with the sender para ids. This vector is sorted ascending by the para id and doesn't contain
1650	/// multiple entries with the same sender.
1651	#[cfg(test)]
1652	fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
1653		let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1654
1655		// The ingress channels vector is sorted, thus `mqc_heads` is sorted as well.
1656		let mut mqc_heads = Vec::with_capacity(sender_set.len());
1657		for sender in sender_set {
1658			let channel_metadata = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient });
1659			let mqc_head = channel_metadata
1660				.and_then(|metadata| metadata.mqc_head)
1661				.unwrap_or(Hash::default());
1662			mqc_heads.push((sender, mqc_head));
1663		}
1664
1665		mqc_heads
1666	}
1667
1668	/// Returns contents of all channels addressed to the given recipient. Channels that have no
1669	/// messages in them are also included.
1670	pub(crate) fn inbound_hrmp_channels_contents(
1671		recipient: ParaId,
1672	) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1673		let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1674
1675		let mut inbound_hrmp_channels_contents = BTreeMap::new();
1676		for sender in sender_set {
1677			let channel_contents =
1678				HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1679			inbound_hrmp_channels_contents.insert(sender, channel_contents);
1680		}
1681
1682		inbound_hrmp_channels_contents
1683	}
1684}
1685
1686impl<T: Config> Pallet<T> {
1687	/// Decreases the open channel request count for the given sender. If the value reaches zero
1688	/// it is removed completely.
1689	fn decrease_open_channel_request_count(sender: ParaId) {
1690		HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1691			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1692				0 => None,
1693				n => Some(n),
1694			});
1695		});
1696	}
1697
1698	/// Decreases the accepted channel request count for the given sender. If the value reaches
1699	/// zero it is removed completely.
1700	fn decrease_accepted_channel_request_count(recipient: ParaId) {
1701		HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1702			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1703				0 => None,
1704				n => Some(n),
1705			});
1706		});
1707	}
1708
1709	#[cfg(any(feature = "runtime-benchmarks", test))]
1710	fn assert_storage_consistency_exhaustive() {
1711		fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
1712			assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
1713		}
1714
1715		let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
1716			for para in paras {
1717				assert!(
1718					crate::paras::Pallet::<T>::is_valid_para(para),
1719					"{}: {:?} para is offboarded",
1720					cause,
1721					para
1722				);
1723			}
1724		};
1725
1726		assert_eq!(
1727			HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1728			HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1729		);
1730
1731		// verify that the set of keys in `HrmpOpenChannelRequestCount` corresponds to the set
1732		// of _senders_ in `HrmpOpenChannelRequests`.
1733		//
1734		// having ensured that, we can go ahead and go over all counts and verify that they match.
1735		assert_eq!(
1736			HrmpOpenChannelRequestCount::<T>::iter()
1737				.map(|(k, _)| k)
1738				.collect::<BTreeSet<_>>(),
1739			HrmpOpenChannelRequests::<T>::iter()
1740				.map(|(k, _)| k.sender)
1741				.collect::<BTreeSet<_>>(),
1742		);
1743		for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
1744			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1745				.filter(|(ch, _)| ch.sender == open_channel_initiator)
1746				.count() as u32;
1747			assert_eq!(expected_num, actual_num);
1748		}
1749
1750		// The same as above, but for accepted channel request count. Note that we are interested
1751		// only in confirmed open requests.
1752		assert_eq!(
1753			HrmpAcceptedChannelRequestCount::<T>::iter()
1754				.map(|(k, _)| k)
1755				.collect::<BTreeSet<_>>(),
1756			HrmpOpenChannelRequests::<T>::iter()
1757				.filter(|(_, v)| v.confirmed)
1758				.map(|(k, _)| k.recipient)
1759				.collect::<BTreeSet<_>>(),
1760		);
1761		for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
1762			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1763				.filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
1764				.count() as u32;
1765			assert_eq!(expected_num, actual_num);
1766		}
1767
1768		assert_eq!(
1769			HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1770			HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1771		);
1772
1773		// A HRMP watermark can be None for an onboarded parachain. However, an offboarded parachain
1774		// cannot have an HRMP watermark: it should've been cleanup.
1775		assert_contains_only_onboarded(
1776			HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1777			"HRMP watermarks should contain only onboarded paras",
1778		);
1779
1780		// An entry in `HrmpChannels` indicates that the channel is open. Only open channels can
1781		// have contents.
1782		for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
1783			assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
1784
1785			// pedantic check: there should be no empty vectors in storage, those should be modeled
1786			// by a removed kv pair.
1787			assert!(!contents.is_empty());
1788		}
1789
1790		// Senders and recipients must be onboarded. Otherwise, all channels associated with them
1791		// are removed.
1792		assert_contains_only_onboarded(
1793			HrmpChannels::<T>::iter()
1794				.flat_map(|(k, _)| vec![k.sender, k.recipient])
1795				.collect::<Vec<_>>(),
1796			"senders and recipients in all channels should be onboarded",
1797		);
1798
1799		// Check the docs for `HrmpIngressChannelsIndex` and `HrmpEgressChannelsIndex` in decl
1800		// storage to get an index what are the channel mappings indexes.
1801		//
1802		// Here, from indexes.
1803		//
1804		// ingress         egress
1805		//
1806		// a -> [x, y]     x -> [a, b]
1807		// b -> [x, z]     y -> [a]
1808		//                 z -> [b]
1809		//
1810		// we derive a list of channels they represent.
1811		//
1812		//   (a, x)         (a, x)
1813		//   (a, y)         (a, y)
1814		//   (b, x)         (b, x)
1815		//   (b, z)         (b, z)
1816		//
1817		// and then that we compare that to the channel list in the `HrmpChannels`.
1818		let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
1819			.flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
1820			.collect::<BTreeSet<_>>();
1821		let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
1822			.flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
1823			.collect::<BTreeSet<_>>();
1824		let channel_set_ground_truth = HrmpChannels::<T>::iter()
1825			.map(|(k, _)| (k.sender, k.recipient))
1826			.collect::<BTreeSet<_>>();
1827		assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
1828		assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
1829
1830		HrmpIngressChannelsIndex::<T>::iter()
1831			.map(|(_, v)| v)
1832			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1833		HrmpEgressChannelsIndex::<T>::iter()
1834			.map(|(_, v)| v)
1835			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1836
1837		assert_contains_only_onboarded(
1838			HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1839			"HRMP channel digests should contain only onboarded paras",
1840		);
1841		for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
1842			// Assert that items are in **strictly** ascending order. The strictness also implies
1843			// there are no duplicates.
1844			assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
1845
1846			for (_, mut senders) in digest {
1847				assert!(!senders.is_empty());
1848
1849				// check for duplicates. For that we sort the vector, then perform deduplication.
1850				// if the vector stayed the same, there are no duplicates.
1851				senders.sort();
1852				let orig_senders = senders.clone();
1853				senders.dedup();
1854				assert_eq!(
1855					orig_senders, senders,
1856					"duplicates removed implies existence of duplicates"
1857				);
1858			}
1859		}
1860	}
1861}
1862
1863impl<T: Config> Pallet<T> {
1864	/// Wraps HRMP XCM notifications to the most suitable XCM version for the destination para.
1865	/// If the XCM version is unknown, the latest XCM version is used as a best effort.
1866	fn wrap_notification(
1867		mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1868	) -> impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage {
1869		use xcm::{
1870			opaque::VersionedXcm,
1871			prelude::{Junction, Location},
1872			WrapVersion,
1873		};
1874
1875		// Return a closure that can prepare notifications.
1876		move |dest| {
1877			// Attempt to wrap the notification for the destination parachain.
1878			T::VersionWrapper::wrap_version(
1879				&Location::new(0, [Junction::Parachain(dest.into())]),
1880				notification(),
1881			)
1882			.unwrap_or_else(|_| {
1883				// As a best effort, if we cannot resolve the version, fallback to using the latest
1884				// version.
1885				VersionedXcm::from(notification())
1886			})
1887			.encode()
1888		}
1889	}
1890
1891	/// Sends/enqueues notification to the destination parachain.
1892	fn send_to_para(
1893		log_label: &str,
1894		config: &HostConfiguration<BlockNumberFor<T>>,
1895		dest: ParaId,
1896		notification_bytes_for: impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage,
1897	) {
1898		// prepare notification
1899		let notification_bytes = notification_bytes_for(dest);
1900
1901		// try to enqueue
1902		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1903			dmp::Pallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1904		{
1905			// this should never happen unless the max downward message size is configured to a
1906			// jokingly small number.
1907			log::error!(
1908				target: "runtime::hrmp",
1909				"sending '{log_label}::notification_bytes' failed."
1910			);
1911			debug_assert!(false);
1912		}
1913	}
1914}