referrerpolicy=no-referrer-when-downgrade

polkadot_runtime_parachains/
hrmp.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	configuration::{self, HostConfiguration},
19	dmp, ensure_parachain, initializer, paras,
20};
21use alloc::{
22	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23	vec,
24	vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use frame_system::pallet_prelude::*;
30use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
31use polkadot_primitives::{
32	Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
33	SessionIndex,
34};
35use scale_info::TypeInfo;
36use sp_runtime::{
37	traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
38	ArithmeticError,
39};
40
41pub use pallet::*;
42
43/// Maximum bound that can be set for inbound channels.
44///
45/// If inaccurate, the weighing of this pallet might become inaccurate. It is expected form the
46/// `configurations` pallet to check these values before setting
47pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
48/// Same as [`HRMP_MAX_INBOUND_CHANNELS_BOUND`], but for outbound channels.
49pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
57pub trait WeightInfo {
58	fn hrmp_init_open_channel() -> Weight;
59	fn hrmp_accept_open_channel() -> Weight;
60	fn hrmp_close_channel() -> Weight;
61	fn force_clean_hrmp(i: u32, e: u32) -> Weight;
62	fn force_process_hrmp_open(c: u32) -> Weight;
63	fn force_process_hrmp_close(c: u32) -> Weight;
64	fn hrmp_cancel_open_request(c: u32) -> Weight;
65	fn clean_open_channel_requests(c: u32) -> Weight;
66	fn force_open_hrmp_channel(c: u32) -> Weight;
67	fn establish_system_channel() -> Weight;
68	fn poke_channel_deposits() -> Weight;
69	fn establish_channel_with_system() -> Weight;
70}
71
72/// A weight info that is only suitable for testing.
73pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76	fn hrmp_accept_open_channel() -> Weight {
77		Weight::MAX
78	}
79	fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80		Weight::MAX
81	}
82	fn force_process_hrmp_close(_: u32) -> Weight {
83		Weight::MAX
84	}
85	fn force_process_hrmp_open(_: u32) -> Weight {
86		Weight::MAX
87	}
88	fn hrmp_cancel_open_request(_: u32) -> Weight {
89		Weight::MAX
90	}
91	fn hrmp_close_channel() -> Weight {
92		Weight::MAX
93	}
94	fn hrmp_init_open_channel() -> Weight {
95		Weight::MAX
96	}
97	fn clean_open_channel_requests(_: u32) -> Weight {
98		Weight::MAX
99	}
100	fn force_open_hrmp_channel(_: u32) -> Weight {
101		Weight::MAX
102	}
103	fn establish_system_channel() -> Weight {
104		Weight::MAX
105	}
106	fn poke_channel_deposits() -> Weight {
107		Weight::MAX
108	}
109	fn establish_channel_with_system() -> Weight {
110		Weight::MAX
111	}
112}
113
114/// A description of a request to open an HRMP channel.
115#[derive(Encode, Decode, TypeInfo)]
116pub struct HrmpOpenChannelRequest {
117	/// Indicates if this request was confirmed by the recipient.
118	pub confirmed: bool,
119	/// NOTE: this field is deprecated. Channel open requests became non-expiring and this value
120	/// became unused.
121	pub _age: SessionIndex,
122	/// The amount that the sender supplied at the time of creation of this request.
123	pub sender_deposit: Balance,
124	/// The maximum message size that could be put into the channel.
125	pub max_message_size: u32,
126	/// The maximum number of messages that can be pending in the channel at once.
127	pub max_capacity: u32,
128	/// The maximum total size of the messages that can be pending in the channel at once.
129	pub max_total_size: u32,
130}
131
132/// A metadata of an HRMP channel.
133#[derive(Encode, Decode, TypeInfo)]
134#[cfg_attr(test, derive(Debug))]
135pub struct HrmpChannel {
136	// NOTE: This structure is used by parachains via merkle proofs. Therefore, this struct
137	// requires special treatment.
138	//
139	// A parachain requested this struct can only depend on the subset of this struct.
140	// Specifically, only a first few fields can be depended upon (See `AbridgedHrmpChannel`).
141	// These fields cannot be changed without corresponding migration of parachains.
142	/// The maximum number of messages that can be pending in the channel at once.
143	pub max_capacity: u32,
144	/// The maximum total size of the messages that can be pending in the channel at once.
145	pub max_total_size: u32,
146	/// The maximum message size that could be put into the channel.
147	pub max_message_size: u32,
148	/// The current number of messages pending in the channel.
149	/// Invariant: should be less or equal to `max_capacity`.s`.
150	pub msg_count: u32,
151	/// The total size in bytes of all message payloads in the channel.
152	/// Invariant: should be less or equal to `max_total_size`.
153	pub total_size: u32,
154	/// A head of the Message Queue Chain for this channel. Each link in this chain has a form:
155	/// `(prev_head, B, H(M))`, where
156	/// - `prev_head`: is the previous value of `mqc_head` or zero if none.
157	/// - `B`: is the [relay-chain] block number in which a message was appended
158	/// - `H(M)`: is the hash of the message being appended.
159	/// This value is initialized to a special value that consists of all zeroes which indicates
160	/// that no messages were previously added.
161	pub mqc_head: Option<Hash>,
162	/// The amount that the sender supplied as a deposit when opening this channel.
163	pub sender_deposit: Balance,
164	/// The amount that the recipient supplied as a deposit when accepting opening this channel.
165	pub recipient_deposit: Balance,
166}
167
168/// An error returned by [`Pallet::check_hrmp_watermark`] that indicates an acceptance criteria
169/// check didn't pass.
170pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
171	AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
172	AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
173	LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
174}
175
176/// An error returned by [`Pallet::check_outbound_hrmp`] that indicates an acceptance criteria check
177/// didn't pass.
178pub(crate) enum OutboundHrmpAcceptanceErr {
179	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
180	NotSorted { idx: u32 },
181	NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
182	MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
183	TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
184	CapacityExceeded { idx: u32, count: u32, limit: u32 },
185}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189	BlockNumber: fmt::Debug,
190{
191	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192		use HrmpWatermarkAcceptanceErr::*;
193		match self {
194			AdvancementRule { new_watermark, last_watermark } => write!(
195				fmt,
196				"the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197				new_watermark, last_watermark,
198			),
199			AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200				fmt,
201				"the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202				new_watermark, relay_chain_parent_number
203			),
204			LandsOnBlockWithNoMessages { new_watermark } => write!(
205				fmt,
206				"the HRMP watermark ({:?}) doesn't land on a block with messages received",
207				new_watermark
208			),
209		}
210	}
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215		use OutboundHrmpAcceptanceErr::*;
216		match self {
217			MoreMessagesThanPermitted { sent, permitted } => write!(
218				fmt,
219				"more HRMP messages than permitted by config ({} > {})",
220				sent, permitted,
221			),
222			NotSorted { idx } => {
223				write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224			},
225			NoSuchChannel { idx, channel_id } => write!(
226				fmt,
227				"the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228				idx, channel_id.sender, channel_id.recipient,
229			),
230			MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231				fmt,
232				"the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233				idx, msg_size, max_size,
234			),
235			TotalSizeExceeded { idx, total_size, limit } => write!(
236				fmt,
237				"sending the HRMP message at index {} would exceed the negotiated channel total size  ({} > {})",
238				idx, total_size, limit,
239			),
240			CapacityExceeded { idx, count, limit } => write!(
241				fmt,
242				"sending the HRMP message at index {} would exceed the negotiated channel capacity  ({} > {})",
243				idx, count, limit,
244			),
245		}
246	}
247}
248
249#[frame_support::pallet]
250pub mod pallet {
251	use super::*;
252
253	#[pallet::pallet]
254	#[pallet::without_storage_info]
255	pub struct Pallet<T>(_);
256
257	#[pallet::config]
258	pub trait Config:
259		frame_system::Config + configuration::Config + paras::Config + dmp::Config
260	{
261		/// The outer event type.
262		#[allow(deprecated)]
263		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
264
265		type RuntimeOrigin: From<crate::Origin>
266			+ From<<Self as frame_system::Config>::RuntimeOrigin>
267			+ Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;
268
269		/// The origin that can perform "force" actions on channels.
270		type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;
271
272		/// An interface for reserving deposits for opening channels.
273		///
274		/// NOTE that this Currency instance will be charged with the amounts defined in the
275		/// `Configuration` pallet. Specifically, that means that the `Balance` of the `Currency`
276		/// implementation should be the same as `Balance` as used in the `Configuration`.
277		type Currency: ReservableCurrency<Self::AccountId>;
278
279		/// The default channel size and capacity to use when opening a channel to a system
280		/// parachain.
281		type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;
282
283		/// Means of converting an `Xcm` into a `VersionedXcm`. This pallet sends HRMP XCM
284		/// notifications to the channel-related parachains, while the `WrapVersion` implementation
285		/// attempts to wrap them into the most suitable XCM version for the destination parachain.
286		///
287		/// NOTE: For example, `pallet_xcm` provides an accurate implementation (recommended), or
288		/// the default `()` implementation uses the latest XCM version for all parachains.
289		type VersionWrapper: xcm::WrapVersion;
290
291		/// Something that provides the weight of this pallet.
292		type WeightInfo: WeightInfo;
293	}
294
295	#[pallet::event]
296	#[pallet::generate_deposit(pub(super) fn deposit_event)]
297	pub enum Event<T: Config> {
298		/// Open HRMP channel requested.
299		OpenChannelRequested {
300			sender: ParaId,
301			recipient: ParaId,
302			proposed_max_capacity: u32,
303			proposed_max_message_size: u32,
304		},
305		/// An HRMP channel request sent by the receiver was canceled by either party.
306		OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
307		/// Open HRMP channel accepted.
308		OpenChannelAccepted { sender: ParaId, recipient: ParaId },
309		/// HRMP channel closed.
310		ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
311		/// An HRMP channel was opened via Root origin.
312		HrmpChannelForceOpened {
313			sender: ParaId,
314			recipient: ParaId,
315			proposed_max_capacity: u32,
316			proposed_max_message_size: u32,
317		},
318		/// An HRMP channel was opened with a system chain.
319		HrmpSystemChannelOpened {
320			sender: ParaId,
321			recipient: ParaId,
322			proposed_max_capacity: u32,
323			proposed_max_message_size: u32,
324		},
325		/// An HRMP channel's deposits were updated.
326		OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
327	}
328
329	#[pallet::error]
330	pub enum Error<T> {
331		/// The sender tried to open a channel to themselves.
332		OpenHrmpChannelToSelf,
333		/// The recipient is not a valid para.
334		OpenHrmpChannelInvalidRecipient,
335		/// The requested capacity is zero.
336		OpenHrmpChannelZeroCapacity,
337		/// The requested capacity exceeds the global limit.
338		OpenHrmpChannelCapacityExceedsLimit,
339		/// The requested maximum message size is 0.
340		OpenHrmpChannelZeroMessageSize,
341		/// The open request requested the message size that exceeds the global limit.
342		OpenHrmpChannelMessageSizeExceedsLimit,
343		/// The channel already exists
344		OpenHrmpChannelAlreadyExists,
345		/// There is already a request to open the same channel.
346		OpenHrmpChannelAlreadyRequested,
347		/// The sender already has the maximum number of allowed outbound channels.
348		OpenHrmpChannelLimitExceeded,
349		/// The channel from the sender to the origin doesn't exist.
350		AcceptHrmpChannelDoesntExist,
351		/// The channel is already confirmed.
352		AcceptHrmpChannelAlreadyConfirmed,
353		/// The recipient already has the maximum number of allowed inbound channels.
354		AcceptHrmpChannelLimitExceeded,
355		/// The origin tries to close a channel where it is neither the sender nor the recipient.
356		CloseHrmpChannelUnauthorized,
357		/// The channel to be closed doesn't exist.
358		CloseHrmpChannelDoesntExist,
359		/// The channel close request is already requested.
360		CloseHrmpChannelAlreadyUnderway,
361		/// Canceling is requested by neither the sender nor recipient of the open channel request.
362		CancelHrmpOpenChannelUnauthorized,
363		/// The open request doesn't exist.
364		OpenHrmpChannelDoesntExist,
365		/// Cannot cancel an HRMP open channel request because it is already confirmed.
366		OpenHrmpChannelAlreadyConfirmed,
367		/// The provided witness data is wrong.
368		WrongWitness,
369		/// The channel between these two chains cannot be authorized.
370		ChannelCreationNotAuthorized,
371	}
372
373	/// The set of pending HRMP open channel requests.
374	///
375	/// The set is accompanied by a list for iteration.
376	///
377	/// Invariant:
378	/// - There are no channels that exists in list but not in the set and vice versa.
379	#[pallet::storage]
380	pub type HrmpOpenChannelRequests<T: Config> =
381		StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;
382
383	// NOTE: could become bounded, but we don't have a global maximum for this.
384	// `HRMP_MAX_INBOUND_CHANNELS_BOUND` are per parachain, while this storage tracks the
385	// global state.
386	#[pallet::storage]
387	pub type HrmpOpenChannelRequestsList<T: Config> =
388		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
389
390	/// This mapping tracks how many open channel requests are initiated by a given sender para.
391	/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items that has
392	/// `(X, _)` as the number of `HrmpOpenChannelRequestCount` for `X`.
393	#[pallet::storage]
394	pub type HrmpOpenChannelRequestCount<T: Config> =
395		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
396
397	/// This mapping tracks how many open channel requests were accepted by a given recipient para.
398	/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items `(_, X)` with
399	/// `confirmed` set to true, as the number of `HrmpAcceptedChannelRequestCount` for `X`.
400	#[pallet::storage]
401	pub type HrmpAcceptedChannelRequestCount<T: Config> =
402		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
403
404	/// A set of pending HRMP close channel requests that are going to be closed during the session
405	/// change. Used for checking if a given channel is registered for closure.
406	///
407	/// The set is accompanied by a list for iteration.
408	///
409	/// Invariant:
410	/// - There are no channels that exists in list but not in the set and vice versa.
411	#[pallet::storage]
412	pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;
413
414	#[pallet::storage]
415	pub type HrmpCloseChannelRequestsList<T: Config> =
416		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
417
418	/// The HRMP watermark associated with each para.
419	/// Invariant:
420	/// - each para `P` used here as a key should satisfy `Paras::is_valid_para(P)` within a
421	///   session.
422	#[pallet::storage]
423	pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;
424
425	/// HRMP channel data associated with each para.
426	/// Invariant:
427	/// - each participant in the channel should satisfy `Paras::is_valid_para(P)` within a session.
428	#[pallet::storage]
429	pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;
430
431	/// Ingress/egress indexes allow to find all the senders and receivers given the opposite side.
432	/// I.e.
433	///
434	/// (a) ingress index allows to find all the senders for a given recipient.
435	/// (b) egress index allows to find all the recipients for a given sender.
436	///
437	/// Invariants:
438	/// - for each ingress index entry for `P` each item `I` in the index should present in
439	///   `HrmpChannels` as `(I, P)`.
440	/// - for each egress index entry for `P` each item `E` in the index should present in
441	///   `HrmpChannels` as `(P, E)`.
442	/// - there should be no other dangling channels in `HrmpChannels`.
443	/// - the vectors are sorted.
444	#[pallet::storage]
445	pub type HrmpIngressChannelsIndex<T: Config> =
446		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
447
448	// NOTE that this field is used by parachains via merkle storage proofs, therefore changing
449	// the format will require migration of parachains.
450	#[pallet::storage]
451	pub type HrmpEgressChannelsIndex<T: Config> =
452		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
453
454	/// Storage for the messages for each channel.
455	/// Invariant: cannot be non-empty if the corresponding channel in `HrmpChannels` is `None`.
456	#[pallet::storage]
457	pub type HrmpChannelContents<T: Config> = StorageMap<
458		_,
459		Twox64Concat,
460		HrmpChannelId,
461		Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
462		ValueQuery,
463	>;
464
465	/// Maintains a mapping that can be used to answer the question: What paras sent a message at
466	/// the given block number for a given receiver. Invariants:
467	/// - The inner `Vec<ParaId>` is never empty.
468	/// - The inner `Vec<ParaId>` cannot store two same `ParaId`.
469	/// - The outer vector is sorted ascending by block number and cannot store two items with the
470	///   same block number.
471	#[pallet::storage]
472	pub type HrmpChannelDigests<T: Config> =
473		StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
474
475	/// Preopen the given HRMP channels.
476	///
477	/// The values in the tuple corresponds to
478	/// `(sender, recipient, max_capacity, max_message_size)`, i.e. similar to `init_open_channel`.
479	/// In fact, the initialization is performed as if the `init_open_channel` and
480	/// `accept_open_channel` were called with the respective parameters and the session change take
481	///  place.
482	///
483	/// As such, each channel initializer should satisfy the same constraints, namely:
484	///
485	/// 1. `max_capacity` and `max_message_size` should be within the limits set by the
486	///    configuration pallet.
487	/// 2. `sender` and `recipient` must be valid paras.
488	#[pallet::genesis_config]
489	#[derive(DefaultNoBound)]
490	pub struct GenesisConfig<T: Config> {
491		#[serde(skip)]
492		_config: core::marker::PhantomData<T>,
493		preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
494	}
495
496	#[pallet::genesis_build]
497	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
498		fn build(&self) {
499			initialize_storage::<T>(&self.preopen_hrmp_channels);
500		}
501	}
502
503	#[pallet::call]
504	impl<T: Config> Pallet<T> {
505		/// Initiate opening a channel from a parachain to a given recipient with given channel
506		/// parameters.
507		///
508		/// - `proposed_max_capacity` - specifies how many messages can be in the channel at once.
509		/// - `proposed_max_message_size` - specifies the maximum size of the messages.
510		///
511		/// These numbers are a subject to the relay-chain configuration limits.
512		///
513		/// The channel can be opened only after the recipient confirms it and only on a session
514		/// change.
515		#[pallet::call_index(0)]
516		#[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
517		pub fn hrmp_init_open_channel(
518			origin: OriginFor<T>,
519			recipient: ParaId,
520			proposed_max_capacity: u32,
521			proposed_max_message_size: u32,
522		) -> DispatchResult {
523			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
524			Self::init_open_channel(
525				origin,
526				recipient,
527				proposed_max_capacity,
528				proposed_max_message_size,
529			)?;
530			Self::deposit_event(Event::OpenChannelRequested {
531				sender: origin,
532				recipient,
533				proposed_max_capacity,
534				proposed_max_message_size,
535			});
536			Ok(())
537		}
538
539		/// Accept a pending open channel request from the given sender.
540		///
541		/// The channel will be opened only on the next session boundary.
542		#[pallet::call_index(1)]
543		#[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
544		pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
545			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
546			Self::accept_open_channel(origin, sender)?;
547			Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
548			Ok(())
549		}
550
551		/// Initiate unilateral closing of a channel. The origin must be either the sender or the
552		/// recipient in the channel being closed.
553		///
554		/// The closure can only happen on a session change.
555		#[pallet::call_index(2)]
556		#[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
557		pub fn hrmp_close_channel(
558			origin: OriginFor<T>,
559			channel_id: HrmpChannelId,
560		) -> DispatchResult {
561			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
562			Self::close_channel(origin, channel_id.clone())?;
563			Self::deposit_event(Event::ChannelClosed { by_parachain: origin, channel_id });
564			Ok(())
565		}
566
567		/// This extrinsic triggers the cleanup of all the HRMP storage items that a para may have.
568		/// Normally this happens once per session, but this allows you to trigger the cleanup
569		/// immediately for a specific parachain.
570		///
571		/// Number of inbound and outbound channels for `para` must be provided as witness data.
572		///
573		/// Origin must be the `ChannelManager`.
574		#[pallet::call_index(3)]
575		#[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
576		pub fn force_clean_hrmp(
577			origin: OriginFor<T>,
578			para: ParaId,
579			num_inbound: u32,
580			num_outbound: u32,
581		) -> DispatchResult {
582			T::ChannelManager::ensure_origin(origin)?;
583
584			ensure!(
585				HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
586					num_inbound as usize,
587				Error::<T>::WrongWitness
588			);
589			ensure!(
590				HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
591					num_outbound as usize,
592				Error::<T>::WrongWitness
593			);
594
595			Self::clean_hrmp_after_outgoing(&para);
596			Ok(())
597		}
598
599		/// Force process HRMP open channel requests.
600		///
601		/// If there are pending HRMP open channel requests, you can use this function to process
602		/// all of those requests immediately.
603		///
604		/// Total number of opening channels must be provided as witness data.
605		///
606		/// Origin must be the `ChannelManager`.
607		#[pallet::call_index(4)]
608		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
609		pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
610			T::ChannelManager::ensure_origin(origin)?;
611
612			ensure!(
613				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
614					channels,
615				Error::<T>::WrongWitness
616			);
617
618			let host_config = configuration::ActiveConfig::<T>::get();
619			Self::process_hrmp_open_channel_requests(&host_config);
620			Ok(())
621		}
622
623		/// Force process HRMP close channel requests.
624		///
625		/// If there are pending HRMP close channel requests, you can use this function to process
626		/// all of those requests immediately.
627		///
628		/// Total number of closing channels must be provided as witness data.
629		///
630		/// Origin must be the `ChannelManager`.
631		#[pallet::call_index(5)]
632		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
633		pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
634			T::ChannelManager::ensure_origin(origin)?;
635
636			ensure!(
637				HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
638					channels,
639				Error::<T>::WrongWitness
640			);
641
642			Self::process_hrmp_close_channel_requests();
643			Ok(())
644		}
645
646		/// This cancels a pending open channel request. It can be canceled by either of the sender
647		/// or the recipient for that request. The origin must be either of those.
648		///
649		/// The cancellation happens immediately. It is not possible to cancel the request if it is
650		/// already accepted.
651		///
652		/// Total number of open requests (i.e. `HrmpOpenChannelRequestsList`) must be provided as
653		/// witness data.
654		#[pallet::call_index(6)]
655		#[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
656		pub fn hrmp_cancel_open_request(
657			origin: OriginFor<T>,
658			channel_id: HrmpChannelId,
659			open_requests: u32,
660		) -> DispatchResult {
661			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
662			ensure!(
663				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
664					open_requests,
665				Error::<T>::WrongWitness
666			);
667			Self::cancel_open_request(origin, channel_id.clone())?;
668			Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
669			Ok(())
670		}
671
672		/// Open a channel from a `sender` to a `recipient` `ParaId`. Although opened by governance,
673		/// the `max_capacity` and `max_message_size` are still subject to the Relay Chain's
674		/// configured limits.
675		///
676		/// Expected use is when one (and only one) of the `ParaId`s involved in the channel is
677		/// governed by the system, e.g. a system parachain.
678		///
679		/// Origin must be the `ChannelManager`.
680		#[pallet::call_index(7)]
681		#[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
682		pub fn force_open_hrmp_channel(
683			origin: OriginFor<T>,
684			sender: ParaId,
685			recipient: ParaId,
686			max_capacity: u32,
687			max_message_size: u32,
688		) -> DispatchResultWithPostInfo {
689			T::ChannelManager::ensure_origin(origin)?;
690
691			// Guard against a common footgun where someone makes a channel request to a system
692			// parachain and then makes a proposal to open the channel via governance, which fails
693			// because `init_open_channel` fails if there is an existing request. This check will
694			// clear an existing request such that `init_open_channel` should otherwise succeed.
695			let channel_id = HrmpChannelId { sender, recipient };
696			let cancel_request: u32 =
697				if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
698					Self::cancel_open_request(sender, channel_id)?;
699					1
700				} else {
701					0
702				};
703
704			// Now we proceed with normal init/accept, except that we set `no_deposit` to true such
705			// that it will not require deposits from either member.
706			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
707			Self::accept_open_channel(recipient, sender)?;
708			Self::deposit_event(Event::HrmpChannelForceOpened {
709				sender,
710				recipient,
711				proposed_max_capacity: max_capacity,
712				proposed_max_message_size: max_message_size,
713			});
714
715			Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
716		}
717
718		/// Establish an HRMP channel between two system chains. If the channel does not already
719		/// exist, the transaction fees will be refunded to the caller. The system does not take
720		/// deposits for channels between system chains, and automatically sets the message number
721		/// and size limits to the maximum allowed by the network's configuration.
722		///
723		/// Arguments:
724		///
725		/// - `sender`: A system chain, `ParaId`.
726		/// - `recipient`: A system chain, `ParaId`.
727		///
728		/// Any signed origin can call this function, but _both_ inputs MUST be system chains. If
729		/// the channel does not exist yet, there is no fee.
730		#[pallet::call_index(8)]
731		#[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
732		pub fn establish_system_channel(
733			origin: OriginFor<T>,
734			sender: ParaId,
735			recipient: ParaId,
736		) -> DispatchResultWithPostInfo {
737			let _caller = ensure_signed(origin)?;
738
739			// both chains must be system
740			ensure!(
741				sender.is_system() && recipient.is_system(),
742				Error::<T>::ChannelCreationNotAuthorized
743			);
744
745			let config = configuration::ActiveConfig::<T>::get();
746			let max_message_size = config.hrmp_channel_max_message_size;
747			let max_capacity = config.hrmp_channel_max_capacity;
748
749			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
750			Self::accept_open_channel(recipient, sender)?;
751
752			Self::deposit_event(Event::HrmpSystemChannelOpened {
753				sender,
754				recipient,
755				proposed_max_capacity: max_capacity,
756				proposed_max_message_size: max_message_size,
757			});
758
759			Ok(Pays::No.into())
760		}
761
762		/// Update the deposits held for an HRMP channel to the latest `Configuration`. Channels
763		/// with system chains do not require a deposit.
764		///
765		/// Arguments:
766		///
767		/// - `sender`: A chain, `ParaId`.
768		/// - `recipient`: A chain, `ParaId`.
769		///
770		/// Any signed origin can call this function.
771		#[pallet::call_index(9)]
772		#[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
773		pub fn poke_channel_deposits(
774			origin: OriginFor<T>,
775			sender: ParaId,
776			recipient: ParaId,
777		) -> DispatchResult {
778			let _caller = ensure_signed(origin)?;
779			let channel_id = HrmpChannelId { sender, recipient };
780			let is_system = sender.is_system() || recipient.is_system();
781
782			let config = configuration::ActiveConfig::<T>::get();
783
784			// Channels with and amongst the system do not require a deposit.
785			let (new_sender_deposit, new_recipient_deposit) = if is_system {
786				(0, 0)
787			} else {
788				(config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
789			};
790
791			HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
792				if let Some(ref mut channel) = channel {
793					let current_sender_deposit = channel.sender_deposit;
794					let current_recipient_deposit = channel.recipient_deposit;
795
796					// nothing to update
797					if current_sender_deposit == new_sender_deposit &&
798						current_recipient_deposit == new_recipient_deposit
799					{
800						return Ok(())
801					}
802
803					// sender
804					if current_sender_deposit > new_sender_deposit {
805						// Can never underflow, but be paranoid.
806						let amount = current_sender_deposit
807							.checked_sub(new_sender_deposit)
808							.ok_or(ArithmeticError::Underflow)?;
809						T::Currency::unreserve(
810							&channel_id.sender.into_account_truncating(),
811							// The difference should always be convertible into `Balance`, but be
812							// paranoid and do nothing in case.
813							amount.try_into().unwrap_or(Zero::zero()),
814						);
815					} else if current_sender_deposit < new_sender_deposit {
816						let amount = new_sender_deposit
817							.checked_sub(current_sender_deposit)
818							.ok_or(ArithmeticError::Underflow)?;
819						T::Currency::reserve(
820							&channel_id.sender.into_account_truncating(),
821							amount.try_into().unwrap_or(Zero::zero()),
822						)?;
823					}
824
825					// recipient
826					if current_recipient_deposit > new_recipient_deposit {
827						let amount = current_recipient_deposit
828							.checked_sub(new_recipient_deposit)
829							.ok_or(ArithmeticError::Underflow)?;
830						T::Currency::unreserve(
831							&channel_id.recipient.into_account_truncating(),
832							amount.try_into().unwrap_or(Zero::zero()),
833						);
834					} else if current_recipient_deposit < new_recipient_deposit {
835						let amount = new_recipient_deposit
836							.checked_sub(current_recipient_deposit)
837							.ok_or(ArithmeticError::Underflow)?;
838						T::Currency::reserve(
839							&channel_id.recipient.into_account_truncating(),
840							amount.try_into().unwrap_or(Zero::zero()),
841						)?;
842					}
843
844					// update storage
845					channel.sender_deposit = new_sender_deposit;
846					channel.recipient_deposit = new_recipient_deposit;
847				} else {
848					return Err(Error::<T>::OpenHrmpChannelDoesntExist.into())
849				}
850				Ok(())
851			})?;
852
853			Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
854
855			Ok(())
856		}
857
858		/// Establish a bidirectional HRMP channel between a parachain and a system chain.
859		///
860		/// Arguments:
861		///
862		/// - `target_system_chain`: A system chain, `ParaId`.
863		///
864		/// The origin needs to be the parachain origin.
865		#[pallet::call_index(10)]
866		#[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
867		pub fn establish_channel_with_system(
868			origin: OriginFor<T>,
869			target_system_chain: ParaId,
870		) -> DispatchResultWithPostInfo {
871			let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
872
873			ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
874
875			let (max_message_size, max_capacity) =
876				T::DefaultChannelSizeAndCapacityWithSystem::get();
877
878			// create bidirectional channel
879			Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
880			Self::accept_open_channel(target_system_chain, sender)?;
881
882			Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
883			Self::accept_open_channel(sender, target_system_chain)?;
884
885			Self::deposit_event(Event::HrmpSystemChannelOpened {
886				sender,
887				recipient: target_system_chain,
888				proposed_max_capacity: max_capacity,
889				proposed_max_message_size: max_message_size,
890			});
891
892			Self::deposit_event(Event::HrmpSystemChannelOpened {
893				sender: target_system_chain,
894				recipient: sender,
895				proposed_max_capacity: max_capacity,
896				proposed_max_message_size: max_message_size,
897			});
898
899			Ok(Pays::No.into())
900		}
901	}
902}
903
904fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
905	let host_config = configuration::ActiveConfig::<T>::get();
906	for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
907		if let Err(err) =
908			preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
909		{
910			panic!("failed to initialize the genesis storage: {:?}", err);
911		}
912	}
913	Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
914}
915
916fn preopen_hrmp_channel<T: Config>(
917	sender: ParaId,
918	recipient: ParaId,
919	max_capacity: u32,
920	max_message_size: u32,
921) -> DispatchResult {
922	Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
923	Pallet::<T>::accept_open_channel(recipient, sender)?;
924	Ok(())
925}
926
927/// Routines and getters related to HRMP.
928impl<T: Config> Pallet<T> {
929	/// Block initialization logic, called by initializer.
930	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
931		Weight::zero()
932	}
933
934	/// Block finalization logic, called by initializer.
935	pub(crate) fn initializer_finalize() {}
936
937	/// Called by the initializer to note that a new session has started.
938	pub(crate) fn initializer_on_new_session(
939		notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
940		outgoing_paras: &[ParaId],
941	) -> Weight {
942		let w1 = Self::perform_outgoing_para_cleanup(&notification.prev_config, outgoing_paras);
943		Self::process_hrmp_open_channel_requests(&notification.prev_config);
944		Self::process_hrmp_close_channel_requests();
945		w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
946			outgoing_paras.len() as u32
947		))
948		.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
949			outgoing_paras.len() as u32,
950		))
951	}
952
953	/// Iterate over all paras that were noted for offboarding and remove all the data
954	/// associated with them.
955	fn perform_outgoing_para_cleanup(
956		config: &HostConfiguration<BlockNumberFor<T>>,
957		outgoing: &[ParaId],
958	) -> Weight {
959		let mut w = Self::clean_open_channel_requests(config, outgoing);
960		for outgoing_para in outgoing {
961			Self::clean_hrmp_after_outgoing(outgoing_para);
962
963			// we need a few extra bits of data to weigh this -- all of this is read internally
964			// anyways, so no overhead.
965			let ingress_count =
966				HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
967			let egress_count =
968				HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
969			w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
970				ingress_count,
971				egress_count,
972			));
973		}
974		w
975	}
976
977	// Go over the HRMP open channel requests and remove all in which offboarding paras participate.
978	//
979	// This will also perform the refunds for the counterparty if it doesn't offboard.
980	pub(crate) fn clean_open_channel_requests(
981		config: &HostConfiguration<BlockNumberFor<T>>,
982		outgoing: &[ParaId],
983	) -> Weight {
984		// First collect all the channel ids of the open requests in which there is at least one
985		// party presents in the outgoing list.
986		//
987		// Both the open channel request list and outgoing list are expected to be small enough.
988		// In the most common case there will be only single outgoing para.
989		let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
990		let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
991			.into_iter()
992			.partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
993		HrmpOpenChannelRequestsList::<T>::put(stay);
994
995		// Then iterate over all open requests to be removed, pull them out of the set and perform
996		// the refunds if applicable.
997		for req_id in go {
998			let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
999				Some(req_data) => req_data,
1000				None => {
1001					// Can't normally happen but no need to panic.
1002					continue
1003				},
1004			};
1005
1006			// Return the deposit of the sender, but only if it is not the para being offboarded.
1007			if !outgoing.contains(&req_id.sender) {
1008				T::Currency::unreserve(
1009					&req_id.sender.into_account_truncating(),
1010					req_data.sender_deposit.unique_saturated_into(),
1011				);
1012			}
1013
1014			// If the request was confirmed, then it means it was confirmed in the finished session.
1015			// Therefore, the config's hrmp_recipient_deposit represents the actual value of the
1016			// deposit.
1017			//
1018			// We still want to refund the deposit only if the para is not being offboarded.
1019			if req_data.confirmed {
1020				if !outgoing.contains(&req_id.recipient) {
1021					T::Currency::unreserve(
1022						&req_id.recipient.into_account_truncating(),
1023						config.hrmp_recipient_deposit.unique_saturated_into(),
1024					);
1025				}
1026				Self::decrease_accepted_channel_request_count(req_id.recipient);
1027			}
1028		}
1029
1030		<T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1031	}
1032
1033	/// Remove all storage entries associated with the given para.
1034	fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1035		HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1036		HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1037
1038		let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1039			.into_iter()
1040			.map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1041		let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1042			.into_iter()
1043			.map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1044		let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1045		to_close.sort();
1046		to_close.dedup();
1047
1048		for channel in to_close {
1049			Self::close_hrmp_channel(&channel);
1050		}
1051	}
1052
1053	/// Iterate over all open channel requests and:
1054	///
1055	/// - prune the stale requests
1056	/// - enact the confirmed requests
1057	fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1058		let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1059		if open_req_channels.is_empty() {
1060			return
1061		}
1062
1063		// iterate the vector starting from the end making our way to the beginning. This way we
1064		// can leverage `swap_remove` to efficiently remove an item during iteration.
1065		let mut idx = open_req_channels.len();
1066		loop {
1067			// bail if we've iterated over all items.
1068			if idx == 0 {
1069				break
1070			}
1071
1072			idx -= 1;
1073			let channel_id = open_req_channels[idx].clone();
1074			let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1075				"can't be `None` due to the invariant that the list contains the same items as the set; qed",
1076			);
1077
1078			let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1079			let sender_deposit = request.sender_deposit;
1080			let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1081
1082			if request.confirmed {
1083				if paras::Pallet::<T>::is_valid_para(channel_id.sender) &&
1084					paras::Pallet::<T>::is_valid_para(channel_id.recipient)
1085				{
1086					HrmpChannels::<T>::insert(
1087						&channel_id,
1088						HrmpChannel {
1089							sender_deposit,
1090							recipient_deposit,
1091							max_capacity: request.max_capacity,
1092							max_total_size: request.max_total_size,
1093							max_message_size: request.max_message_size,
1094							msg_count: 0,
1095							total_size: 0,
1096							mqc_head: None,
1097						},
1098					);
1099
1100					HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1101						if let Err(i) = v.binary_search(&channel_id.sender) {
1102							v.insert(i, channel_id.sender);
1103						}
1104					});
1105					HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1106						if let Err(i) = v.binary_search(&channel_id.recipient) {
1107							v.insert(i, channel_id.recipient);
1108						}
1109					});
1110				}
1111
1112				Self::decrease_open_channel_request_count(channel_id.sender);
1113				Self::decrease_accepted_channel_request_count(channel_id.recipient);
1114
1115				let _ = open_req_channels.swap_remove(idx);
1116				HrmpOpenChannelRequests::<T>::remove(&channel_id);
1117			}
1118		}
1119
1120		HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1121	}
1122
1123	/// Iterate over all close channel requests unconditionally closing the channels.
1124	fn process_hrmp_close_channel_requests() {
1125		let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1126		for condemned_ch_id in close_reqs {
1127			HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1128			Self::close_hrmp_channel(&condemned_ch_id);
1129		}
1130	}
1131
1132	/// Close and remove the designated HRMP channel.
1133	///
1134	/// This includes returning the deposits.
1135	///
1136	/// This function is idempotent, meaning that after the first application it should have no
1137	/// effect (i.e. it won't return the deposits twice).
1138	fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1139		if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1140			HrmpChannels::<T>::take(channel_id)
1141		{
1142			T::Currency::unreserve(
1143				&channel_id.sender.into_account_truncating(),
1144				sender_deposit.unique_saturated_into(),
1145			);
1146			T::Currency::unreserve(
1147				&channel_id.recipient.into_account_truncating(),
1148				recipient_deposit.unique_saturated_into(),
1149			);
1150		}
1151
1152		HrmpChannelContents::<T>::remove(channel_id);
1153
1154		HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1155			if let Ok(i) = v.binary_search(&channel_id.recipient) {
1156				v.remove(i);
1157			}
1158		});
1159		HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1160			if let Ok(i) = v.binary_search(&channel_id.sender) {
1161				v.remove(i);
1162			}
1163		});
1164	}
1165
1166	/// Check that the candidate of the given recipient controls the HRMP watermark properly.
1167	pub(crate) fn check_hrmp_watermark(
1168		recipient: ParaId,
1169		relay_chain_parent_number: BlockNumberFor<T>,
1170		new_hrmp_watermark: BlockNumberFor<T>,
1171	) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1172		// First, check where the watermark CANNOT legally land.
1173		//
1174		// (a) For ensuring that messages are eventually processed, we require each parablock's
1175		//     watermark to be greater than the last one. The exception to this is if the previous
1176		//     watermark was already equal to the current relay-parent number.
1177		//
1178		// (b) However, a parachain cannot read into "the future", therefore the watermark should
1179		//     not be greater than the relay-chain context block which the parablock refers to.
1180		if new_hrmp_watermark == relay_chain_parent_number {
1181			return Ok(())
1182		}
1183
1184		if new_hrmp_watermark > relay_chain_parent_number {
1185			return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1186				new_watermark: new_hrmp_watermark,
1187				relay_chain_parent_number,
1188			})
1189		}
1190
1191		if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1192			if new_hrmp_watermark < last_watermark {
1193				return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1194					new_watermark: new_hrmp_watermark,
1195					last_watermark,
1196				})
1197			}
1198
1199			if new_hrmp_watermark == last_watermark {
1200				return Ok(())
1201			}
1202		}
1203
1204		// Second, check where the watermark CAN land. It's one of the following:
1205		//
1206		// (a) The relay parent block number (checked above).
1207		// (b) A relay-chain block in which this para received at least one message (checked here)
1208		let digest = HrmpChannelDigests::<T>::get(&recipient);
1209		if !digest
1210			.binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1211			.is_ok()
1212		{
1213			return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1214				new_watermark: new_hrmp_watermark,
1215			})
1216		}
1217		Ok(())
1218	}
1219
1220	/// Returns HRMP watermarks of previously sent messages to a given para.
1221	pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1222		let mut valid_watermarks: Vec<_> = HrmpChannelDigests::<T>::get(&recipient)
1223			.into_iter()
1224			.map(|(block_no, _)| block_no)
1225			.collect();
1226
1227		// The current watermark will remain valid until updated.
1228		if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1229			if valid_watermarks.first().map_or(false, |w| w > &last_watermark) {
1230				valid_watermarks.insert(0, last_watermark);
1231			}
1232		}
1233
1234		valid_watermarks
1235	}
1236
1237	pub(crate) fn check_outbound_hrmp(
1238		config: &HostConfiguration<BlockNumberFor<T>>,
1239		sender: ParaId,
1240		out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1241	) -> Result<(), OutboundHrmpAcceptanceErr> {
1242		if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1243			return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1244				sent: out_hrmp_msgs.len() as u32,
1245				permitted: config.hrmp_max_message_num_per_candidate,
1246			})
1247		}
1248
1249		let mut last_recipient = None::<ParaId>;
1250
1251		for (idx, out_msg) in
1252			out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1253		{
1254			match last_recipient {
1255				// the messages must be sorted in ascending order and there must be no two messages
1256				// sent to the same recipient. Thus we can check that every recipient is strictly
1257				// greater than the previous one.
1258				Some(last_recipient) if out_msg.recipient <= last_recipient =>
1259					return Err(OutboundHrmpAcceptanceErr::NotSorted { idx }),
1260				_ => last_recipient = Some(out_msg.recipient),
1261			}
1262
1263			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1264
1265			let channel = match HrmpChannels::<T>::get(&channel_id) {
1266				Some(channel) => channel,
1267				None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1268			};
1269
1270			let msg_size = out_msg.data.len() as u32;
1271			if msg_size > channel.max_message_size {
1272				return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1273					idx,
1274					msg_size,
1275					max_size: channel.max_message_size,
1276				})
1277			}
1278
1279			let new_total_size = channel.total_size + out_msg.data.len() as u32;
1280			if new_total_size > channel.max_total_size {
1281				return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1282					idx,
1283					total_size: new_total_size,
1284					limit: channel.max_total_size,
1285				})
1286			}
1287
1288			let new_msg_count = channel.msg_count + 1;
1289			if new_msg_count > channel.max_capacity {
1290				return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1291					idx,
1292					count: new_msg_count,
1293					limit: channel.max_capacity,
1294				})
1295			}
1296		}
1297
1298		Ok(())
1299	}
1300
1301	/// Returns remaining outbound channels capacity in messages and in bytes per recipient para.
1302	pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1303		let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1304		let mut remaining = Vec::with_capacity(recipients.len());
1305
1306		for recipient in recipients {
1307			let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1308				continue
1309			};
1310			remaining.push((
1311				recipient,
1312				(
1313					channel.max_capacity - channel.msg_count,
1314					channel.max_total_size - channel.total_size,
1315				),
1316			));
1317		}
1318
1319		remaining
1320	}
1321
1322	pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1323		// sift through the incoming messages digest to collect the paras that sent at least one
1324		// message to this parachain between the old and new watermarks.
1325		let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1326			let mut senders = BTreeSet::new();
1327			let mut leftover = Vec::with_capacity(digest.len());
1328			for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1329				if block_no <= new_hrmp_watermark {
1330					senders.extend(paras_sent_msg);
1331				} else {
1332					leftover.push((block_no, paras_sent_msg));
1333				}
1334			}
1335			*digest = leftover;
1336			senders
1337		});
1338
1339		// having all senders we can trivially find out the channels which we need to prune.
1340		let channels_to_prune =
1341			senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1342		for channel_id in channels_to_prune {
1343			// prune each channel up to the new watermark keeping track how many messages we removed
1344			// and what is the total byte size of them.
1345			let (mut pruned_cnt, mut pruned_size) = (0, 0);
1346
1347			let contents = HrmpChannelContents::<T>::get(&channel_id);
1348			let mut leftover = Vec::with_capacity(contents.len());
1349			for msg in contents {
1350				if msg.sent_at <= new_hrmp_watermark {
1351					pruned_cnt += 1;
1352					pruned_size += msg.data.len();
1353				} else {
1354					leftover.push(msg);
1355				}
1356			}
1357			if !leftover.is_empty() {
1358				HrmpChannelContents::<T>::insert(&channel_id, leftover);
1359			} else {
1360				HrmpChannelContents::<T>::remove(&channel_id);
1361			}
1362
1363			// update the channel metadata.
1364			HrmpChannels::<T>::mutate(&channel_id, |channel| {
1365				if let Some(ref mut channel) = channel {
1366					channel.msg_count -= pruned_cnt as u32;
1367					channel.total_size -= pruned_size as u32;
1368				}
1369			});
1370		}
1371
1372		HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1373	}
1374
1375	/// Process the outbound HRMP messages by putting them into the appropriate recipient queues.
1376	pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1377		let now = frame_system::Pallet::<T>::block_number();
1378
1379		for out_msg in out_hrmp_msgs {
1380			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1381
1382			let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1383				Some(channel) => channel,
1384				None => {
1385					// apparently, that since acceptance of this candidate the recipient was
1386					// offboarded and the channel no longer exists.
1387					continue
1388				},
1389			};
1390
1391			let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1392
1393			// book keeping
1394			channel.msg_count += 1;
1395			channel.total_size += inbound.data.len() as u32;
1396
1397			// compute the new MQC head of the channel
1398			let prev_head = channel.mqc_head.unwrap_or(Default::default());
1399			let new_head = BlakeTwo256::hash_of(&(
1400				prev_head,
1401				inbound.sent_at,
1402				T::Hashing::hash_of(&inbound.data),
1403			));
1404			channel.mqc_head = Some(new_head);
1405
1406			HrmpChannels::<T>::insert(&channel_id, channel);
1407			HrmpChannelContents::<T>::append(&channel_id, inbound);
1408
1409			// The digests are sorted in ascending by block number order. There are only two
1410			// possible scenarios here ("the current" is the block of candidate's inclusion):
1411			//
1412			// (a) It's the first time anybody sends a message to this recipient within this block.
1413			//     In this case, the digest vector would be empty or the block number of the latest
1414			//     entry is smaller than the current.
1415			//
1416			// (b) Somebody has already sent a message within the current block. That means that
1417			//     the block number of the latest entry is equal to the current.
1418			//
1419			// Note that having the latest entry greater than the current block number is a logical
1420			// error.
1421			let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1422			if let Some(cur_block_digest) = recipient_digest
1423				.last_mut()
1424				.filter(|(block_no, _)| *block_no == now)
1425				.map(|(_, ref mut d)| d)
1426			{
1427				cur_block_digest.push(sender);
1428			} else {
1429				recipient_digest.push((now, vec![sender]));
1430			}
1431			HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1432		}
1433	}
1434
1435	/// Initiate opening a channel from a parachain to a given recipient with given channel
1436	/// parameters. If neither chain is part of the system, then a deposit from the `Configuration`
1437	/// will be required for `origin` (the sender) upon opening the request and the `recipient` upon
1438	/// accepting it.
1439	///
1440	/// Basically the same as [`hrmp_init_open_channel`](Pallet::hrmp_init_open_channel) but
1441	/// intended for calling directly from other pallets rather than dispatched.
1442	pub fn init_open_channel(
1443		origin: ParaId,
1444		recipient: ParaId,
1445		proposed_max_capacity: u32,
1446		proposed_max_message_size: u32,
1447	) -> DispatchResult {
1448		ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
1449		ensure!(
1450			paras::Pallet::<T>::is_valid_para(recipient),
1451			Error::<T>::OpenHrmpChannelInvalidRecipient,
1452		);
1453
1454		let config = configuration::ActiveConfig::<T>::get();
1455		ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
1456		ensure!(
1457			proposed_max_capacity <= config.hrmp_channel_max_capacity,
1458			Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
1459		);
1460		ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
1461		ensure!(
1462			proposed_max_message_size <= config.hrmp_channel_max_message_size,
1463			Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
1464		);
1465
1466		let channel_id = HrmpChannelId { sender: origin, recipient };
1467		ensure!(
1468			HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
1469			Error::<T>::OpenHrmpChannelAlreadyRequested,
1470		);
1471		ensure!(
1472			HrmpChannels::<T>::get(&channel_id).is_none(),
1473			Error::<T>::OpenHrmpChannelAlreadyExists,
1474		);
1475
1476		let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1477		let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
1478		let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
1479		ensure!(
1480			egress_cnt + open_req_cnt < channel_num_limit,
1481			Error::<T>::OpenHrmpChannelLimitExceeded,
1482		);
1483
1484		// Do not require deposits for channels with or amongst the system.
1485		let is_system = origin.is_system() || recipient.is_system();
1486		let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
1487		if !deposit.is_zero() {
1488			T::Currency::reserve(
1489				&origin.into_account_truncating(),
1490				deposit.unique_saturated_into(),
1491			)?;
1492		}
1493
1494		// mutating storage directly now -- shall not bail henceforth.
1495
1496		HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
1497		HrmpOpenChannelRequests::<T>::insert(
1498			&channel_id,
1499			HrmpOpenChannelRequest {
1500				confirmed: false,
1501				_age: 0,
1502				sender_deposit: deposit,
1503				max_capacity: proposed_max_capacity,
1504				max_message_size: proposed_max_message_size,
1505				max_total_size: config.hrmp_channel_max_total_size,
1506			},
1507		);
1508		HrmpOpenChannelRequestsList::<T>::append(channel_id);
1509
1510		Self::send_to_para(
1511			"init_open_channel",
1512			&config,
1513			recipient,
1514			Self::wrap_notification(|| {
1515				use xcm::opaque::latest::{prelude::*, Xcm};
1516				Xcm(vec![HrmpNewChannelOpenRequest {
1517					sender: origin.into(),
1518					max_capacity: proposed_max_capacity,
1519					max_message_size: proposed_max_message_size,
1520				}])
1521			}),
1522		);
1523
1524		Ok(())
1525	}
1526
1527	/// Accept a pending open channel request from the given sender.
1528	///
1529	/// Basically the same as [`hrmp_accept_open_channel`](Pallet::hrmp_accept_open_channel) but
1530	/// intended for calling directly from other pallets rather than dispatched.
1531	pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1532		let channel_id = HrmpChannelId { sender, recipient: origin };
1533		let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1534			.ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1535		ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1536
1537		// check if by accepting this open channel request, this parachain would exceed the
1538		// number of inbound channels.
1539		let config = configuration::ActiveConfig::<T>::get();
1540		let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
1541		let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1542		let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1543		ensure!(
1544			ingress_cnt + accepted_cnt < channel_num_limit,
1545			Error::<T>::AcceptHrmpChannelLimitExceeded,
1546		);
1547
1548		// Do not require deposits for channels with or amongst the system.
1549		let is_system = origin.is_system() || sender.is_system();
1550		let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1551		if !deposit.is_zero() {
1552			T::Currency::reserve(
1553				&origin.into_account_truncating(),
1554				deposit.unique_saturated_into(),
1555			)?;
1556		}
1557
1558		// persist the updated open channel request and then increment the number of accepted
1559		// channels.
1560		channel_req.confirmed = true;
1561		HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1562		HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1563
1564		Self::send_to_para(
1565			"accept_open_channel",
1566			&config,
1567			sender,
1568			Self::wrap_notification(|| {
1569				use xcm::opaque::latest::{prelude::*, Xcm};
1570				Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1571			}),
1572		);
1573
1574		Ok(())
1575	}
1576
1577	fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1578		// check if the origin is allowed to close the channel.
1579		ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1580
1581		let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1582			.ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1583		ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1584
1585		// Remove the request by the channel id and sync the accompanying list with the set.
1586		HrmpOpenChannelRequests::<T>::remove(&channel_id);
1587		HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1588			if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1589				open_req_channels.swap_remove(pos);
1590			}
1591		});
1592
1593		Self::decrease_open_channel_request_count(channel_id.sender);
1594		// Don't decrease `HrmpAcceptedChannelRequestCount` because we don't consider confirmed
1595		// requests here.
1596
1597		// Unreserve the sender's deposit. The recipient could not have left their deposit because
1598		// we ensured that the request is not confirmed.
1599		T::Currency::unreserve(
1600			&channel_id.sender.into_account_truncating(),
1601			open_channel_req.sender_deposit.unique_saturated_into(),
1602		);
1603
1604		Ok(())
1605	}
1606
1607	fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1608		// check if the origin is allowed to close the channel.
1609		ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1610
1611		// check if the channel requested to close does exist.
1612		ensure!(
1613			HrmpChannels::<T>::get(&channel_id).is_some(),
1614			Error::<T>::CloseHrmpChannelDoesntExist,
1615		);
1616
1617		// check that there is no outstanding close request for this channel
1618		ensure!(
1619			HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1620			Error::<T>::CloseHrmpChannelAlreadyUnderway,
1621		);
1622
1623		HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1624		HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1625
1626		let config = configuration::ActiveConfig::<T>::get();
1627		let opposite_party =
1628			if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1629
1630		Self::send_to_para(
1631			"close_channel",
1632			&config,
1633			opposite_party,
1634			Self::wrap_notification(|| {
1635				use xcm::opaque::latest::{prelude::*, Xcm};
1636				Xcm(vec![HrmpChannelClosing {
1637					initiator: origin.into(),
1638					sender: channel_id.sender.into(),
1639					recipient: channel_id.recipient.into(),
1640				}])
1641			}),
1642		);
1643
1644		Ok(())
1645	}
1646
1647	/// Returns the list of MQC heads for the inbound channels of the given recipient para paired
1648	/// with the sender para ids. This vector is sorted ascending by the para id and doesn't contain
1649	/// multiple entries with the same sender.
1650	#[cfg(test)]
1651	fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
1652		let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1653
1654		// The ingress channels vector is sorted, thus `mqc_heads` is sorted as well.
1655		let mut mqc_heads = Vec::with_capacity(sender_set.len());
1656		for sender in sender_set {
1657			let channel_metadata = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient });
1658			let mqc_head = channel_metadata
1659				.and_then(|metadata| metadata.mqc_head)
1660				.unwrap_or(Hash::default());
1661			mqc_heads.push((sender, mqc_head));
1662		}
1663
1664		mqc_heads
1665	}
1666
1667	/// Returns contents of all channels addressed to the given recipient. Channels that have no
1668	/// messages in them are also included.
1669	pub(crate) fn inbound_hrmp_channels_contents(
1670		recipient: ParaId,
1671	) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1672		let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1673
1674		let mut inbound_hrmp_channels_contents = BTreeMap::new();
1675		for sender in sender_set {
1676			let channel_contents =
1677				HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1678			inbound_hrmp_channels_contents.insert(sender, channel_contents);
1679		}
1680
1681		inbound_hrmp_channels_contents
1682	}
1683}
1684
1685impl<T: Config> Pallet<T> {
1686	/// Decreases the open channel request count for the given sender. If the value reaches zero
1687	/// it is removed completely.
1688	fn decrease_open_channel_request_count(sender: ParaId) {
1689		HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1690			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1691				0 => None,
1692				n => Some(n),
1693			});
1694		});
1695	}
1696
1697	/// Decreases the accepted channel request count for the given sender. If the value reaches
1698	/// zero it is removed completely.
1699	fn decrease_accepted_channel_request_count(recipient: ParaId) {
1700		HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1701			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1702				0 => None,
1703				n => Some(n),
1704			});
1705		});
1706	}
1707
1708	#[cfg(any(feature = "runtime-benchmarks", test))]
1709	fn assert_storage_consistency_exhaustive() {
1710		fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
1711			assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
1712		}
1713
1714		let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
1715			for para in paras {
1716				assert!(
1717					crate::paras::Pallet::<T>::is_valid_para(para),
1718					"{}: {:?} para is offboarded",
1719					cause,
1720					para
1721				);
1722			}
1723		};
1724
1725		assert_eq!(
1726			HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1727			HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1728		);
1729
1730		// verify that the set of keys in `HrmpOpenChannelRequestCount` corresponds to the set
1731		// of _senders_ in `HrmpOpenChannelRequests`.
1732		//
1733		// having ensured that, we can go ahead and go over all counts and verify that they match.
1734		assert_eq!(
1735			HrmpOpenChannelRequestCount::<T>::iter()
1736				.map(|(k, _)| k)
1737				.collect::<BTreeSet<_>>(),
1738			HrmpOpenChannelRequests::<T>::iter()
1739				.map(|(k, _)| k.sender)
1740				.collect::<BTreeSet<_>>(),
1741		);
1742		for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
1743			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1744				.filter(|(ch, _)| ch.sender == open_channel_initiator)
1745				.count() as u32;
1746			assert_eq!(expected_num, actual_num);
1747		}
1748
1749		// The same as above, but for accepted channel request count. Note that we are interested
1750		// only in confirmed open requests.
1751		assert_eq!(
1752			HrmpAcceptedChannelRequestCount::<T>::iter()
1753				.map(|(k, _)| k)
1754				.collect::<BTreeSet<_>>(),
1755			HrmpOpenChannelRequests::<T>::iter()
1756				.filter(|(_, v)| v.confirmed)
1757				.map(|(k, _)| k.recipient)
1758				.collect::<BTreeSet<_>>(),
1759		);
1760		for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
1761			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1762				.filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
1763				.count() as u32;
1764			assert_eq!(expected_num, actual_num);
1765		}
1766
1767		assert_eq!(
1768			HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1769			HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1770		);
1771
1772		// A HRMP watermark can be None for an onboarded parachain. However, an offboarded parachain
1773		// cannot have an HRMP watermark: it should've been cleanup.
1774		assert_contains_only_onboarded(
1775			HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1776			"HRMP watermarks should contain only onboarded paras",
1777		);
1778
1779		// An entry in `HrmpChannels` indicates that the channel is open. Only open channels can
1780		// have contents.
1781		for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
1782			assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
1783
1784			// pedantic check: there should be no empty vectors in storage, those should be modeled
1785			// by a removed kv pair.
1786			assert!(!contents.is_empty());
1787		}
1788
1789		// Senders and recipients must be onboarded. Otherwise, all channels associated with them
1790		// are removed.
1791		assert_contains_only_onboarded(
1792			HrmpChannels::<T>::iter()
1793				.flat_map(|(k, _)| vec![k.sender, k.recipient])
1794				.collect::<Vec<_>>(),
1795			"senders and recipients in all channels should be onboarded",
1796		);
1797
1798		// Check the docs for `HrmpIngressChannelsIndex` and `HrmpEgressChannelsIndex` in decl
1799		// storage to get an index what are the channel mappings indexes.
1800		//
1801		// Here, from indexes.
1802		//
1803		// ingress         egress
1804		//
1805		// a -> [x, y]     x -> [a, b]
1806		// b -> [x, z]     y -> [a]
1807		//                 z -> [b]
1808		//
1809		// we derive a list of channels they represent.
1810		//
1811		//   (a, x)         (a, x)
1812		//   (a, y)         (a, y)
1813		//   (b, x)         (b, x)
1814		//   (b, z)         (b, z)
1815		//
1816		// and then that we compare that to the channel list in the `HrmpChannels`.
1817		let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
1818			.flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
1819			.collect::<BTreeSet<_>>();
1820		let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
1821			.flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
1822			.collect::<BTreeSet<_>>();
1823		let channel_set_ground_truth = HrmpChannels::<T>::iter()
1824			.map(|(k, _)| (k.sender, k.recipient))
1825			.collect::<BTreeSet<_>>();
1826		assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
1827		assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
1828
1829		HrmpIngressChannelsIndex::<T>::iter()
1830			.map(|(_, v)| v)
1831			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1832		HrmpEgressChannelsIndex::<T>::iter()
1833			.map(|(_, v)| v)
1834			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1835
1836		assert_contains_only_onboarded(
1837			HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1838			"HRMP channel digests should contain only onboarded paras",
1839		);
1840		for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
1841			// Assert that items are in **strictly** ascending order. The strictness also implies
1842			// there are no duplicates.
1843			assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
1844
1845			for (_, mut senders) in digest {
1846				assert!(!senders.is_empty());
1847
1848				// check for duplicates. For that we sort the vector, then perform deduplication.
1849				// if the vector stayed the same, there are no duplicates.
1850				senders.sort();
1851				let orig_senders = senders.clone();
1852				senders.dedup();
1853				assert_eq!(
1854					orig_senders, senders,
1855					"duplicates removed implies existence of duplicates"
1856				);
1857			}
1858		}
1859	}
1860}
1861
1862impl<T: Config> Pallet<T> {
1863	/// Wraps HRMP XCM notifications to the most suitable XCM version for the destination para.
1864	/// If the XCM version is unknown, the latest XCM version is used as a best effort.
1865	fn wrap_notification(
1866		mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1867	) -> impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage {
1868		use xcm::{
1869			opaque::VersionedXcm,
1870			prelude::{Junction, Location},
1871			WrapVersion,
1872		};
1873
1874		// Return a closure that can prepare notifications.
1875		move |dest| {
1876			// Attempt to wrap the notification for the destination parachain.
1877			T::VersionWrapper::wrap_version(
1878				&Location::new(0, [Junction::Parachain(dest.into())]),
1879				notification(),
1880			)
1881			.unwrap_or_else(|_| {
1882				// As a best effort, if we cannot resolve the version, fallback to using the latest
1883				// version.
1884				VersionedXcm::from(notification())
1885			})
1886			.encode()
1887		}
1888	}
1889
1890	/// Sends/enqueues notification to the destination parachain.
1891	fn send_to_para(
1892		log_label: &str,
1893		config: &HostConfiguration<BlockNumberFor<T>>,
1894		dest: ParaId,
1895		notification_bytes_for: impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage,
1896	) {
1897		// prepare notification
1898		let notification_bytes = notification_bytes_for(dest);
1899
1900		// try to enqueue
1901		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1902			dmp::Pallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1903		{
1904			// this should never happen unless the max downward message size is configured to a
1905			// jokingly small number.
1906			log::error!(
1907				target: "runtime::hrmp",
1908				"sending '{log_label}::notification_bytes' failed."
1909			);
1910			debug_assert!(false);
1911		}
1912	}
1913}