1use crate::{
18 configuration::{self, HostConfiguration},
19 dmp, ensure_parachain, initializer, paras,
20};
21use alloc::{
22 collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23 vec,
24 vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use frame_system::pallet_prelude::*;
30use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
31use polkadot_primitives::{
32 Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
33 SessionIndex,
34};
35use scale_info::TypeInfo;
36use sp_runtime::{
37 traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
38 ArithmeticError,
39};
40
41pub use pallet::*;
42
/// Upper bound (128) on the number of inbound HRMP channels.
///
/// NOTE(review): not referenced in the visible part of this file; presumably consulted where
/// the host configuration is validated and by weight calculations — confirm against callers.
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
/// Upper bound (128) on the number of outbound HRMP channels.
///
/// NOTE(review): same caveat as [`HRMP_MAX_INBOUND_CHANNELS_BOUND`] — usage is not visible here.
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
/// Weight functions needed by this pallet.
///
/// Each function is named after the dispatchable (or internal routine) it weighs and is
/// referenced from the corresponding `#[pallet::weight(..)]` attribute or weight computation.
pub trait WeightInfo {
	/// Weight of `hrmp_init_open_channel`.
	fn hrmp_init_open_channel() -> Weight;
	/// Weight of `hrmp_accept_open_channel`.
	fn hrmp_accept_open_channel() -> Weight;
	/// Weight of `hrmp_close_channel`.
	fn hrmp_close_channel() -> Weight;
	/// Weight of `force_clean_hrmp`; `i` and `e` are the inbound and outbound channel counts
	/// passed as witness data.
	fn force_clean_hrmp(i: u32, e: u32) -> Weight;
	/// Weight of `force_process_hrmp_open`; `c` is the witness number of open requests.
	fn force_process_hrmp_open(c: u32) -> Weight;
	/// Weight of `force_process_hrmp_close`; `c` is the witness number of close requests.
	fn force_process_hrmp_close(c: u32) -> Weight;
	/// Weight of `hrmp_cancel_open_request`; `c` is the witness number of open requests.
	fn hrmp_cancel_open_request(c: u32) -> Weight;
	/// Weight of the internal `clean_open_channel_requests`; called there with the number of
	/// outgoing paras.
	fn clean_open_channel_requests(c: u32) -> Weight;
	/// Weight of `force_open_hrmp_channel`; `c` is 1 when a pending open request had to be
	/// cancelled first and 0 otherwise.
	fn force_open_hrmp_channel(c: u32) -> Weight;
	/// Weight of `establish_system_channel`.
	fn establish_system_channel() -> Weight;
	/// Weight of `poke_channel_deposits`.
	fn poke_channel_deposits() -> Weight;
	/// Weight of `establish_channel_with_system`.
	fn establish_channel_with_system() -> Weight;
}
71
72pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76 fn hrmp_accept_open_channel() -> Weight {
77 Weight::MAX
78 }
79 fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80 Weight::MAX
81 }
82 fn force_process_hrmp_close(_: u32) -> Weight {
83 Weight::MAX
84 }
85 fn force_process_hrmp_open(_: u32) -> Weight {
86 Weight::MAX
87 }
88 fn hrmp_cancel_open_request(_: u32) -> Weight {
89 Weight::MAX
90 }
91 fn hrmp_close_channel() -> Weight {
92 Weight::MAX
93 }
94 fn hrmp_init_open_channel() -> Weight {
95 Weight::MAX
96 }
97 fn clean_open_channel_requests(_: u32) -> Weight {
98 Weight::MAX
99 }
100 fn force_open_hrmp_channel(_: u32) -> Weight {
101 Weight::MAX
102 }
103 fn establish_system_channel() -> Weight {
104 Weight::MAX
105 }
106 fn poke_channel_deposits() -> Weight {
107 Weight::MAX
108 }
109 fn establish_channel_with_system() -> Weight {
110 Weight::MAX
111 }
112}
113
/// A pending request to open an HRMP channel, stored in `HrmpOpenChannelRequests`.
///
/// The field order is part of the SCALE encoding and must not be changed.
#[derive(Encode, Decode, TypeInfo)]
pub struct HrmpOpenChannelRequest {
	/// Whether the request has been confirmed. Only confirmed requests are turned into
	/// channels by `process_hrmp_open_channel_requests`.
	pub confirmed: bool,
	/// NOTE(review): not read anywhere in the visible part of this file; the leading
	/// underscore suggests the field is unused — confirm before relying on it.
	pub _age: SessionIndex,
	/// The amount reserved from the sender for this request. It is unreserved when the request
	/// is cleaned up and copied into the channel when the request is processed.
	pub sender_deposit: Balance,
	/// The maximum message size requested for the channel; copied into the opened channel.
	pub max_message_size: u32,
	/// The maximum number of messages requested for the channel; copied into the opened
	/// channel.
	pub max_capacity: u32,
	/// The maximum total size of queued messages for the channel; copied into the opened
	/// channel.
	pub max_total_size: u32,
}
131
/// Metadata of an open HRMP channel, stored in `HrmpChannels`.
///
/// The field order is part of the SCALE encoding and must not be changed.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct HrmpChannel {
	/// The maximum number of messages that may be pending in the channel at once; checked
	/// against `msg_count` in `check_outbound_hrmp`.
	pub max_capacity: u32,
	/// The maximum total size of pending messages; checked against `total_size` in
	/// `check_outbound_hrmp`.
	pub max_total_size: u32,
	/// The maximum size of a single message; checked in `check_outbound_hrmp`.
	pub max_message_size: u32,
	/// The current number of pending messages. Starts at 0 and is decreased by `prune_hrmp`.
	pub msg_count: u32,
	/// The current total size of pending messages. Starts at 0 and is decreased by
	/// `prune_hrmp`.
	pub total_size: u32,
	/// Initialised to `None` when the channel is opened.
	///
	/// NOTE(review): presumably the head of the message queue chain for this channel — its
	/// maintenance is not visible in this part of the file.
	pub mqc_head: Option<Hash>,
	/// The amount reserved from the sender; unreserved when the channel is closed.
	pub sender_deposit: Balance,
	/// The amount reserved from the recipient; unreserved when the channel is closed.
	pub recipient_deposit: Balance,
}
167
/// The reasons `check_hrmp_watermark` can reject a candidate's HRMP watermark.
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
	/// The new watermark is lower than the last accepted watermark of the recipient.
	AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
	/// The new watermark is greater than the relay-parent block number.
	AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
	/// The new watermark does not match any block number recorded in the recipient's digest.
	LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
}
175
/// The reasons `check_outbound_hrmp` can reject a candidate's outbound HRMP messages.
///
/// `idx` is always the position of the offending message in the submitted slice.
pub(crate) enum OutboundHrmpAcceptanceErr {
	/// More messages were sent than `hrmp_max_message_num_per_candidate` permits.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// The messages are not strictly ascending by recipient.
	NotSorted { idx: u32 },
	/// There is no open channel from the sender to the message's recipient.
	NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
	/// The message is larger than the channel's `max_message_size`.
	MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
	/// Accepting the message would push the channel over its `max_total_size`.
	TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
	/// Accepting the message would push the channel over its `max_capacity`.
	CapacityExceeded { idx: u32, count: u32, limit: u32 },
}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189 BlockNumber: fmt::Debug,
190{
191 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192 use HrmpWatermarkAcceptanceErr::*;
193 match self {
194 AdvancementRule { new_watermark, last_watermark } => write!(
195 fmt,
196 "the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197 new_watermark, last_watermark,
198 ),
199 AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200 fmt,
201 "the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202 new_watermark, relay_chain_parent_number
203 ),
204 LandsOnBlockWithNoMessages { new_watermark } => write!(
205 fmt,
206 "the HRMP watermark ({:?}) doesn't land on a block with messages received",
207 new_watermark
208 ),
209 }
210 }
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215 use OutboundHrmpAcceptanceErr::*;
216 match self {
217 MoreMessagesThanPermitted { sent, permitted } => write!(
218 fmt,
219 "more HRMP messages than permitted by config ({} > {})",
220 sent, permitted,
221 ),
222 NotSorted { idx } => {
223 write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224 },
225 NoSuchChannel { idx, channel_id } => write!(
226 fmt,
227 "the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228 idx, channel_id.sender, channel_id.recipient,
229 ),
230 MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231 fmt,
232 "the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233 idx, msg_size, max_size,
234 ),
235 TotalSizeExceeded { idx, total_size, limit } => write!(
236 fmt,
237 "sending the HRMP message at index {} would exceed the negotiated channel total size ({} > {})",
238 idx, total_size, limit,
239 ),
240 CapacityExceeded { idx, count, limit } => write!(
241 fmt,
242 "sending the HRMP message at index {} would exceed the negotiated channel capacity ({} > {})",
243 idx, count, limit,
244 ),
245 }
246 }
247}
248
249#[frame_support::pallet]
250pub mod pallet {
251 use super::*;
252
	/// The HRMP pallet type. All dispatchables and internal routines of this module are
	/// associated functions of `Pallet<T>`.
	// `without_storage_info` is required because several storage items hold unbounded `Vec`s.
	#[pallet::pallet]
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);
256
	/// Configuration of the HRMP pallet. Requires the `configuration`, `paras` and `dmp`
	/// pallets to be configured for the same runtime.
	#[pallet::config]
	pub trait Config:
		frame_system::Config + configuration::Config + paras::Config + dmp::Config
	{
		/// The outer event type; events of this pallet are converted into it.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// The outer origin type. It must be convertible to `crate::Origin` so that
		/// `ensure_parachain` can extract the calling parachain.
		type RuntimeOrigin: From<crate::Origin>
			+ From<<Self as frame_system::Config>::RuntimeOrigin>
			+ Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;

		/// The origin allowed to call the `force_*` dispatchables of this pallet.
		type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;

		/// The currency used to reserve and unreserve channel deposits.
		type Currency: ReservableCurrency<Self::AccountId>;

		/// Default channel parameters used by `establish_channel_with_system`.
		///
		/// The tuple is read as `(max_message_size, max_capacity)`.
		type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;

		/// NOTE(review): not used in the visible part of this file; presumably wraps XCM
		/// notifications in the destination's supported version — confirm at the use site.
		type VersionWrapper: xcm::WrapVersion;

		/// Weight information for the dispatchables of this pallet.
		type WeightInfo: WeightInfo;
	}
294
	/// Events deposited by the dispatchables of this pallet.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// Deposited by `hrmp_init_open_channel` once the open request has been registered.
		OpenChannelRequested {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// Deposited by `hrmp_cancel_open_request`; `by_parachain` is the calling parachain.
		OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// Deposited by `hrmp_accept_open_channel`; `recipient` is the calling parachain.
		OpenChannelAccepted { sender: ParaId, recipient: ParaId },
		/// Deposited by `hrmp_close_channel`; `by_parachain` is the calling parachain.
		ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// Deposited by `force_open_hrmp_channel`.
		HrmpChannelForceOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// Deposited by `establish_system_channel`, and once per direction by
		/// `establish_channel_with_system`.
		HrmpSystemChannelOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// Deposited by `poke_channel_deposits` when the channel exists.
		OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
	}
328
	/// Errors returned by the dispatchables of this pallet.
	///
	/// NOTE(review): most variants are raised by helpers (`init_open_channel`,
	/// `accept_open_channel`, `close_channel`, `cancel_open_request`) that are defined outside
	/// the visible part of this file, so only the variants used here are documented.
	#[pallet::error]
	pub enum Error<T> {
		OpenHrmpChannelToSelf,
		OpenHrmpChannelInvalidRecipient,
		OpenHrmpChannelZeroCapacity,
		OpenHrmpChannelCapacityExceedsLimit,
		OpenHrmpChannelZeroMessageSize,
		OpenHrmpChannelMessageSizeExceedsLimit,
		OpenHrmpChannelAlreadyExists,
		OpenHrmpChannelAlreadyRequested,
		OpenHrmpChannelLimitExceeded,
		AcceptHrmpChannelDoesntExist,
		AcceptHrmpChannelAlreadyConfirmed,
		AcceptHrmpChannelLimitExceeded,
		CloseHrmpChannelUnauthorized,
		CloseHrmpChannelDoesntExist,
		CloseHrmpChannelAlreadyUnderway,
		CancelHrmpOpenChannelUnauthorized,
		/// Returned by `poke_channel_deposits` when no channel exists for the given pair.
		OpenHrmpChannelDoesntExist,
		OpenHrmpChannelAlreadyConfirmed,
		/// The witness value supplied to a dispatchable is smaller than the actual number of
		/// items it would have to process.
		WrongWitness,
		/// Returned by `establish_system_channel` and `establish_channel_with_system` when the
		/// required participants are not system chains.
		ChannelCreationNotAuthorized,
	}
372
	/// Pending open-channel requests, keyed by channel.
	///
	/// Every channel listed in `HrmpOpenChannelRequestsList` is expected to have an entry
	/// here; `process_hrmp_open_channel_requests` panics otherwise.
	#[pallet::storage]
	pub type HrmpOpenChannelRequests<T: Config> =
		StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;

	/// The channels that currently have a pending open request. Its length is used as witness
	/// data by `force_process_hrmp_open` and `hrmp_cancel_open_request`.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;

	/// Per-para counter of open requests; decreased for the sender when a confirmed request is
	/// processed and removed when the para is offboarded.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;

	/// Per-para counter of accepted requests; decreased for the recipient when a confirmed
	/// request is processed or cleaned up, and removed when the para is offboarded.
	#[pallet::storage]
	pub type HrmpAcceptedChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;

	/// The set of channels with a pending close request. Entries are removed together with
	/// `HrmpCloseChannelRequestsList` in `process_hrmp_close_channel_requests`.
	#[pallet::storage]
	pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;

	/// The channels that currently have a pending close request. Its length is used as witness
	/// data by `force_process_hrmp_close`.
	#[pallet::storage]
	pub type HrmpCloseChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;

	/// The last accepted HRMP watermark per recipient para; written by `prune_hrmp`.
	#[pallet::storage]
	pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;

	/// Metadata of every open channel.
	#[pallet::storage]
	pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;

	/// For each recipient para, the senders it has an open channel from. Kept sorted: entries
	/// are inserted and removed via `binary_search`.
	#[pallet::storage]
	pub type HrmpIngressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;

	/// For each sender para, the recipients it has an open channel to. Kept sorted: entries
	/// are inserted and removed via `binary_search`.
	#[pallet::storage]
	pub type HrmpEgressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;

	/// The messages currently queued in each channel. The entry is removed when the channel is
	/// closed or when pruning leaves no messages behind.
	#[pallet::storage]
	pub type HrmpChannelContents<T: Config> = StorageMap<
		_,
		Twox64Concat,
		HrmpChannelId,
		Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
		ValueQuery,
	>;

	/// For each recipient para, a list of `(block number, senders)` pairs. It is binary
	/// searched by block number in `check_hrmp_watermark`, so it is expected to be sorted by
	/// block number.
	#[pallet::storage]
	pub type HrmpChannelDigests<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
474
	/// Genesis configuration of the HRMP pallet.
	#[pallet::genesis_config]
	#[derive(DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		// Only ties the struct to `T`; skipped during (de)serialization.
		#[serde(skip)]
		_config: core::marker::PhantomData<T>,
		// Channels to open at genesis, as `(sender, recipient, max_capacity, max_message_size)`
		// tuples (the order in which `initialize_storage` destructures them).
		preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
	}
495
	#[pallet::genesis_build]
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
		/// Opens every channel listed in `preopen_hrmp_channels` by delegating to
		/// `initialize_storage`, which panics if any of them cannot be opened.
		fn build(&self) {
			initialize_storage::<T>(&self.preopen_hrmp_channels);
		}
	}
502
503 #[pallet::call]
504 impl<T: Config> Pallet<T> {
505 #[pallet::call_index(0)]
516 #[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
517 pub fn hrmp_init_open_channel(
518 origin: OriginFor<T>,
519 recipient: ParaId,
520 proposed_max_capacity: u32,
521 proposed_max_message_size: u32,
522 ) -> DispatchResult {
523 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
524 Self::init_open_channel(
525 origin,
526 recipient,
527 proposed_max_capacity,
528 proposed_max_message_size,
529 )?;
530 Self::deposit_event(Event::OpenChannelRequested {
531 sender: origin,
532 recipient,
533 proposed_max_capacity,
534 proposed_max_message_size,
535 });
536 Ok(())
537 }
538
539 #[pallet::call_index(1)]
543 #[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
544 pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
545 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
546 Self::accept_open_channel(origin, sender)?;
547 Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
548 Ok(())
549 }
550
551 #[pallet::call_index(2)]
556 #[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
557 pub fn hrmp_close_channel(
558 origin: OriginFor<T>,
559 channel_id: HrmpChannelId,
560 ) -> DispatchResult {
561 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
562 Self::close_channel(origin, channel_id.clone())?;
563 Self::deposit_event(Event::ChannelClosed { by_parachain: origin, channel_id });
564 Ok(())
565 }
566
567 #[pallet::call_index(3)]
575 #[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
576 pub fn force_clean_hrmp(
577 origin: OriginFor<T>,
578 para: ParaId,
579 num_inbound: u32,
580 num_outbound: u32,
581 ) -> DispatchResult {
582 T::ChannelManager::ensure_origin(origin)?;
583
584 ensure!(
585 HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
586 num_inbound as usize,
587 Error::<T>::WrongWitness
588 );
589 ensure!(
590 HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
591 num_outbound as usize,
592 Error::<T>::WrongWitness
593 );
594
595 Self::clean_hrmp_after_outgoing(¶);
596 Ok(())
597 }
598
599 #[pallet::call_index(4)]
608 #[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
609 pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
610 T::ChannelManager::ensure_origin(origin)?;
611
612 ensure!(
613 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
614 channels,
615 Error::<T>::WrongWitness
616 );
617
618 let host_config = configuration::ActiveConfig::<T>::get();
619 Self::process_hrmp_open_channel_requests(&host_config);
620 Ok(())
621 }
622
623 #[pallet::call_index(5)]
632 #[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
633 pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
634 T::ChannelManager::ensure_origin(origin)?;
635
636 ensure!(
637 HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
638 channels,
639 Error::<T>::WrongWitness
640 );
641
642 Self::process_hrmp_close_channel_requests();
643 Ok(())
644 }
645
646 #[pallet::call_index(6)]
655 #[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
656 pub fn hrmp_cancel_open_request(
657 origin: OriginFor<T>,
658 channel_id: HrmpChannelId,
659 open_requests: u32,
660 ) -> DispatchResult {
661 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
662 ensure!(
663 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
664 open_requests,
665 Error::<T>::WrongWitness
666 );
667 Self::cancel_open_request(origin, channel_id.clone())?;
668 Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
669 Ok(())
670 }
671
672 #[pallet::call_index(7)]
681 #[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
682 pub fn force_open_hrmp_channel(
683 origin: OriginFor<T>,
684 sender: ParaId,
685 recipient: ParaId,
686 max_capacity: u32,
687 max_message_size: u32,
688 ) -> DispatchResultWithPostInfo {
689 T::ChannelManager::ensure_origin(origin)?;
690
691 let channel_id = HrmpChannelId { sender, recipient };
696 let cancel_request: u32 =
697 if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
698 Self::cancel_open_request(sender, channel_id)?;
699 1
700 } else {
701 0
702 };
703
704 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
707 Self::accept_open_channel(recipient, sender)?;
708 Self::deposit_event(Event::HrmpChannelForceOpened {
709 sender,
710 recipient,
711 proposed_max_capacity: max_capacity,
712 proposed_max_message_size: max_message_size,
713 });
714
715 Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
716 }
717
718 #[pallet::call_index(8)]
731 #[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
732 pub fn establish_system_channel(
733 origin: OriginFor<T>,
734 sender: ParaId,
735 recipient: ParaId,
736 ) -> DispatchResultWithPostInfo {
737 let _caller = ensure_signed(origin)?;
738
739 ensure!(
741 sender.is_system() && recipient.is_system(),
742 Error::<T>::ChannelCreationNotAuthorized
743 );
744
745 let config = configuration::ActiveConfig::<T>::get();
746 let max_message_size = config.hrmp_channel_max_message_size;
747 let max_capacity = config.hrmp_channel_max_capacity;
748
749 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
750 Self::accept_open_channel(recipient, sender)?;
751
752 Self::deposit_event(Event::HrmpSystemChannelOpened {
753 sender,
754 recipient,
755 proposed_max_capacity: max_capacity,
756 proposed_max_message_size: max_message_size,
757 });
758
759 Ok(Pays::No.into())
760 }
761
762 #[pallet::call_index(9)]
772 #[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
773 pub fn poke_channel_deposits(
774 origin: OriginFor<T>,
775 sender: ParaId,
776 recipient: ParaId,
777 ) -> DispatchResult {
778 let _caller = ensure_signed(origin)?;
779 let channel_id = HrmpChannelId { sender, recipient };
780 let is_system = sender.is_system() || recipient.is_system();
781
782 let config = configuration::ActiveConfig::<T>::get();
783
784 let (new_sender_deposit, new_recipient_deposit) = if is_system {
786 (0, 0)
787 } else {
788 (config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
789 };
790
791 HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
792 if let Some(ref mut channel) = channel {
793 let current_sender_deposit = channel.sender_deposit;
794 let current_recipient_deposit = channel.recipient_deposit;
795
796 if current_sender_deposit == new_sender_deposit &&
798 current_recipient_deposit == new_recipient_deposit
799 {
800 return Ok(());
801 }
802
803 if current_sender_deposit > new_sender_deposit {
805 let amount = current_sender_deposit
807 .checked_sub(new_sender_deposit)
808 .ok_or(ArithmeticError::Underflow)?;
809 T::Currency::unreserve(
810 &channel_id.sender.into_account_truncating(),
811 amount.try_into().unwrap_or(Zero::zero()),
814 );
815 } else if current_sender_deposit < new_sender_deposit {
816 let amount = new_sender_deposit
817 .checked_sub(current_sender_deposit)
818 .ok_or(ArithmeticError::Underflow)?;
819 T::Currency::reserve(
820 &channel_id.sender.into_account_truncating(),
821 amount.try_into().unwrap_or(Zero::zero()),
822 )?;
823 }
824
825 if current_recipient_deposit > new_recipient_deposit {
827 let amount = current_recipient_deposit
828 .checked_sub(new_recipient_deposit)
829 .ok_or(ArithmeticError::Underflow)?;
830 T::Currency::unreserve(
831 &channel_id.recipient.into_account_truncating(),
832 amount.try_into().unwrap_or(Zero::zero()),
833 );
834 } else if current_recipient_deposit < new_recipient_deposit {
835 let amount = new_recipient_deposit
836 .checked_sub(current_recipient_deposit)
837 .ok_or(ArithmeticError::Underflow)?;
838 T::Currency::reserve(
839 &channel_id.recipient.into_account_truncating(),
840 amount.try_into().unwrap_or(Zero::zero()),
841 )?;
842 }
843
844 channel.sender_deposit = new_sender_deposit;
846 channel.recipient_deposit = new_recipient_deposit;
847 } else {
848 return Err(Error::<T>::OpenHrmpChannelDoesntExist.into());
849 }
850 Ok(())
851 })?;
852
853 Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
854
855 Ok(())
856 }
857
858 #[pallet::call_index(10)]
866 #[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
867 pub fn establish_channel_with_system(
868 origin: OriginFor<T>,
869 target_system_chain: ParaId,
870 ) -> DispatchResultWithPostInfo {
871 let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
872
873 ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
874
875 let (max_message_size, max_capacity) =
876 T::DefaultChannelSizeAndCapacityWithSystem::get();
877
878 Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
880 Self::accept_open_channel(target_system_chain, sender)?;
881
882 Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
883 Self::accept_open_channel(sender, target_system_chain)?;
884
885 Self::deposit_event(Event::HrmpSystemChannelOpened {
886 sender,
887 recipient: target_system_chain,
888 proposed_max_capacity: max_capacity,
889 proposed_max_message_size: max_message_size,
890 });
891
892 Self::deposit_event(Event::HrmpSystemChannelOpened {
893 sender: target_system_chain,
894 recipient: sender,
895 proposed_max_capacity: max_capacity,
896 proposed_max_message_size: max_message_size,
897 });
898
899 Ok(Pays::No.into())
900 }
901 }
902}
903
904fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
905 let host_config = configuration::ActiveConfig::<T>::get();
906 for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
907 if let Err(err) =
908 preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
909 {
910 panic!("failed to initialize the genesis storage: {:?}", err);
911 }
912 }
913 Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
914}
915
/// Registers an open request from `sender` to `recipient` and immediately accepts it on
/// behalf of `recipient`.
///
/// The channel itself only comes into existence once the pending requests are processed
/// (see `initialize_storage`). Returns the first error reported by either step.
fn preopen_hrmp_channel<T: Config>(
	sender: ParaId,
	recipient: ParaId,
	max_capacity: u32,
	max_message_size: u32,
) -> DispatchResult {
	Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
	Pallet::<T>::accept_open_channel(recipient, sender)?;
	Ok(())
}
926
927impl<T: Config> Pallet<T> {
	/// Block-initialization hook of this pallet. Does nothing: `_now` is ignored and zero
	/// weight is returned.
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
		Weight::zero()
	}
933
	/// Block-finalization hook of this pallet. Intentionally a no-op.
	pub(crate) fn initializer_finalize() {}
936
937 pub(crate) fn initializer_on_new_session(
939 notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
940 outgoing_paras: &[ParaId],
941 ) -> Weight {
942 let w1 = Self::perform_outgoing_para_cleanup(¬ification.prev_config, outgoing_paras);
943 Self::process_hrmp_open_channel_requests(¬ification.prev_config);
944 Self::process_hrmp_close_channel_requests();
945 w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
946 outgoing_paras.len() as u32
947 ))
948 .saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
949 outgoing_paras.len() as u32,
950 ))
951 }
952
953 fn perform_outgoing_para_cleanup(
956 config: &HostConfiguration<BlockNumberFor<T>>,
957 outgoing: &[ParaId],
958 ) -> Weight {
959 let mut w = Self::clean_open_channel_requests(config, outgoing);
960 for outgoing_para in outgoing {
961 Self::clean_hrmp_after_outgoing(outgoing_para);
962
963 let ingress_count =
966 HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
967 let egress_count =
968 HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
969 w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
970 ingress_count,
971 egress_count,
972 ));
973 }
974 w
975 }
976
977 pub(crate) fn clean_open_channel_requests(
981 config: &HostConfiguration<BlockNumberFor<T>>,
982 outgoing: &[ParaId],
983 ) -> Weight {
984 let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
990 let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
991 .into_iter()
992 .partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
993 HrmpOpenChannelRequestsList::<T>::put(stay);
994
995 for req_id in go {
998 let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
999 Some(req_data) => req_data,
1000 None => {
1001 continue;
1003 },
1004 };
1005
1006 if !outgoing.contains(&req_id.sender) {
1008 T::Currency::unreserve(
1009 &req_id.sender.into_account_truncating(),
1010 req_data.sender_deposit.unique_saturated_into(),
1011 );
1012 }
1013
1014 if req_data.confirmed {
1020 if !outgoing.contains(&req_id.recipient) {
1021 T::Currency::unreserve(
1022 &req_id.recipient.into_account_truncating(),
1023 config.hrmp_recipient_deposit.unique_saturated_into(),
1024 );
1025 }
1026 Self::decrease_accepted_channel_request_count(req_id.recipient);
1027 }
1028 }
1029
1030 <T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1031 }
1032
1033 fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1035 HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1036 HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1037
1038 let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1039 .into_iter()
1040 .map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1041 let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1042 .into_iter()
1043 .map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1044 let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1045 to_close.sort();
1046 to_close.dedup();
1047
1048 for channel in to_close {
1049 Self::close_hrmp_channel(&channel);
1050 }
1051 }
1052
1053 fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1058 let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1059 if open_req_channels.is_empty() {
1060 return;
1061 }
1062
1063 let mut idx = open_req_channels.len();
1066 loop {
1067 if idx == 0 {
1069 break;
1070 }
1071
1072 idx -= 1;
1073 let channel_id = open_req_channels[idx].clone();
1074 let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1075 "can't be `None` due to the invariant that the list contains the same items as the set; qed",
1076 );
1077
1078 let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1079 let sender_deposit = request.sender_deposit;
1080 let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1081
1082 if request.confirmed {
1083 if paras::Pallet::<T>::is_valid_para(channel_id.sender) &&
1084 paras::Pallet::<T>::is_valid_para(channel_id.recipient)
1085 {
1086 HrmpChannels::<T>::insert(
1087 &channel_id,
1088 HrmpChannel {
1089 sender_deposit,
1090 recipient_deposit,
1091 max_capacity: request.max_capacity,
1092 max_total_size: request.max_total_size,
1093 max_message_size: request.max_message_size,
1094 msg_count: 0,
1095 total_size: 0,
1096 mqc_head: None,
1097 },
1098 );
1099
1100 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1101 if let Err(i) = v.binary_search(&channel_id.sender) {
1102 v.insert(i, channel_id.sender);
1103 }
1104 });
1105 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1106 if let Err(i) = v.binary_search(&channel_id.recipient) {
1107 v.insert(i, channel_id.recipient);
1108 }
1109 });
1110 }
1111
1112 Self::decrease_open_channel_request_count(channel_id.sender);
1113 Self::decrease_accepted_channel_request_count(channel_id.recipient);
1114
1115 let _ = open_req_channels.swap_remove(idx);
1116 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1117 }
1118 }
1119
1120 HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1121 }
1122
1123 fn process_hrmp_close_channel_requests() {
1125 let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1126 for condemned_ch_id in close_reqs {
1127 HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1128 Self::close_hrmp_channel(&condemned_ch_id);
1129 }
1130 }
1131
1132 fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1139 if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1140 HrmpChannels::<T>::take(channel_id)
1141 {
1142 T::Currency::unreserve(
1143 &channel_id.sender.into_account_truncating(),
1144 sender_deposit.unique_saturated_into(),
1145 );
1146 T::Currency::unreserve(
1147 &channel_id.recipient.into_account_truncating(),
1148 recipient_deposit.unique_saturated_into(),
1149 );
1150 }
1151
1152 HrmpChannelContents::<T>::remove(channel_id);
1153
1154 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1155 if let Ok(i) = v.binary_search(&channel_id.recipient) {
1156 v.remove(i);
1157 }
1158 });
1159 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1160 if let Ok(i) = v.binary_search(&channel_id.sender) {
1161 v.remove(i);
1162 }
1163 });
1164 }
1165
1166 pub(crate) fn check_hrmp_watermark(
1168 recipient: ParaId,
1169 relay_chain_parent_number: BlockNumberFor<T>,
1170 new_hrmp_watermark: BlockNumberFor<T>,
1171 ) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1172 if new_hrmp_watermark == relay_chain_parent_number {
1181 return Ok(());
1182 }
1183
1184 if new_hrmp_watermark > relay_chain_parent_number {
1185 return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1186 new_watermark: new_hrmp_watermark,
1187 relay_chain_parent_number,
1188 });
1189 }
1190
1191 if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1192 if new_hrmp_watermark < last_watermark {
1193 return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1194 new_watermark: new_hrmp_watermark,
1195 last_watermark,
1196 });
1197 }
1198
1199 if new_hrmp_watermark == last_watermark {
1200 return Ok(());
1201 }
1202 }
1203
1204 let digest = HrmpChannelDigests::<T>::get(&recipient);
1209 if !digest
1210 .binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1211 .is_ok()
1212 {
1213 return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1214 new_watermark: new_hrmp_watermark,
1215 });
1216 }
1217 Ok(())
1218 }
1219
1220 pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1222 let mut valid_watermarks: Vec<_> = HrmpChannelDigests::<T>::get(&recipient)
1223 .into_iter()
1224 .map(|(block_no, _)| block_no)
1225 .collect();
1226
1227 if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1229 if valid_watermarks.first().map_or(false, |w| w > &last_watermark) {
1230 valid_watermarks.insert(0, last_watermark);
1231 }
1232 }
1233
1234 valid_watermarks
1235 }
1236
1237 pub(crate) fn check_outbound_hrmp(
1238 config: &HostConfiguration<BlockNumberFor<T>>,
1239 sender: ParaId,
1240 out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1241 ) -> Result<(), OutboundHrmpAcceptanceErr> {
1242 if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1243 return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1244 sent: out_hrmp_msgs.len() as u32,
1245 permitted: config.hrmp_max_message_num_per_candidate,
1246 });
1247 }
1248
1249 let mut last_recipient = None::<ParaId>;
1250
1251 for (idx, out_msg) in
1252 out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1253 {
1254 match last_recipient {
1255 Some(last_recipient) if out_msg.recipient <= last_recipient => {
1259 return Err(OutboundHrmpAcceptanceErr::NotSorted { idx })
1260 },
1261 _ => last_recipient = Some(out_msg.recipient),
1262 }
1263
1264 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1265
1266 let channel = match HrmpChannels::<T>::get(&channel_id) {
1267 Some(channel) => channel,
1268 None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1269 };
1270
1271 let msg_size = out_msg.data.len() as u32;
1272 if msg_size > channel.max_message_size {
1273 return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1274 idx,
1275 msg_size,
1276 max_size: channel.max_message_size,
1277 });
1278 }
1279
1280 let new_total_size = channel.total_size + out_msg.data.len() as u32;
1281 if new_total_size > channel.max_total_size {
1282 return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1283 idx,
1284 total_size: new_total_size,
1285 limit: channel.max_total_size,
1286 });
1287 }
1288
1289 let new_msg_count = channel.msg_count + 1;
1290 if new_msg_count > channel.max_capacity {
1291 return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1292 idx,
1293 count: new_msg_count,
1294 limit: channel.max_capacity,
1295 });
1296 }
1297 }
1298
1299 Ok(())
1300 }
1301
1302 pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1304 let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1305 let mut remaining = Vec::with_capacity(recipients.len());
1306
1307 for recipient in recipients {
1308 let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1309 continue;
1310 };
1311 remaining.push((
1312 recipient,
1313 (
1314 channel.max_capacity - channel.msg_count,
1315 channel.max_total_size - channel.total_size,
1316 ),
1317 ));
1318 }
1319
1320 remaining
1321 }
1322
1323 pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1324 let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1327 let mut senders = BTreeSet::new();
1328 let mut leftover = Vec::with_capacity(digest.len());
1329 for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1330 if block_no <= new_hrmp_watermark {
1331 senders.extend(paras_sent_msg);
1332 } else {
1333 leftover.push((block_no, paras_sent_msg));
1334 }
1335 }
1336 *digest = leftover;
1337 senders
1338 });
1339
1340 let channels_to_prune =
1342 senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1343 for channel_id in channels_to_prune {
1344 let (mut pruned_cnt, mut pruned_size) = (0, 0);
1347
1348 let contents = HrmpChannelContents::<T>::get(&channel_id);
1349 let mut leftover = Vec::with_capacity(contents.len());
1350 for msg in contents {
1351 if msg.sent_at <= new_hrmp_watermark {
1352 pruned_cnt += 1;
1353 pruned_size += msg.data.len();
1354 } else {
1355 leftover.push(msg);
1356 }
1357 }
1358 if !leftover.is_empty() {
1359 HrmpChannelContents::<T>::insert(&channel_id, leftover);
1360 } else {
1361 HrmpChannelContents::<T>::remove(&channel_id);
1362 }
1363
1364 HrmpChannels::<T>::mutate(&channel_id, |channel| {
1366 if let Some(ref mut channel) = channel {
1367 channel.msg_count -= pruned_cnt as u32;
1368 channel.total_size -= pruned_size as u32;
1369 }
1370 });
1371 }
1372
1373 HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1374 }
1375
1376 pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1378 let now = frame_system::Pallet::<T>::block_number();
1379
1380 for out_msg in out_hrmp_msgs {
1381 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1382
1383 let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1384 Some(channel) => channel,
1385 None => {
1386 continue;
1389 },
1390 };
1391
1392 let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1393
1394 channel.msg_count += 1;
1396 channel.total_size += inbound.data.len() as u32;
1397
1398 let prev_head = channel.mqc_head.unwrap_or(Default::default());
1400 let new_head = BlakeTwo256::hash_of(&(
1401 prev_head,
1402 inbound.sent_at,
1403 T::Hashing::hash_of(&inbound.data),
1404 ));
1405 channel.mqc_head = Some(new_head);
1406
1407 HrmpChannels::<T>::insert(&channel_id, channel);
1408 HrmpChannelContents::<T>::append(&channel_id, inbound);
1409
1410 let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1423 if let Some(cur_block_digest) = recipient_digest
1424 .last_mut()
1425 .filter(|(block_no, _)| *block_no == now)
1426 .map(|(_, ref mut d)| d)
1427 {
1428 cur_block_digest.push(sender);
1429 } else {
1430 recipient_digest.push((now, vec![sender]));
1431 }
1432 HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1433 }
1434 }
1435
1436 pub fn init_open_channel(
1444 origin: ParaId,
1445 recipient: ParaId,
1446 proposed_max_capacity: u32,
1447 proposed_max_message_size: u32,
1448 ) -> DispatchResult {
1449 ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
1450 ensure!(
1451 paras::Pallet::<T>::is_valid_para(recipient),
1452 Error::<T>::OpenHrmpChannelInvalidRecipient,
1453 );
1454
1455 let config = configuration::ActiveConfig::<T>::get();
1456 ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
1457 ensure!(
1458 proposed_max_capacity <= config.hrmp_channel_max_capacity,
1459 Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
1460 );
1461 ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
1462 ensure!(
1463 proposed_max_message_size <= config.hrmp_channel_max_message_size,
1464 Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
1465 );
1466
1467 let channel_id = HrmpChannelId { sender: origin, recipient };
1468 ensure!(
1469 HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
1470 Error::<T>::OpenHrmpChannelAlreadyRequested,
1471 );
1472 ensure!(
1473 HrmpChannels::<T>::get(&channel_id).is_none(),
1474 Error::<T>::OpenHrmpChannelAlreadyExists,
1475 );
1476
1477 let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1478 let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
1479 let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
1480 ensure!(
1481 egress_cnt + open_req_cnt < channel_num_limit,
1482 Error::<T>::OpenHrmpChannelLimitExceeded,
1483 );
1484
1485 let is_system = origin.is_system() || recipient.is_system();
1487 let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
1488 if !deposit.is_zero() {
1489 T::Currency::reserve(
1490 &origin.into_account_truncating(),
1491 deposit.unique_saturated_into(),
1492 )?;
1493 }
1494
1495 HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
1498 HrmpOpenChannelRequests::<T>::insert(
1499 &channel_id,
1500 HrmpOpenChannelRequest {
1501 confirmed: false,
1502 _age: 0,
1503 sender_deposit: deposit,
1504 max_capacity: proposed_max_capacity,
1505 max_message_size: proposed_max_message_size,
1506 max_total_size: config.hrmp_channel_max_total_size,
1507 },
1508 );
1509 HrmpOpenChannelRequestsList::<T>::append(channel_id);
1510
1511 Self::send_to_para(
1512 "init_open_channel",
1513 &config,
1514 recipient,
1515 Self::wrap_notification(|| {
1516 use xcm::opaque::latest::{prelude::*, Xcm};
1517 Xcm(vec![HrmpNewChannelOpenRequest {
1518 sender: origin.into(),
1519 max_capacity: proposed_max_capacity,
1520 max_message_size: proposed_max_message_size,
1521 }])
1522 }),
1523 );
1524
1525 Ok(())
1526 }
1527
1528 pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1533 let channel_id = HrmpChannelId { sender, recipient: origin };
1534 let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1535 .ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1536 ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1537
1538 let config = configuration::ActiveConfig::<T>::get();
1541 let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
1542 let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1543 let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1544 ensure!(
1545 ingress_cnt + accepted_cnt < channel_num_limit,
1546 Error::<T>::AcceptHrmpChannelLimitExceeded,
1547 );
1548
1549 let is_system = origin.is_system() || sender.is_system();
1551 let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1552 if !deposit.is_zero() {
1553 T::Currency::reserve(
1554 &origin.into_account_truncating(),
1555 deposit.unique_saturated_into(),
1556 )?;
1557 }
1558
1559 channel_req.confirmed = true;
1562 HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1563 HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1564
1565 Self::send_to_para(
1566 "accept_open_channel",
1567 &config,
1568 sender,
1569 Self::wrap_notification(|| {
1570 use xcm::opaque::latest::{prelude::*, Xcm};
1571 Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1572 }),
1573 );
1574
1575 Ok(())
1576 }
1577
1578 fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1579 ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1581
1582 let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1583 .ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1584 ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1585
1586 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1588 HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1589 if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1590 open_req_channels.swap_remove(pos);
1591 }
1592 });
1593
1594 Self::decrease_open_channel_request_count(channel_id.sender);
1595 T::Currency::unreserve(
1601 &channel_id.sender.into_account_truncating(),
1602 open_channel_req.sender_deposit.unique_saturated_into(),
1603 );
1604
1605 Ok(())
1606 }
1607
1608 fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1609 ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1611
1612 ensure!(
1614 HrmpChannels::<T>::get(&channel_id).is_some(),
1615 Error::<T>::CloseHrmpChannelDoesntExist,
1616 );
1617
1618 ensure!(
1620 HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1621 Error::<T>::CloseHrmpChannelAlreadyUnderway,
1622 );
1623
1624 HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1625 HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1626
1627 let config = configuration::ActiveConfig::<T>::get();
1628 let opposite_party =
1629 if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1630
1631 Self::send_to_para(
1632 "close_channel",
1633 &config,
1634 opposite_party,
1635 Self::wrap_notification(|| {
1636 use xcm::opaque::latest::{prelude::*, Xcm};
1637 Xcm(vec![HrmpChannelClosing {
1638 initiator: origin.into(),
1639 sender: channel_id.sender.into(),
1640 recipient: channel_id.recipient.into(),
1641 }])
1642 }),
1643 );
1644
1645 Ok(())
1646 }
1647
/// Returns the MQC head of every inbound channel of `recipient`, paired with the sender.
///
/// Entries follow the order of `recipient`'s ingress index. A channel without stored
/// metadata, or without an MQC head yet, is reported with the default hash.
#[cfg(test)]
fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
    HrmpIngressChannelsIndex::<T>::get(&recipient)
        .into_iter()
        .map(|sender| {
            let head = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient })
                .and_then(|channel| channel.mqc_head)
                .unwrap_or_default();
            (sender, head)
        })
        .collect()
}
1667
1668 pub(crate) fn inbound_hrmp_channels_contents(
1671 recipient: ParaId,
1672 ) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1673 let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1674
1675 let mut inbound_hrmp_channels_contents = BTreeMap::new();
1676 for sender in sender_set {
1677 let channel_contents =
1678 HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1679 inbound_hrmp_channels_contents.insert(sender, channel_contents);
1680 }
1681
1682 inbound_hrmp_channels_contents
1683 }
1684}
1685
1686impl<T: Config> Pallet<T> {
1687 fn decrease_open_channel_request_count(sender: ParaId) {
1690 HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1691 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1692 0 => None,
1693 n => Some(n),
1694 });
1695 });
1696 }
1697
1698 fn decrease_accepted_channel_request_count(recipient: ParaId) {
1701 HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1702 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1703 0 => None,
1704 n => Some(n),
1705 });
1706 });
1707 }
1708
1709 #[cfg(any(feature = "runtime-benchmarks", test))]
1710 fn assert_storage_consistency_exhaustive() {
1711 fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
1712 assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
1713 }
1714
1715 let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
1716 for para in paras {
1717 assert!(
1718 crate::paras::Pallet::<T>::is_valid_para(para),
1719 "{}: {:?} para is offboarded",
1720 cause,
1721 para
1722 );
1723 }
1724 };
1725
1726 assert_eq!(
1727 HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1728 HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1729 );
1730
1731 assert_eq!(
1736 HrmpOpenChannelRequestCount::<T>::iter()
1737 .map(|(k, _)| k)
1738 .collect::<BTreeSet<_>>(),
1739 HrmpOpenChannelRequests::<T>::iter()
1740 .map(|(k, _)| k.sender)
1741 .collect::<BTreeSet<_>>(),
1742 );
1743 for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
1744 let actual_num = HrmpOpenChannelRequests::<T>::iter()
1745 .filter(|(ch, _)| ch.sender == open_channel_initiator)
1746 .count() as u32;
1747 assert_eq!(expected_num, actual_num);
1748 }
1749
1750 assert_eq!(
1753 HrmpAcceptedChannelRequestCount::<T>::iter()
1754 .map(|(k, _)| k)
1755 .collect::<BTreeSet<_>>(),
1756 HrmpOpenChannelRequests::<T>::iter()
1757 .filter(|(_, v)| v.confirmed)
1758 .map(|(k, _)| k.recipient)
1759 .collect::<BTreeSet<_>>(),
1760 );
1761 for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
1762 let actual_num = HrmpOpenChannelRequests::<T>::iter()
1763 .filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
1764 .count() as u32;
1765 assert_eq!(expected_num, actual_num);
1766 }
1767
1768 assert_eq!(
1769 HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1770 HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1771 );
1772
1773 assert_contains_only_onboarded(
1776 HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1777 "HRMP watermarks should contain only onboarded paras",
1778 );
1779
1780 for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
1783 assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
1784
1785 assert!(!contents.is_empty());
1788 }
1789
1790 assert_contains_only_onboarded(
1793 HrmpChannels::<T>::iter()
1794 .flat_map(|(k, _)| vec![k.sender, k.recipient])
1795 .collect::<Vec<_>>(),
1796 "senders and recipients in all channels should be onboarded",
1797 );
1798
1799 let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
1819 .flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
1820 .collect::<BTreeSet<_>>();
1821 let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
1822 .flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
1823 .collect::<BTreeSet<_>>();
1824 let channel_set_ground_truth = HrmpChannels::<T>::iter()
1825 .map(|(k, _)| (k.sender, k.recipient))
1826 .collect::<BTreeSet<_>>();
1827 assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
1828 assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
1829
1830 HrmpIngressChannelsIndex::<T>::iter()
1831 .map(|(_, v)| v)
1832 .for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1833 HrmpEgressChannelsIndex::<T>::iter()
1834 .map(|(_, v)| v)
1835 .for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1836
1837 assert_contains_only_onboarded(
1838 HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1839 "HRMP channel digests should contain only onboarded paras",
1840 );
1841 for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
1842 assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
1845
1846 for (_, mut senders) in digest {
1847 assert!(!senders.is_empty());
1848
1849 senders.sort();
1852 let orig_senders = senders.clone();
1853 senders.dedup();
1854 assert_eq!(
1855 orig_senders, senders,
1856 "duplicates removed implies existence of duplicates"
1857 );
1858 }
1859 }
1860 }
1861}
1862
1863impl<T: Config> Pallet<T> {
1864 fn wrap_notification(
1867 mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1868 ) -> impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage {
1869 use xcm::{
1870 opaque::VersionedXcm,
1871 prelude::{Junction, Location},
1872 WrapVersion,
1873 };
1874
1875 move |dest| {
1877 T::VersionWrapper::wrap_version(
1879 &Location::new(0, [Junction::Parachain(dest.into())]),
1880 notification(),
1881 )
1882 .unwrap_or_else(|_| {
1883 VersionedXcm::from(notification())
1886 })
1887 .encode()
1888 }
1889 }
1890
1891 fn send_to_para(
1893 log_label: &str,
1894 config: &HostConfiguration<BlockNumberFor<T>>,
1895 dest: ParaId,
1896 notification_bytes_for: impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage,
1897 ) {
1898 let notification_bytes = notification_bytes_for(dest);
1900
1901 if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1903 dmp::Pallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1904 {
1905 log::error!(
1908 target: "runtime::hrmp",
1909 "sending '{log_label}::notification_bytes' failed."
1910 );
1911 debug_assert!(false);
1912 }
1913 }
1914}