1use crate::{
18 configuration::{self, HostConfiguration},
19 dmp, ensure_parachain, initializer, paras,
20};
21use alloc::{
22 collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23 vec,
24 vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use frame_system::pallet_prelude::*;
30use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
31use polkadot_primitives::{
32 Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
33 SessionIndex,
34};
35use scale_info::TypeInfo;
36use sp_runtime::{
37 traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
38 ArithmeticError,
39};
40
41pub use pallet::*;
42
/// Bound (128) associated with inbound HRMP channels.
// NOTE(review): not referenced in the visible part of this file. Presumably the
// `configuration` pallet validates channel limits against it and the weights assume it
// — confirm before changing the value.
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
/// Bound (128) associated with outbound HRMP channels.
// NOTE(review): same caveat as for `HRMP_MAX_INBOUND_CHANNELS_BOUND` — confirm usage.
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
/// Weight functions used by this pallet.
///
/// Unless noted otherwise, each function provides the weight of the dispatchable with the same
/// name.
pub trait WeightInfo {
	/// Weight of `hrmp_init_open_channel`.
	fn hrmp_init_open_channel() -> Weight;
	/// Weight of `hrmp_accept_open_channel`.
	fn hrmp_accept_open_channel() -> Weight;
	/// Weight of `hrmp_close_channel`.
	fn hrmp_close_channel() -> Weight;
	/// Weight of `force_clean_hrmp`. Call sites pass the number of inbound channels as `i` and
	/// the number of outbound channels as `e`.
	fn force_clean_hrmp(i: u32, e: u32) -> Weight;
	/// Weight of `force_process_hrmp_open`. The dispatchable passes its `channels` witness as
	/// `c`.
	fn force_process_hrmp_open(c: u32) -> Weight;
	/// Weight of `force_process_hrmp_close`. The dispatchable passes its `channels` witness as
	/// `c`.
	fn force_process_hrmp_close(c: u32) -> Weight;
	/// Weight of `hrmp_cancel_open_request`. The dispatchable passes its `open_requests`
	/// witness as `c`.
	fn hrmp_cancel_open_request(c: u32) -> Weight;
	/// Weight of the internal `clean_open_channel_requests` routine, which passes the number
	/// of outgoing paras as `c`.
	fn clean_open_channel_requests(c: u32) -> Weight;
	/// Weight of `force_open_hrmp_channel`. `c` is `1` when a pending open request had to be
	/// cancelled first and `0` otherwise; the pre-dispatch weight is charged with `1`.
	fn force_open_hrmp_channel(c: u32) -> Weight;
	/// Weight of `establish_system_channel`.
	fn establish_system_channel() -> Weight;
	/// Weight of `poke_channel_deposits`.
	fn poke_channel_deposits() -> Weight;
	/// Weight of `establish_channel_with_system`.
	fn establish_channel_with_system() -> Weight;
}
71
72pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76 fn hrmp_accept_open_channel() -> Weight {
77 Weight::MAX
78 }
79 fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80 Weight::MAX
81 }
82 fn force_process_hrmp_close(_: u32) -> Weight {
83 Weight::MAX
84 }
85 fn force_process_hrmp_open(_: u32) -> Weight {
86 Weight::MAX
87 }
88 fn hrmp_cancel_open_request(_: u32) -> Weight {
89 Weight::MAX
90 }
91 fn hrmp_close_channel() -> Weight {
92 Weight::MAX
93 }
94 fn hrmp_init_open_channel() -> Weight {
95 Weight::MAX
96 }
97 fn clean_open_channel_requests(_: u32) -> Weight {
98 Weight::MAX
99 }
100 fn force_open_hrmp_channel(_: u32) -> Weight {
101 Weight::MAX
102 }
103 fn establish_system_channel() -> Weight {
104 Weight::MAX
105 }
106 fn poke_channel_deposits() -> Weight {
107 Weight::MAX
108 }
109 fn establish_channel_with_system() -> Weight {
110 Weight::MAX
111 }
112}
113
/// A pending request to open an HRMP channel, stored in `HrmpOpenChannelRequests`.
// The field order is part of the SCALE encoding of this type; do not reorder.
#[derive(Encode, Decode, TypeInfo)]
pub struct HrmpOpenChannelRequest {
	/// Whether the request has been confirmed. Only confirmed requests are turned into
	/// channels by `process_hrmp_open_channel_requests`.
	pub confirmed: bool,
	// NOTE(review): never read in the visible part of this file; presumably a legacy field
	// kept for encoding compatibility — confirm.
	pub _age: SessionIndex,
	/// Deposit recorded for the sender. It is unreserved when the request is cleaned up and
	/// copied into the channel when the request is processed.
	pub sender_deposit: Balance,
	/// Maximum message size, copied into the resulting `HrmpChannel`.
	pub max_message_size: u32,
	/// Maximum number of queued messages, copied into the resulting `HrmpChannel`.
	pub max_capacity: u32,
	/// Maximum total size of queued messages, copied into the resulting `HrmpChannel`.
	pub max_total_size: u32,
}
131
/// Metadata of an open HRMP channel, stored in `HrmpChannels`.
// The field order is part of the SCALE encoding of this type; do not reorder.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct HrmpChannel {
	/// Upper limit for `msg_count`; enforced by `check_outbound_hrmp`.
	pub max_capacity: u32,
	/// Upper limit for `total_size`; enforced by `check_outbound_hrmp`.
	pub max_total_size: u32,
	/// Upper limit for the size of a single message; enforced by `check_outbound_hrmp`.
	pub max_message_size: u32,
	/// Number of messages accounted to the channel. Decreased by `prune_hrmp` for every
	/// pruned message.
	pub msg_count: u32,
	/// Total payload size accounted to the channel. Decreased by `prune_hrmp` by the size of
	/// every pruned message.
	pub total_size: u32,
	/// `None` for a freshly opened channel.
	// NOTE(review): presumably the head of the message queue chain for this channel; the
	// code updating it is outside the visible part of the file — confirm.
	pub mqc_head: Option<Hash>,
	/// Amount unreserved from the sender's account when the channel is closed.
	pub sender_deposit: Balance,
	/// Amount unreserved from the recipient's account when the channel is closed.
	pub recipient_deposit: Balance,
}
167
/// Reasons for which `check_hrmp_watermark` rejects a proposed HRMP watermark.
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
	/// The new watermark is lower than the last watermark stored for the recipient.
	AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
	/// The new watermark is greater than the relay-parent block number.
	AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
	/// The new watermark does not match any block number in the recipient's digest.
	LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
}
175
/// Reasons for which `check_outbound_hrmp` rejects a batch of outbound HRMP messages.
///
/// `idx` is always the position of the offending message within the batch.
pub(crate) enum OutboundHrmpAcceptanceErr {
	/// The batch holds more messages than `hrmp_max_message_num_per_candidate` permits.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// The recipient at `idx` is not strictly greater than the previous recipient.
	NotSorted { idx: u32 },
	/// No channel is stored for the sender/recipient pair of the message.
	NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
	/// The message is larger than the channel's `max_message_size`.
	MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
	/// Adding the message would push the channel's total size over `max_total_size`.
	TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
	/// Adding the message would push the channel's message count over `max_capacity`.
	CapacityExceeded { idx: u32, count: u32, limit: u32 },
}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189 BlockNumber: fmt::Debug,
190{
191 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192 use HrmpWatermarkAcceptanceErr::*;
193 match self {
194 AdvancementRule { new_watermark, last_watermark } => write!(
195 fmt,
196 "the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197 new_watermark, last_watermark,
198 ),
199 AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200 fmt,
201 "the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202 new_watermark, relay_chain_parent_number
203 ),
204 LandsOnBlockWithNoMessages { new_watermark } => write!(
205 fmt,
206 "the HRMP watermark ({:?}) doesn't land on a block with messages received",
207 new_watermark
208 ),
209 }
210 }
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215 use OutboundHrmpAcceptanceErr::*;
216 match self {
217 MoreMessagesThanPermitted { sent, permitted } => write!(
218 fmt,
219 "more HRMP messages than permitted by config ({} > {})",
220 sent, permitted,
221 ),
222 NotSorted { idx } => {
223 write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224 },
225 NoSuchChannel { idx, channel_id } => write!(
226 fmt,
227 "the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228 idx, channel_id.sender, channel_id.recipient,
229 ),
230 MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231 fmt,
232 "the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233 idx, msg_size, max_size,
234 ),
235 TotalSizeExceeded { idx, total_size, limit } => write!(
236 fmt,
237 "sending the HRMP message at index {} would exceed the negotiated channel total size ({} > {})",
238 idx, total_size, limit,
239 ),
240 CapacityExceeded { idx, count, limit } => write!(
241 fmt,
242 "sending the HRMP message at index {} would exceed the negotiated channel capacity ({} > {})",
243 idx, count, limit,
244 ),
245 }
246 }
247}
248
249#[frame_support::pallet]
250pub mod pallet {
251 use super::*;
252
	// The pallet type. `without_storage_info` is set; several storage items in this pallet
	// hold unbounded `Vec`s.
	#[pallet::pallet]
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);
256
	/// Configuration of the HRMP pallet.
	#[pallet::config]
	pub trait Config:
		frame_system::Config + configuration::Config + paras::Config + dmp::Config
	{
		/// The outer event type; the pallet's `Event` must be convertible into it.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// The outer origin type. The dispatchables convert the system origin into it and
		/// pass it to `ensure_parachain` to identify the calling para.
		type RuntimeOrigin: From<crate::Origin>
			+ From<<Self as frame_system::Config>::RuntimeOrigin>
			+ Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;

		/// The origin allowed to call the `force_*` dispatchables of this pallet.
		type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;

		/// The currency used to reserve and unreserve channel deposits on the paras'
		/// accounts.
		type Currency: ReservableCurrency<Self::AccountId>;

		/// Defaults used by `establish_channel_with_system`, which destructures the value
		/// as `(max_message_size, max_capacity)`.
		type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;

		// NOTE(review): not used in the visible part of this file; presumably wraps the
		// XCM notifications sent to paras into the right XCM version — confirm.
		type VersionWrapper: xcm::WrapVersion;

		/// The weight functions of this pallet.
		type WeightInfo: WeightInfo;
	}
294
	/// Events emitted by the dispatchables of this pallet.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// Emitted by `hrmp_init_open_channel`: `sender` requested a channel to `recipient`.
		OpenChannelRequested {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// Emitted by `hrmp_cancel_open_request`: `by_parachain` cancelled the open request
		/// for `channel_id`.
		OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// Emitted by `hrmp_accept_open_channel`: `recipient` accepted the request of
		/// `sender`.
		OpenChannelAccepted { sender: ParaId, recipient: ParaId },
		/// Emitted by `hrmp_close_channel`: `by_parachain` initiated closing `channel_id`.
		ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// Emitted by `force_open_hrmp_channel`.
		HrmpChannelForceOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// Emitted by `establish_system_channel` (once) and by
		/// `establish_channel_with_system` (once per direction).
		HrmpSystemChannelOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// Emitted by `poke_channel_deposits` for an existing channel.
		OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
	}
328
	// NOTE(review): apart from `WrongWitness`, `ChannelCreationNotAuthorized` and
	// `OpenHrmpChannelDoesntExist`, the code raising these variants is outside the visible
	// part of this file. The `presumably` notes below are read off the variant names and
	// must be confirmed against `init_open_channel`, `accept_open_channel`, `close_channel`
	// and `cancel_open_request`.
	#[pallet::error]
	pub enum Error<T> {
		// presumably: sender and recipient are the same para.
		OpenHrmpChannelToSelf,
		// presumably: the recipient is not a valid para.
		OpenHrmpChannelInvalidRecipient,
		// presumably: the requested capacity is zero.
		OpenHrmpChannelZeroCapacity,
		// presumably: the requested capacity exceeds the configured limit.
		OpenHrmpChannelCapacityExceedsLimit,
		// presumably: the requested maximum message size is zero.
		OpenHrmpChannelZeroMessageSize,
		// presumably: the requested maximum message size exceeds the configured limit.
		OpenHrmpChannelMessageSizeExceedsLimit,
		// presumably: the channel already exists.
		OpenHrmpChannelAlreadyExists,
		// presumably: an open request for the channel is already pending.
		OpenHrmpChannelAlreadyRequested,
		// presumably: the sender has too many outbound channels or requests.
		OpenHrmpChannelLimitExceeded,
		// presumably: there is no open request to accept.
		AcceptHrmpChannelDoesntExist,
		// presumably: the open request has already been confirmed.
		AcceptHrmpChannelAlreadyConfirmed,
		// presumably: the recipient has too many inbound channels or requests.
		AcceptHrmpChannelLimitExceeded,
		// presumably: the caller is neither sender nor recipient of the channel.
		CloseHrmpChannelUnauthorized,
		// presumably: the channel to be closed does not exist.
		CloseHrmpChannelDoesntExist,
		// presumably: a close request for the channel is already pending.
		CloseHrmpChannelAlreadyUnderway,
		// presumably: the caller may not cancel this open request.
		CancelHrmpOpenChannelUnauthorized,
		/// Returned by `poke_channel_deposits` when no channel is stored for the given
		/// sender/recipient pair.
		// NOTE(review): presumably also raised by `cancel_open_request` — confirm.
		OpenHrmpChannelDoesntExist,
		// presumably: a confirmed open request cannot be cancelled.
		OpenHrmpChannelAlreadyConfirmed,
		/// The witness supplied to a call is smaller than the length of the corresponding
		/// stored list.
		WrongWitness,
		/// The channel between the given paras may not be created through this call; the
		/// `establish_*` calls return it when a para that must be a system chain is not.
		ChannelCreationNotAuthorized,
	}
372
	/// Pending open channel requests, keyed by channel id.
	///
	/// Invariant relied upon by `process_hrmp_open_channel_requests`: every id in
	/// `HrmpOpenChannelRequestsList` has an entry in this map.
	#[pallet::storage]
	pub type HrmpOpenChannelRequests<T: Config> =
		StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;

	/// The ids of all pending open channel requests. Its length is what the `channels` /
	/// `open_requests` witnesses of the dispatchables are checked against.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;

	/// Per-para counter, decreased for the sender whenever one of its confirmed open
	/// requests is processed and removed when the para is offboarded.
	// NOTE(review): presumably the number of open requests initiated by the para — confirm.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;

	/// Per-para counter, decreased for the recipient whenever a confirmed open request is
	/// processed or cleaned up and removed when the para is offboarded.
	// NOTE(review): presumably the number of open requests accepted by the para — confirm.
	#[pallet::storage]
	pub type HrmpAcceptedChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;

	/// The set of channels that are scheduled to be closed by
	/// `process_hrmp_close_channel_requests`.
	#[pallet::storage]
	pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;

	/// The ids of the channels in `HrmpCloseChannelRequests`, as a list.
	#[pallet::storage]
	pub type HrmpCloseChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;

	/// The last HRMP watermark of each recipient para, as written by `prune_hrmp`.
	#[pallet::storage]
	pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;

	/// Metadata of every open HRMP channel.
	#[pallet::storage]
	pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;

	/// For each para, the senders of the channels it is the recipient of. Kept sorted;
	/// entries are inserted and removed via binary search.
	#[pallet::storage]
	pub type HrmpIngressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;

	/// For each para, the recipients of the channels it is the sender of. Kept sorted;
	/// entries are inserted and removed via binary search.
	#[pallet::storage]
	pub type HrmpEgressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;

	/// The messages currently held in each channel.
	#[pallet::storage]
	pub type HrmpChannelContents<T: Config> = StorageMap<
		_,
		Twox64Concat,
		HrmpChannelId,
		Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
		ValueQuery,
	>;

	/// For each recipient para, a list of `(block number, senders)` entries.
	/// `check_hrmp_watermark` binary-searches it by block number, so it has to be sorted by
	/// block number.
	#[pallet::storage]
	pub type HrmpChannelDigests<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
474
	/// Genesis configuration of the pallet.
	#[pallet::genesis_config]
	#[derive(DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		#[serde(skip)]
		_config: core::marker::PhantomData<T>,
		// Channels to open at genesis. `initialize_storage` reads each tuple as
		// `(sender, recipient, max_capacity, max_message_size)`.
		preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
	}

	// Opens all `preopen_hrmp_channels` while building the genesis storage.
	#[pallet::genesis_build]
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
		fn build(&self) {
			initialize_storage::<T>(&self.preopen_hrmp_channels);
		}
	}
502
503 #[pallet::call]
504 impl<T: Config> Pallet<T> {
505 #[pallet::call_index(0)]
516 #[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
517 pub fn hrmp_init_open_channel(
518 origin: OriginFor<T>,
519 recipient: ParaId,
520 proposed_max_capacity: u32,
521 proposed_max_message_size: u32,
522 ) -> DispatchResult {
523 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
524 Self::init_open_channel(
525 origin,
526 recipient,
527 proposed_max_capacity,
528 proposed_max_message_size,
529 )?;
530 Self::deposit_event(Event::OpenChannelRequested {
531 sender: origin,
532 recipient,
533 proposed_max_capacity,
534 proposed_max_message_size,
535 });
536 Ok(())
537 }
538
539 #[pallet::call_index(1)]
543 #[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
544 pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
545 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
546 Self::accept_open_channel(origin, sender)?;
547 Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
548 Ok(())
549 }
550
551 #[pallet::call_index(2)]
556 #[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
557 pub fn hrmp_close_channel(
558 origin: OriginFor<T>,
559 channel_id: HrmpChannelId,
560 ) -> DispatchResult {
561 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
562 Self::close_channel(origin, channel_id.clone())?;
563 Self::deposit_event(Event::ChannelClosed { by_parachain: origin, channel_id });
564 Ok(())
565 }
566
567 #[pallet::call_index(3)]
575 #[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
576 pub fn force_clean_hrmp(
577 origin: OriginFor<T>,
578 para: ParaId,
579 num_inbound: u32,
580 num_outbound: u32,
581 ) -> DispatchResult {
582 T::ChannelManager::ensure_origin(origin)?;
583
584 ensure!(
585 HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
586 num_inbound as usize,
587 Error::<T>::WrongWitness
588 );
589 ensure!(
590 HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
591 num_outbound as usize,
592 Error::<T>::WrongWitness
593 );
594
595 Self::clean_hrmp_after_outgoing(¶);
596 Ok(())
597 }
598
599 #[pallet::call_index(4)]
608 #[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
609 pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
610 T::ChannelManager::ensure_origin(origin)?;
611
612 ensure!(
613 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
614 channels,
615 Error::<T>::WrongWitness
616 );
617
618 let host_config = configuration::ActiveConfig::<T>::get();
619 Self::process_hrmp_open_channel_requests(&host_config);
620 Ok(())
621 }
622
623 #[pallet::call_index(5)]
632 #[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
633 pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
634 T::ChannelManager::ensure_origin(origin)?;
635
636 ensure!(
637 HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
638 channels,
639 Error::<T>::WrongWitness
640 );
641
642 Self::process_hrmp_close_channel_requests();
643 Ok(())
644 }
645
646 #[pallet::call_index(6)]
655 #[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
656 pub fn hrmp_cancel_open_request(
657 origin: OriginFor<T>,
658 channel_id: HrmpChannelId,
659 open_requests: u32,
660 ) -> DispatchResult {
661 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
662 ensure!(
663 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
664 open_requests,
665 Error::<T>::WrongWitness
666 );
667 Self::cancel_open_request(origin, channel_id.clone())?;
668 Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
669 Ok(())
670 }
671
672 #[pallet::call_index(7)]
681 #[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
682 pub fn force_open_hrmp_channel(
683 origin: OriginFor<T>,
684 sender: ParaId,
685 recipient: ParaId,
686 max_capacity: u32,
687 max_message_size: u32,
688 ) -> DispatchResultWithPostInfo {
689 T::ChannelManager::ensure_origin(origin)?;
690
691 let channel_id = HrmpChannelId { sender, recipient };
696 let cancel_request: u32 =
697 if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
698 Self::cancel_open_request(sender, channel_id)?;
699 1
700 } else {
701 0
702 };
703
704 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
707 Self::accept_open_channel(recipient, sender)?;
708 Self::deposit_event(Event::HrmpChannelForceOpened {
709 sender,
710 recipient,
711 proposed_max_capacity: max_capacity,
712 proposed_max_message_size: max_message_size,
713 });
714
715 Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
716 }
717
718 #[pallet::call_index(8)]
731 #[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
732 pub fn establish_system_channel(
733 origin: OriginFor<T>,
734 sender: ParaId,
735 recipient: ParaId,
736 ) -> DispatchResultWithPostInfo {
737 let _caller = ensure_signed(origin)?;
738
739 ensure!(
741 sender.is_system() && recipient.is_system(),
742 Error::<T>::ChannelCreationNotAuthorized
743 );
744
745 let config = configuration::ActiveConfig::<T>::get();
746 let max_message_size = config.hrmp_channel_max_message_size;
747 let max_capacity = config.hrmp_channel_max_capacity;
748
749 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
750 Self::accept_open_channel(recipient, sender)?;
751
752 Self::deposit_event(Event::HrmpSystemChannelOpened {
753 sender,
754 recipient,
755 proposed_max_capacity: max_capacity,
756 proposed_max_message_size: max_message_size,
757 });
758
759 Ok(Pays::No.into())
760 }
761
762 #[pallet::call_index(9)]
772 #[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
773 pub fn poke_channel_deposits(
774 origin: OriginFor<T>,
775 sender: ParaId,
776 recipient: ParaId,
777 ) -> DispatchResult {
778 let _caller = ensure_signed(origin)?;
779 let channel_id = HrmpChannelId { sender, recipient };
780 let is_system = sender.is_system() || recipient.is_system();
781
782 let config = configuration::ActiveConfig::<T>::get();
783
784 let (new_sender_deposit, new_recipient_deposit) = if is_system {
786 (0, 0)
787 } else {
788 (config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
789 };
790
791 HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
792 if let Some(ref mut channel) = channel {
793 let current_sender_deposit = channel.sender_deposit;
794 let current_recipient_deposit = channel.recipient_deposit;
795
796 if current_sender_deposit == new_sender_deposit &&
798 current_recipient_deposit == new_recipient_deposit
799 {
800 return Ok(())
801 }
802
803 if current_sender_deposit > new_sender_deposit {
805 let amount = current_sender_deposit
807 .checked_sub(new_sender_deposit)
808 .ok_or(ArithmeticError::Underflow)?;
809 T::Currency::unreserve(
810 &channel_id.sender.into_account_truncating(),
811 amount.try_into().unwrap_or(Zero::zero()),
814 );
815 } else if current_sender_deposit < new_sender_deposit {
816 let amount = new_sender_deposit
817 .checked_sub(current_sender_deposit)
818 .ok_or(ArithmeticError::Underflow)?;
819 T::Currency::reserve(
820 &channel_id.sender.into_account_truncating(),
821 amount.try_into().unwrap_or(Zero::zero()),
822 )?;
823 }
824
825 if current_recipient_deposit > new_recipient_deposit {
827 let amount = current_recipient_deposit
828 .checked_sub(new_recipient_deposit)
829 .ok_or(ArithmeticError::Underflow)?;
830 T::Currency::unreserve(
831 &channel_id.recipient.into_account_truncating(),
832 amount.try_into().unwrap_or(Zero::zero()),
833 );
834 } else if current_recipient_deposit < new_recipient_deposit {
835 let amount = new_recipient_deposit
836 .checked_sub(current_recipient_deposit)
837 .ok_or(ArithmeticError::Underflow)?;
838 T::Currency::reserve(
839 &channel_id.recipient.into_account_truncating(),
840 amount.try_into().unwrap_or(Zero::zero()),
841 )?;
842 }
843
844 channel.sender_deposit = new_sender_deposit;
846 channel.recipient_deposit = new_recipient_deposit;
847 } else {
848 return Err(Error::<T>::OpenHrmpChannelDoesntExist.into())
849 }
850 Ok(())
851 })?;
852
853 Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
854
855 Ok(())
856 }
857
858 #[pallet::call_index(10)]
866 #[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
867 pub fn establish_channel_with_system(
868 origin: OriginFor<T>,
869 target_system_chain: ParaId,
870 ) -> DispatchResultWithPostInfo {
871 let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
872
873 ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
874
875 let (max_message_size, max_capacity) =
876 T::DefaultChannelSizeAndCapacityWithSystem::get();
877
878 Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
880 Self::accept_open_channel(target_system_chain, sender)?;
881
882 Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
883 Self::accept_open_channel(sender, target_system_chain)?;
884
885 Self::deposit_event(Event::HrmpSystemChannelOpened {
886 sender,
887 recipient: target_system_chain,
888 proposed_max_capacity: max_capacity,
889 proposed_max_message_size: max_message_size,
890 });
891
892 Self::deposit_event(Event::HrmpSystemChannelOpened {
893 sender: target_system_chain,
894 recipient: sender,
895 proposed_max_capacity: max_capacity,
896 proposed_max_message_size: max_message_size,
897 });
898
899 Ok(Pays::No.into())
900 }
901 }
902}
903
904fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
905 let host_config = configuration::ActiveConfig::<T>::get();
906 for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
907 if let Err(err) =
908 preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
909 {
910 panic!("failed to initialize the genesis storage: {:?}", err);
911 }
912 }
913 Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
914}
915
/// Requests a channel from `sender` to `recipient` and immediately accepts it on behalf of
/// `recipient`.
///
/// This only creates a confirmed open request; the caller still has to run
/// `process_hrmp_open_channel_requests` for the channel to be created.
///
/// # Errors
///
/// Propagates any error of `init_open_channel` or `accept_open_channel`.
fn preopen_hrmp_channel<T: Config>(
	sender: ParaId,
	recipient: ParaId,
	max_capacity: u32,
	max_message_size: u32,
) -> DispatchResult {
	Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
	Pallet::<T>::accept_open_channel(recipient, sender)?;
	Ok(())
}
926
927impl<T: Config> Pallet<T> {
	/// Block initialization hook. Does nothing and reports zero weight.
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
		Weight::zero()
	}
933
	/// Block finalization hook. Does nothing.
	pub(crate) fn initializer_finalize() {}
936
937 pub(crate) fn initializer_on_new_session(
939 notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
940 outgoing_paras: &[ParaId],
941 ) -> Weight {
942 let w1 = Self::perform_outgoing_para_cleanup(¬ification.prev_config, outgoing_paras);
943 Self::process_hrmp_open_channel_requests(¬ification.prev_config);
944 Self::process_hrmp_close_channel_requests();
945 w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
946 outgoing_paras.len() as u32
947 ))
948 .saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
949 outgoing_paras.len() as u32,
950 ))
951 }
952
953 fn perform_outgoing_para_cleanup(
956 config: &HostConfiguration<BlockNumberFor<T>>,
957 outgoing: &[ParaId],
958 ) -> Weight {
959 let mut w = Self::clean_open_channel_requests(config, outgoing);
960 for outgoing_para in outgoing {
961 Self::clean_hrmp_after_outgoing(outgoing_para);
962
963 let ingress_count =
966 HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
967 let egress_count =
968 HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
969 w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
970 ingress_count,
971 egress_count,
972 ));
973 }
974 w
975 }
976
977 pub(crate) fn clean_open_channel_requests(
981 config: &HostConfiguration<BlockNumberFor<T>>,
982 outgoing: &[ParaId],
983 ) -> Weight {
984 let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
990 let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
991 .into_iter()
992 .partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
993 HrmpOpenChannelRequestsList::<T>::put(stay);
994
995 for req_id in go {
998 let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
999 Some(req_data) => req_data,
1000 None => {
1001 continue
1003 },
1004 };
1005
1006 if !outgoing.contains(&req_id.sender) {
1008 T::Currency::unreserve(
1009 &req_id.sender.into_account_truncating(),
1010 req_data.sender_deposit.unique_saturated_into(),
1011 );
1012 }
1013
1014 if req_data.confirmed {
1020 if !outgoing.contains(&req_id.recipient) {
1021 T::Currency::unreserve(
1022 &req_id.recipient.into_account_truncating(),
1023 config.hrmp_recipient_deposit.unique_saturated_into(),
1024 );
1025 }
1026 Self::decrease_accepted_channel_request_count(req_id.recipient);
1027 }
1028 }
1029
1030 <T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1031 }
1032
1033 fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1035 HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1036 HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1037
1038 let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1039 .into_iter()
1040 .map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1041 let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1042 .into_iter()
1043 .map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1044 let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1045 to_close.sort();
1046 to_close.dedup();
1047
1048 for channel in to_close {
1049 Self::close_hrmp_channel(&channel);
1050 }
1051 }
1052
1053 fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1058 let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1059 if open_req_channels.is_empty() {
1060 return
1061 }
1062
1063 let mut idx = open_req_channels.len();
1066 loop {
1067 if idx == 0 {
1069 break
1070 }
1071
1072 idx -= 1;
1073 let channel_id = open_req_channels[idx].clone();
1074 let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1075 "can't be `None` due to the invariant that the list contains the same items as the set; qed",
1076 );
1077
1078 let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1079 let sender_deposit = request.sender_deposit;
1080 let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1081
1082 if request.confirmed {
1083 if paras::Pallet::<T>::is_valid_para(channel_id.sender) &&
1084 paras::Pallet::<T>::is_valid_para(channel_id.recipient)
1085 {
1086 HrmpChannels::<T>::insert(
1087 &channel_id,
1088 HrmpChannel {
1089 sender_deposit,
1090 recipient_deposit,
1091 max_capacity: request.max_capacity,
1092 max_total_size: request.max_total_size,
1093 max_message_size: request.max_message_size,
1094 msg_count: 0,
1095 total_size: 0,
1096 mqc_head: None,
1097 },
1098 );
1099
1100 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1101 if let Err(i) = v.binary_search(&channel_id.sender) {
1102 v.insert(i, channel_id.sender);
1103 }
1104 });
1105 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1106 if let Err(i) = v.binary_search(&channel_id.recipient) {
1107 v.insert(i, channel_id.recipient);
1108 }
1109 });
1110 }
1111
1112 Self::decrease_open_channel_request_count(channel_id.sender);
1113 Self::decrease_accepted_channel_request_count(channel_id.recipient);
1114
1115 let _ = open_req_channels.swap_remove(idx);
1116 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1117 }
1118 }
1119
1120 HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1121 }
1122
1123 fn process_hrmp_close_channel_requests() {
1125 let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1126 for condemned_ch_id in close_reqs {
1127 HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1128 Self::close_hrmp_channel(&condemned_ch_id);
1129 }
1130 }
1131
1132 fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1139 if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1140 HrmpChannels::<T>::take(channel_id)
1141 {
1142 T::Currency::unreserve(
1143 &channel_id.sender.into_account_truncating(),
1144 sender_deposit.unique_saturated_into(),
1145 );
1146 T::Currency::unreserve(
1147 &channel_id.recipient.into_account_truncating(),
1148 recipient_deposit.unique_saturated_into(),
1149 );
1150 }
1151
1152 HrmpChannelContents::<T>::remove(channel_id);
1153
1154 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1155 if let Ok(i) = v.binary_search(&channel_id.recipient) {
1156 v.remove(i);
1157 }
1158 });
1159 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1160 if let Ok(i) = v.binary_search(&channel_id.sender) {
1161 v.remove(i);
1162 }
1163 });
1164 }
1165
1166 pub(crate) fn check_hrmp_watermark(
1168 recipient: ParaId,
1169 relay_chain_parent_number: BlockNumberFor<T>,
1170 new_hrmp_watermark: BlockNumberFor<T>,
1171 ) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1172 if new_hrmp_watermark == relay_chain_parent_number {
1181 return Ok(())
1182 }
1183
1184 if new_hrmp_watermark > relay_chain_parent_number {
1185 return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1186 new_watermark: new_hrmp_watermark,
1187 relay_chain_parent_number,
1188 })
1189 }
1190
1191 if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1192 if new_hrmp_watermark < last_watermark {
1193 return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1194 new_watermark: new_hrmp_watermark,
1195 last_watermark,
1196 })
1197 }
1198
1199 if new_hrmp_watermark == last_watermark {
1200 return Ok(())
1201 }
1202 }
1203
1204 let digest = HrmpChannelDigests::<T>::get(&recipient);
1209 if !digest
1210 .binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1211 .is_ok()
1212 {
1213 return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1214 new_watermark: new_hrmp_watermark,
1215 })
1216 }
1217 Ok(())
1218 }
1219
1220 pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1222 let mut valid_watermarks: Vec<_> = HrmpChannelDigests::<T>::get(&recipient)
1223 .into_iter()
1224 .map(|(block_no, _)| block_no)
1225 .collect();
1226
1227 if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1229 if valid_watermarks.first().map_or(false, |w| w > &last_watermark) {
1230 valid_watermarks.insert(0, last_watermark);
1231 }
1232 }
1233
1234 valid_watermarks
1235 }
1236
1237 pub(crate) fn check_outbound_hrmp(
1238 config: &HostConfiguration<BlockNumberFor<T>>,
1239 sender: ParaId,
1240 out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1241 ) -> Result<(), OutboundHrmpAcceptanceErr> {
1242 if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1243 return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1244 sent: out_hrmp_msgs.len() as u32,
1245 permitted: config.hrmp_max_message_num_per_candidate,
1246 })
1247 }
1248
1249 let mut last_recipient = None::<ParaId>;
1250
1251 for (idx, out_msg) in
1252 out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1253 {
1254 match last_recipient {
1255 Some(last_recipient) if out_msg.recipient <= last_recipient =>
1259 return Err(OutboundHrmpAcceptanceErr::NotSorted { idx }),
1260 _ => last_recipient = Some(out_msg.recipient),
1261 }
1262
1263 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1264
1265 let channel = match HrmpChannels::<T>::get(&channel_id) {
1266 Some(channel) => channel,
1267 None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1268 };
1269
1270 let msg_size = out_msg.data.len() as u32;
1271 if msg_size > channel.max_message_size {
1272 return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1273 idx,
1274 msg_size,
1275 max_size: channel.max_message_size,
1276 })
1277 }
1278
1279 let new_total_size = channel.total_size + out_msg.data.len() as u32;
1280 if new_total_size > channel.max_total_size {
1281 return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1282 idx,
1283 total_size: new_total_size,
1284 limit: channel.max_total_size,
1285 })
1286 }
1287
1288 let new_msg_count = channel.msg_count + 1;
1289 if new_msg_count > channel.max_capacity {
1290 return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1291 idx,
1292 count: new_msg_count,
1293 limit: channel.max_capacity,
1294 })
1295 }
1296 }
1297
1298 Ok(())
1299 }
1300
1301 pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1303 let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1304 let mut remaining = Vec::with_capacity(recipients.len());
1305
1306 for recipient in recipients {
1307 let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1308 continue
1309 };
1310 remaining.push((
1311 recipient,
1312 (
1313 channel.max_capacity - channel.msg_count,
1314 channel.max_total_size - channel.total_size,
1315 ),
1316 ));
1317 }
1318
1319 remaining
1320 }
1321
1322 pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1323 let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1326 let mut senders = BTreeSet::new();
1327 let mut leftover = Vec::with_capacity(digest.len());
1328 for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1329 if block_no <= new_hrmp_watermark {
1330 senders.extend(paras_sent_msg);
1331 } else {
1332 leftover.push((block_no, paras_sent_msg));
1333 }
1334 }
1335 *digest = leftover;
1336 senders
1337 });
1338
1339 let channels_to_prune =
1341 senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1342 for channel_id in channels_to_prune {
1343 let (mut pruned_cnt, mut pruned_size) = (0, 0);
1346
1347 let contents = HrmpChannelContents::<T>::get(&channel_id);
1348 let mut leftover = Vec::with_capacity(contents.len());
1349 for msg in contents {
1350 if msg.sent_at <= new_hrmp_watermark {
1351 pruned_cnt += 1;
1352 pruned_size += msg.data.len();
1353 } else {
1354 leftover.push(msg);
1355 }
1356 }
1357 if !leftover.is_empty() {
1358 HrmpChannelContents::<T>::insert(&channel_id, leftover);
1359 } else {
1360 HrmpChannelContents::<T>::remove(&channel_id);
1361 }
1362
1363 HrmpChannels::<T>::mutate(&channel_id, |channel| {
1365 if let Some(ref mut channel) = channel {
1366 channel.msg_count -= pruned_cnt as u32;
1367 channel.total_size -= pruned_size as u32;
1368 }
1369 });
1370 }
1371
1372 HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1373 }
1374
1375 pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1377 let now = frame_system::Pallet::<T>::block_number();
1378
1379 for out_msg in out_hrmp_msgs {
1380 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1381
1382 let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1383 Some(channel) => channel,
1384 None => {
1385 continue
1388 },
1389 };
1390
1391 let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1392
1393 channel.msg_count += 1;
1395 channel.total_size += inbound.data.len() as u32;
1396
1397 let prev_head = channel.mqc_head.unwrap_or(Default::default());
1399 let new_head = BlakeTwo256::hash_of(&(
1400 prev_head,
1401 inbound.sent_at,
1402 T::Hashing::hash_of(&inbound.data),
1403 ));
1404 channel.mqc_head = Some(new_head);
1405
1406 HrmpChannels::<T>::insert(&channel_id, channel);
1407 HrmpChannelContents::<T>::append(&channel_id, inbound);
1408
1409 let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1422 if let Some(cur_block_digest) = recipient_digest
1423 .last_mut()
1424 .filter(|(block_no, _)| *block_no == now)
1425 .map(|(_, ref mut d)| d)
1426 {
1427 cur_block_digest.push(sender);
1428 } else {
1429 recipient_digest.push((now, vec![sender]));
1430 }
1431 HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1432 }
1433 }
1434
1435 pub fn init_open_channel(
1443 origin: ParaId,
1444 recipient: ParaId,
1445 proposed_max_capacity: u32,
1446 proposed_max_message_size: u32,
1447 ) -> DispatchResult {
1448 ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
1449 ensure!(
1450 paras::Pallet::<T>::is_valid_para(recipient),
1451 Error::<T>::OpenHrmpChannelInvalidRecipient,
1452 );
1453
1454 let config = configuration::ActiveConfig::<T>::get();
1455 ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
1456 ensure!(
1457 proposed_max_capacity <= config.hrmp_channel_max_capacity,
1458 Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
1459 );
1460 ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
1461 ensure!(
1462 proposed_max_message_size <= config.hrmp_channel_max_message_size,
1463 Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
1464 );
1465
1466 let channel_id = HrmpChannelId { sender: origin, recipient };
1467 ensure!(
1468 HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
1469 Error::<T>::OpenHrmpChannelAlreadyRequested,
1470 );
1471 ensure!(
1472 HrmpChannels::<T>::get(&channel_id).is_none(),
1473 Error::<T>::OpenHrmpChannelAlreadyExists,
1474 );
1475
1476 let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1477 let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
1478 let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
1479 ensure!(
1480 egress_cnt + open_req_cnt < channel_num_limit,
1481 Error::<T>::OpenHrmpChannelLimitExceeded,
1482 );
1483
1484 let is_system = origin.is_system() || recipient.is_system();
1486 let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
1487 if !deposit.is_zero() {
1488 T::Currency::reserve(
1489 &origin.into_account_truncating(),
1490 deposit.unique_saturated_into(),
1491 )?;
1492 }
1493
1494 HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
1497 HrmpOpenChannelRequests::<T>::insert(
1498 &channel_id,
1499 HrmpOpenChannelRequest {
1500 confirmed: false,
1501 _age: 0,
1502 sender_deposit: deposit,
1503 max_capacity: proposed_max_capacity,
1504 max_message_size: proposed_max_message_size,
1505 max_total_size: config.hrmp_channel_max_total_size,
1506 },
1507 );
1508 HrmpOpenChannelRequestsList::<T>::append(channel_id);
1509
1510 Self::send_to_para(
1511 "init_open_channel",
1512 &config,
1513 recipient,
1514 Self::wrap_notification(|| {
1515 use xcm::opaque::latest::{prelude::*, Xcm};
1516 Xcm(vec![HrmpNewChannelOpenRequest {
1517 sender: origin.into(),
1518 max_capacity: proposed_max_capacity,
1519 max_message_size: proposed_max_message_size,
1520 }])
1521 }),
1522 );
1523
1524 Ok(())
1525 }
1526
1527 pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1532 let channel_id = HrmpChannelId { sender, recipient: origin };
1533 let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1534 .ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1535 ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1536
1537 let config = configuration::ActiveConfig::<T>::get();
1540 let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
1541 let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1542 let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1543 ensure!(
1544 ingress_cnt + accepted_cnt < channel_num_limit,
1545 Error::<T>::AcceptHrmpChannelLimitExceeded,
1546 );
1547
1548 let is_system = origin.is_system() || sender.is_system();
1550 let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1551 if !deposit.is_zero() {
1552 T::Currency::reserve(
1553 &origin.into_account_truncating(),
1554 deposit.unique_saturated_into(),
1555 )?;
1556 }
1557
1558 channel_req.confirmed = true;
1561 HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1562 HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1563
1564 Self::send_to_para(
1565 "accept_open_channel",
1566 &config,
1567 sender,
1568 Self::wrap_notification(|| {
1569 use xcm::opaque::latest::{prelude::*, Xcm};
1570 Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1571 }),
1572 );
1573
1574 Ok(())
1575 }
1576
1577 fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1578 ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1580
1581 let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1582 .ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1583 ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1584
1585 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1587 HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1588 if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1589 open_req_channels.swap_remove(pos);
1590 }
1591 });
1592
1593 Self::decrease_open_channel_request_count(channel_id.sender);
1594 T::Currency::unreserve(
1600 &channel_id.sender.into_account_truncating(),
1601 open_channel_req.sender_deposit.unique_saturated_into(),
1602 );
1603
1604 Ok(())
1605 }
1606
1607 fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1608 ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1610
1611 ensure!(
1613 HrmpChannels::<T>::get(&channel_id).is_some(),
1614 Error::<T>::CloseHrmpChannelDoesntExist,
1615 );
1616
1617 ensure!(
1619 HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1620 Error::<T>::CloseHrmpChannelAlreadyUnderway,
1621 );
1622
1623 HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1624 HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1625
1626 let config = configuration::ActiveConfig::<T>::get();
1627 let opposite_party =
1628 if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1629
1630 Self::send_to_para(
1631 "close_channel",
1632 &config,
1633 opposite_party,
1634 Self::wrap_notification(|| {
1635 use xcm::opaque::latest::{prelude::*, Xcm};
1636 Xcm(vec![HrmpChannelClosing {
1637 initiator: origin.into(),
1638 sender: channel_id.sender.into(),
1639 recipient: channel_id.recipient.into(),
1640 }])
1641 }),
1642 );
1643
1644 Ok(())
1645 }
1646
/// Returns the MQC head of every inbound channel of `recipient` as `(sender, head)` pairs,
/// in the order of the ingress index.
///
/// A sender whose channel metadata is missing, or whose channel has no head yet, is reported
/// with the default hash. Test-only helper.
#[cfg(test)]
fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
    HrmpIngressChannelsIndex::<T>::get(&recipient)
        .into_iter()
        .map(|sender| {
            let head = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient })
                .and_then(|metadata| metadata.mqc_head)
                .unwrap_or_default();
            (sender, head)
        })
        .collect()
}
1666
1667 pub(crate) fn inbound_hrmp_channels_contents(
1670 recipient: ParaId,
1671 ) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1672 let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1673
1674 let mut inbound_hrmp_channels_contents = BTreeMap::new();
1675 for sender in sender_set {
1676 let channel_contents =
1677 HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1678 inbound_hrmp_channels_contents.insert(sender, channel_contents);
1679 }
1680
1681 inbound_hrmp_channels_contents
1682 }
1683}
1684
1685impl<T: Config> Pallet<T> {
1686 fn decrease_open_channel_request_count(sender: ParaId) {
1689 HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1690 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1691 0 => None,
1692 n => Some(n),
1693 });
1694 });
1695 }
1696
1697 fn decrease_accepted_channel_request_count(recipient: ParaId) {
1700 HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1701 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1702 0 => None,
1703 n => Some(n),
1704 });
1705 });
1706 }
1707
1708 #[cfg(any(feature = "runtime-benchmarks", test))]
1709 fn assert_storage_consistency_exhaustive() {
1710 fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
1711 assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
1712 }
1713
1714 let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
1715 for para in paras {
1716 assert!(
1717 crate::paras::Pallet::<T>::is_valid_para(para),
1718 "{}: {:?} para is offboarded",
1719 cause,
1720 para
1721 );
1722 }
1723 };
1724
1725 assert_eq!(
1726 HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1727 HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1728 );
1729
1730 assert_eq!(
1735 HrmpOpenChannelRequestCount::<T>::iter()
1736 .map(|(k, _)| k)
1737 .collect::<BTreeSet<_>>(),
1738 HrmpOpenChannelRequests::<T>::iter()
1739 .map(|(k, _)| k.sender)
1740 .collect::<BTreeSet<_>>(),
1741 );
1742 for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
1743 let actual_num = HrmpOpenChannelRequests::<T>::iter()
1744 .filter(|(ch, _)| ch.sender == open_channel_initiator)
1745 .count() as u32;
1746 assert_eq!(expected_num, actual_num);
1747 }
1748
1749 assert_eq!(
1752 HrmpAcceptedChannelRequestCount::<T>::iter()
1753 .map(|(k, _)| k)
1754 .collect::<BTreeSet<_>>(),
1755 HrmpOpenChannelRequests::<T>::iter()
1756 .filter(|(_, v)| v.confirmed)
1757 .map(|(k, _)| k.recipient)
1758 .collect::<BTreeSet<_>>(),
1759 );
1760 for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
1761 let actual_num = HrmpOpenChannelRequests::<T>::iter()
1762 .filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
1763 .count() as u32;
1764 assert_eq!(expected_num, actual_num);
1765 }
1766
1767 assert_eq!(
1768 HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1769 HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1770 );
1771
1772 assert_contains_only_onboarded(
1775 HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1776 "HRMP watermarks should contain only onboarded paras",
1777 );
1778
1779 for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
1782 assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
1783
1784 assert!(!contents.is_empty());
1787 }
1788
1789 assert_contains_only_onboarded(
1792 HrmpChannels::<T>::iter()
1793 .flat_map(|(k, _)| vec![k.sender, k.recipient])
1794 .collect::<Vec<_>>(),
1795 "senders and recipients in all channels should be onboarded",
1796 );
1797
1798 let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
1818 .flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
1819 .collect::<BTreeSet<_>>();
1820 let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
1821 .flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
1822 .collect::<BTreeSet<_>>();
1823 let channel_set_ground_truth = HrmpChannels::<T>::iter()
1824 .map(|(k, _)| (k.sender, k.recipient))
1825 .collect::<BTreeSet<_>>();
1826 assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
1827 assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
1828
1829 HrmpIngressChannelsIndex::<T>::iter()
1830 .map(|(_, v)| v)
1831 .for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1832 HrmpEgressChannelsIndex::<T>::iter()
1833 .map(|(_, v)| v)
1834 .for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1835
1836 assert_contains_only_onboarded(
1837 HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1838 "HRMP channel digests should contain only onboarded paras",
1839 );
1840 for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
1841 assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
1844
1845 for (_, mut senders) in digest {
1846 assert!(!senders.is_empty());
1847
1848 senders.sort();
1851 let orig_senders = senders.clone();
1852 senders.dedup();
1853 assert_eq!(
1854 orig_senders, senders,
1855 "duplicates removed implies existence of duplicates"
1856 );
1857 }
1858 }
1859 }
1860}
1861
1862impl<T: Config> Pallet<T> {
1863 fn wrap_notification(
1866 mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1867 ) -> impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage {
1868 use xcm::{
1869 opaque::VersionedXcm,
1870 prelude::{Junction, Location},
1871 WrapVersion,
1872 };
1873
1874 move |dest| {
1876 T::VersionWrapper::wrap_version(
1878 &Location::new(0, [Junction::Parachain(dest.into())]),
1879 notification(),
1880 )
1881 .unwrap_or_else(|_| {
1882 VersionedXcm::from(notification())
1885 })
1886 .encode()
1887 }
1888 }
1889
1890 fn send_to_para(
1892 log_label: &str,
1893 config: &HostConfiguration<BlockNumberFor<T>>,
1894 dest: ParaId,
1895 notification_bytes_for: impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage,
1896 ) {
1897 let notification_bytes = notification_bytes_for(dest);
1899
1900 if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1902 dmp::Pallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1903 {
1904 log::error!(
1907 target: "runtime::hrmp",
1908 "sending '{log_label}::notification_bytes' failed."
1909 );
1910 debug_assert!(false);
1911 }
1912 }
1913}