referrerpolicy=no-referrer-when-downgrade

cumulus_pallet_parachain_system/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#![cfg_attr(not(feature = "std"), no_std)]
18
19//! `cumulus-pallet-parachain-system` is a base pallet for Cumulus-based parachains.
20//!
21//! This pallet handles low-level details of being a parachain. Its responsibilities include:
22//!
23//! - ingestion of the parachain validation data;
24//! - ingestion and dispatch of incoming downward and lateral messages;
25//! - coordinating upgrades with the Relay Chain; and
26//! - communication of parachain outputs, such as sent messages, signaling an upgrade, etc.
27//!
28//! Users must ensure that they register this pallet as an inherent provider.
29
30extern crate alloc;
31
32use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
33use codec::{Decode, DecodeLimit, Encode};
34use core::cmp;
35use cumulus_primitives_core::{
36	relay_chain, AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo,
37	CumulusDigestItem, GetChannelInfo, ListChannelInfos, MessageSendError, OutboundHrmpMessage,
38	ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender, XcmpMessageHandler,
39	XcmpMessageSource,
40};
41use cumulus_primitives_parachain_inherent::{v0, MessageQueueChain, ParachainInherentData};
42use frame_support::{
43	dispatch::{DispatchClass, DispatchResult},
44	ensure,
45	inherent::{InherentData, InherentIdentifier, ProvideInherent},
46	traits::{Get, HandleMessage},
47	weights::Weight,
48};
49use frame_system::{ensure_none, ensure_root, pallet_prelude::HeaderFor};
50use parachain_inherent::{
51	deconstruct_parachain_inherent_data, AbridgedInboundDownwardMessages,
52	AbridgedInboundHrmpMessages, BasicParachainInherentData, InboundMessageId, InboundMessagesData,
53};
54use polkadot_parachain_primitives::primitives::RelayChainBlockNumber;
55use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
56use scale_info::TypeInfo;
57use sp_runtime::{
58	traits::{Block as BlockT, BlockNumberProvider, Hash},
59	FixedU128, RuntimeDebug, SaturatedConversion,
60};
61use xcm::{latest::XcmHash, VersionedLocation, VersionedXcm, MAX_XCM_DECODE_DEPTH};
62use xcm_builder::InspectMessageQueues;
63
64mod benchmarking;
65pub mod migration;
66mod mock;
67#[cfg(test)]
68mod tests;
69pub mod weights;
70
71pub use weights::WeightInfo;
72
73mod unincluded_segment;
74
75pub mod consensus_hook;
76pub mod relay_state_snapshot;
77#[macro_use]
78pub mod validate_block;
79mod descendant_validation;
80pub mod parachain_inherent;
81
82use unincluded_segment::{
83	HrmpChannelUpdate, HrmpWatermarkUpdate, OutboundBandwidthLimits, SegmentTracker,
84};
85
86pub use consensus_hook::{ConsensusHook, ExpectParentIncluded};
87/// Register the `validate_block` function that is used by parachains to validate blocks on a
88/// validator.
89///
90/// Does *nothing* when `std` feature is enabled.
91///
92/// Expects as parameters the runtime, a block executor and an inherent checker.
93///
94/// # Example
95///
96/// ```
97///     struct BlockExecutor;
98///     struct Runtime;
99///     struct CheckInherents;
100///
101///     cumulus_pallet_parachain_system::register_validate_block! {
102///         Runtime = Runtime,
103///         BlockExecutor = Executive,
104///         CheckInherents = CheckInherents,
105///     }
106///
107/// # fn main() {}
108/// ```
109pub use cumulus_pallet_parachain_system_proc_macro::register_validate_block;
110pub use relay_state_snapshot::{MessagingStateSnapshot, RelayChainStateProof};
111pub use unincluded_segment::{Ancestor, UsedBandwidth};
112
113pub use pallet::*;
114
115const LOG_TARGET: &str = "parachain-system";
116
117/// Something that can check the associated relay block number.
118///
119/// Each Parachain block is built in the context of a relay chain block, this trait allows us
120/// to validate the given relay chain block number. With async backing it is legal to build
121/// multiple Parachain blocks per relay chain parent. With this trait it is possible for the
122/// Parachain to ensure that still only one Parachain block is built per relay chain parent.
123///
124/// By default [`RelayNumberStrictlyIncreases`], [`RelayNumberMonotonicallyIncreases`] and
/// [`AnyRelayNumber`] are provided.
125pub trait CheckAssociatedRelayNumber {
126	/// Check the current relay number versus the previous relay number.
127	///
	/// `current` is the relay chain block number associated with the block being checked and
	/// `previous` is the one to compare it against.
	///
128	/// The implementation should panic when there is something wrong.
129	fn check_associated_relay_number(
130		current: RelayChainBlockNumber,
131		previous: RelayChainBlockNumber,
132	);
133}
134
135/// Provides an implementation of [`CheckAssociatedRelayNumber`].
136///
137/// It will ensure that the associated relay block number strictly increases between Parachain
138/// blocks. This should be used by production Parachains when in doubt.
139pub struct RelayNumberStrictlyIncreases;
140
141impl CheckAssociatedRelayNumber for RelayNumberStrictlyIncreases {
142	fn check_associated_relay_number(
143		current: RelayChainBlockNumber,
144		previous: RelayChainBlockNumber,
145	) {
146		if current <= previous {
147			panic!("Relay chain block number needs to strictly increase between Parachain blocks!")
148		}
149	}
150}
151
152/// Provides an implementation of [`CheckAssociatedRelayNumber`].
153///
154/// This will accept any relay chain block number combination. This is mainly useful for
155/// test parachains.
156pub struct AnyRelayNumber;
157
158impl CheckAssociatedRelayNumber for AnyRelayNumber {
159	fn check_associated_relay_number(_: RelayChainBlockNumber, _: RelayChainBlockNumber) {}
160}
161
162/// Provides an implementation of [`CheckAssociatedRelayNumber`].
163///
164/// It will ensure that the associated relay block number monotonically increases between Parachain
165/// blocks. This should be used when asynchronous backing is enabled.
166pub struct RelayNumberMonotonicallyIncreases;
167
168impl CheckAssociatedRelayNumber for RelayNumberMonotonicallyIncreases {
169	fn check_associated_relay_number(
170		current: RelayChainBlockNumber,
171		previous: RelayChainBlockNumber,
172	) {
173		if current < previous {
174			panic!("Relay chain block number needs to monotonically increase between Parachain blocks!")
175		}
176	}
177}
178
179/// The max length of a DMP message.
///
/// This is the `MaxMessageLen` associated type of the [`HandleMessage`] implementation that is
/// configured as [`Config::DmpQueue`].
180pub type MaxDmpMessageLenOf<T> = <<T as Config>::DmpQueue as HandleMessage>::MaxMessageLen;
181
/// Constants used when adjusting the delivery fee factor for upward messages (UMP).
182pub mod ump_constants {
183	/// `host_config.max_upward_queue_size / THRESHOLD_FACTOR` is the threshold after which delivery
184	/// starts getting exponentially more expensive.
185	/// `2` means the price starts to increase when queue is half full.
186	pub const THRESHOLD_FACTOR: u32 = 2;
187}
188
189#[frame_support::pallet]
190pub mod pallet {
191	use super::*;
192	use cumulus_primitives_core::CoreInfoExistsAtMaxOnce;
193	use frame_support::pallet_prelude::*;
194	use frame_system::pallet_prelude::*;
195
	/// The pallet type. The hooks and dispatchable calls of this module are implemented on it.
196	#[pallet::pallet]
197	#[pallet::storage_version(migration::STORAGE_VERSION)]
198	#[pallet::without_storage_info]
199	pub struct Pallet<T>(_);
200
	/// Configuration of this pallet.
	///
	/// The supertrait bound requires the runtime to use `ParachainSetCode<Self>` as its
	/// `frame_system::Config::OnSetCode`.
201	#[pallet::config]
202	pub trait Config: frame_system::Config<OnSetCode = ParachainSetCode<Self>> {
203		/// The overarching event type.
		// NOTE(review): presumably `allow(deprecated)` silences a deprecation warning for
		// declaring `RuntimeEvent` in the pallet config — confirm.
204		#[allow(deprecated)]
205		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
206
207		/// Something which can be notified when the validation data is set.
		///
		/// It is also notified when a pending validation code gets applied.
208		type OnSystemEvent: OnSystemEvent;
209
210		/// Returns the parachain ID we are running with.
211		#[pallet::constant]
212		type SelfParaId: Get<ParaId>;
213
214		/// The place where outbound XCMP messages come from. This is queried in `on_finalize`.
215		type OutboundXcmpMessageSource: XcmpMessageSource;
216
217		/// Queues inbound downward messages for delayed processing.
218		///
219		/// All inbound DMP messages from the relay are pushed into this. The handler is expected to
220		/// eventually process all the messages that are pushed to it.
221		type DmpQueue: HandleMessage;
222
223		/// The weight we reserve at the beginning of the block for processing DMP messages.
224		type ReservedDmpWeight: Get<Weight>;
225
226		/// The message handler that will be invoked when messages are received via XCMP.
227		///
228		/// This should normally link to the XCMP Queue pallet.
229		type XcmpMessageHandler: XcmpMessageHandler;
230
231		/// The weight we reserve at the beginning of the block for processing XCMP messages.
232		type ReservedXcmpWeight: Get<Weight>;
233
234		/// Something that can check the associated relay parent block number.
		///
		/// It is invoked by `set_validation_data` with the relay parent number of the current
		/// block and the number stored in [`LastRelayChainBlockNumber`].
235		type CheckAssociatedRelayNumber: CheckAssociatedRelayNumber;
236
237		/// Weight info for functions and calls.
238		type WeightInfo: WeightInfo;
239
240		/// An entry-point for higher-level logic to manage the backlog of unincluded parachain
241		/// blocks and authorship rights for those blocks.
242		///
243		/// Typically, this should be a hook tailored to the collator-selection/consensus mechanism
244		/// that is used for this chain.
245		///
246		/// However, to maintain the same behavior as prior to asynchronous backing, provide the
247		/// [`consensus_hook::ExpectParentIncluded`] here. This is only necessary in the case
248		/// that collators aren't expected to have node versions that supply the included block
249		/// in the relay-chain state proof.
250		type ConsensusHook: ConsensusHook;
251
252		/// The offset between the tip of the relay chain and the parent relay block used as parent
253		/// when authoring a parachain block.
254		///
255		/// This setting directly impacts the number of descendant headers that are expected in the
256		/// `set_validation_data` inherent.
257		///
258		/// For any setting `N` larger than zero, the inherent expects that the inherent includes
259		/// the relay parent plus `N` descendants. These headers are required to validate that new
260		/// parachain blocks are authored at the correct offset.
261		///
262		/// While this helps to reduce forks on the parachain side, it increases the delay for
263		/// processing XCM messages. So, the value should be chosen wisely.
264		///
265		/// If set to 0, this config has no impact.
266		type RelayParentOffset: Get<u32>;
267	}
268
269	#[pallet::hooks]
270	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
271		/// Handles actually sending upward messages by moving them from `PendingUpwardMessages` to
272		/// `UpwardMessages`. Decreases the delivery fee factor if after sending messages, the queue
273		/// total size is less than the threshold (see [`ump_constants::THRESHOLD_FACTOR`]).
274		/// Also does the sending for HRMP messages it takes from `OutboundXcmpMessageSource`.
		///
		/// Finally, the bandwidth used by this block is recorded as a new [`Ancestor`] in the
		/// unincluded segment.
		///
		/// # Panics
		///
		/// Panics if `ValidationData` is not set, i.e. the `set_validation_data` inherent did not
		/// run in this block, or if appending to the aggregated unincluded segment fails.
275		fn on_finalize(_: BlockNumberFor<T>) {
			// Clear the per-block upgrade state. The go-ahead signal is taken (read and removed)
			// because it may be recorded in the `Ancestor` created further below.
276			<DidSetValidationCode<T>>::kill();
277			<UpgradeRestrictionSignal<T>>::kill();
278			let relay_upgrade_go_ahead = <UpgradeGoAhead<T>>::take();
279
280			let vfp = <ValidationData<T>>::get().expect(
281				r"Missing required set_validation_data inherent. This inherent must be
282				present in every block. This error typically occurs when the set_validation_data
283				execution failed and was rejected by the block builder. Check earlier log entries
284				for the specific cause of the failure.",
285			);
286
			// Remember the relay parent number of this block; `set_validation_data` hands it to
			// `CheckAssociatedRelayNumber` as the previous number.
287			LastRelayChainBlockNumber::<T>::put(vfp.relay_parent_number);
288
289			let host_config = match HostConfiguration::<T>::get() {
290				Some(ok) => ok,
291				None => {
292					debug_assert!(
293						false,
294						"host configuration is promised to set until `on_finalize`; qed",
295					);
296					return
297				},
298			};
299
300			// Before updating the relevant messaging state, we need to extract
301			// the total bandwidth limits for the purpose of updating the unincluded
302			// segment.
303			let total_bandwidth_out = match RelevantMessagingState::<T>::get() {
304				Some(s) => OutboundBandwidthLimits::from_relay_chain_state(&s),
305				None => {
306					debug_assert!(
307						false,
308						"relevant messaging state is promised to be set until `on_finalize`; \
309							qed",
310					);
311					return
312				},
313			};
314
315			// After this point, the `RelevantMessagingState` in storage reflects the
316			// unincluded segment.
317			Self::adjust_egress_bandwidth_limits();
318
319			let (ump_msg_count, ump_total_bytes) = <PendingUpwardMessages<T>>::mutate(|up| {
320				let (available_capacity, available_size) = match RelevantMessagingState::<T>::get()
321				{
322					Some(limits) => (
323						limits.relay_dispatch_queue_remaining_capacity.remaining_count,
324						limits.relay_dispatch_queue_remaining_capacity.remaining_size,
325					),
326					None => {
327						debug_assert!(
328							false,
329							"relevant messaging state is promised to be set until `on_finalize`; \
330								qed",
331						);
332						return (0, 0)
333					},
334				};
335
				// The per-candidate limit from the host configuration caps the message count too.
336				let available_capacity =
337					cmp::min(available_capacity, host_config.max_upward_message_num_per_candidate);
338
339				// Count the number of messages we can possibly fit in the given constraints, i.e.
340				// available_capacity and available_size.
				//
				// The scan state is `(messages taken, bytes taken)`. It stops at the first message
				// that would exceed either limit, so only a prefix of the pending queue is taken.
341				let (num, total_size) = up
342					.iter()
343					.scan((0u32, 0u32), |state, msg| {
344						let (cap_used, size_used) = *state;
345						let new_cap = cap_used.saturating_add(1);
346						let new_size = size_used.saturating_add(msg.len() as u32);
						// `and` yields `Some` only if both subtractions succeed, i.e. both the
						// count and the size limit are respected.
347						match available_capacity
348							.checked_sub(new_cap)
349							.and(available_size.checked_sub(new_size))
350						{
351							Some(_) => {
352								*state = (new_cap, new_size);
353								Some(*state)
354							},
355							_ => None,
356						}
357					})
358					.last()
359					.unwrap_or_default();
360
361				// TODO: #274 Return back messages that no longer fit into the queue.
362
				// Publish the first `num` messages and keep the remainder pending.
363				UpwardMessages::<T>::put(&up[..num as usize]);
364				*up = up.split_off(num as usize);
365
366				// Send the core selector UMP signal. This is experimental until relay chain
367				// validators are upgraded to handle ump signals.
368				#[cfg(feature = "experimental-ump-signals")]
369				Self::send_ump_signal();
370
371				// If the total size of the pending messages is less than the threshold,
372				// we decrease the fee factor, since the queue is less congested.
373				// This makes delivery of new messages cheaper.
374				let threshold = host_config
375					.max_upward_queue_size
376					.saturating_div(ump_constants::THRESHOLD_FACTOR);
377				let remaining_total_size: usize = up.iter().map(UpwardMessage::len).sum();
378				if remaining_total_size <= threshold as usize {
379					Self::decrease_fee_factor(());
380				}
381
382				(num, total_size)
383			});
384
385			// Sending HRMP messages is a little bit more involved. There are the following
386			// constraints:
387			//
388			// - a channel should exist (and it can be closed while a message is buffered),
389			// - at most one message can be sent in a channel,
390			// - the sent out messages should be ordered by ascension of recipient para id.
391			// - the capacity and total size of the channel is limited,
392			// - the maximum size of a message is limited (and can potentially be changed),
393
			// Never send more than the number announced in `on_initialize`, since the weight was
			// reserved for that number only. `take` also removes the announced value from storage.
394			let maximum_channels = host_config
395				.hrmp_max_message_num_per_candidate
396				.min(<AnnouncedHrmpMessagesPerCandidate<T>>::take())
397				as usize;
398
399			// Note: this internally calls the `GetChannelInfo` implementation for this
400			// pallet, which draws on the `RelevantMessagingState`. That in turn has
401			// been adjusted above to reflect the correct limits in all channels.
402			let outbound_messages =
403				T::OutboundXcmpMessageSource::take_outbound_messages(maximum_channels)
404					.into_iter()
405					.map(|(recipient, data)| OutboundHrmpMessage { recipient, data })
406					.collect::<Vec<_>>();
407
408			// Update the unincluded segment length; capacity checks were done previously in
409			// `set_validation_data`, so this can be done unconditionally.
410			{
411				let hrmp_outgoing = outbound_messages
412					.iter()
413					.map(|msg| {
414						(
415							msg.recipient,
416							HrmpChannelUpdate { msg_count: 1, total_bytes: msg.data.len() as u32 },
417						)
418					})
419					.collect();
420				let used_bandwidth =
421					UsedBandwidth { ump_msg_count, ump_total_bytes, hrmp_outgoing };
422
423				let mut aggregated_segment =
424					AggregatedUnincludedSegment::<T>::get().unwrap_or_default();
425				let consumed_go_ahead_signal =
426					if aggregated_segment.consumed_go_ahead_signal().is_some() {
427						// Some ancestor within the segment already processed this signal --
428						// validated during inherent creation.
429						None
430					} else {
431						relay_upgrade_go_ahead
432					};
433				// The bandwidth constructed was ensured to satisfy relay chain constraints.
434				let ancestor = Ancestor::new_unchecked(used_bandwidth, consumed_go_ahead_signal);
435
436				let watermark = HrmpWatermark::<T>::get();
437				let watermark_update = HrmpWatermarkUpdate::new(watermark, vfp.relay_parent_number);
438
439				aggregated_segment
440					.append(&ancestor, watermark_update, &total_bandwidth_out)
441					.expect("unincluded segment limits exceeded");
442				AggregatedUnincludedSegment::<T>::put(aggregated_segment);
443				// Check in `on_initialize` guarantees there's space for this block.
444				UnincludedSegment::<T>::append(ancestor);
445			}
446			HrmpOutboundMessages::<T>::put(outbound_messages);
447		}
448
		/// Resets the per-block storage items of this pallet and returns the weight for both
		/// `on_initialize` and `on_finalize`.
		///
		/// # Panics
		///
		/// Panics if the block digest contains more than one `CumulusDigestItem::CoreInfo`, or if
		/// the claim queue offset of the contained `CoreInfo` differs from
		/// `T::RelayParentOffset`.
449		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
450			let mut weight = Weight::zero();
451
452			// To prevent removing `NewValidationCode` that was set by another `on_initialize`
453			// like for example from scheduler, we only kill the storage entry if it was not yet
454			// updated in the current block.
455			if !<DidSetValidationCode<T>>::get() {
456				// NOTE: Killing here is required to at least include the trie nodes down to the key
457				// in the proof. Because this value will be read in `validate_block` and thus,
458				// needs to be reachable by the proof.
459				NewValidationCode::<T>::kill();
460				weight += T::DbWeight::get().writes(1);
461			}
462
463			// The parent hash was unknown during block finalization. Update it here.
464			{
465				<UnincludedSegment<T>>::mutate(|chain| {
466					if let Some(ancestor) = chain.last_mut() {
467						let parent = frame_system::Pallet::<T>::parent_hash();
468						// Ancestor is the latest finalized block, thus current parent is
469						// its output head.
470						ancestor.replace_para_head_hash(parent);
471					}
472				});
473				weight += T::DbWeight::get().reads_writes(1, 1);
474
475				// Weight used during finalization.
476				weight += T::DbWeight::get().reads_writes(3, 2);
477			}
478
479			// Remove the validation from the old block.
480			ValidationData::<T>::kill();
481			// NOTE: Killing here is required to at least include the trie nodes down to the keys
482			// in the proof. Because these values will be read in `validate_block` and thus,
483			// need to be reachable by the proof.
484			ProcessedDownwardMessages::<T>::kill();
485			UpwardMessages::<T>::kill();
486			HrmpOutboundMessages::<T>::kill();
487			CustomValidationHeadData::<T>::kill();
488			// The same as above. Reading here to make sure that the key is included in the proof.
489			HrmpWatermark::<T>::get();
			// One read (`HrmpWatermark`) and five writes (the five `kill` calls above).
490			weight += T::DbWeight::get().reads_writes(1, 5);
491
492			// Here, in `on_initialize` we must report the weight for both `on_initialize` and
493			// `on_finalize`.
494			//
495			// One complication here, is that the `host_configuration` is updated by an inherent
496			// and those are processed after the block initialization phase. Therefore, we have to
497			// be content only with the configuration as per the previous block. That means that
498			// the configuration can be either stale (or be absent altogether in case of the
499			// beginning of the chain).
500			//
501			// In order to mitigate this, we do the following. At the time, we are only concerned
502			// about `hrmp_max_message_num_per_candidate`. We reserve the amount of weight to
503			// process the number of HRMP messages according to the potentially stale
504			// configuration. In `on_finalize` we will process only the minimum between the
505			// announced number of messages and the actual received in the fresh configuration.
506			//
507			// In the common case, they will be the same. In the case the actual value is smaller
508			// than the announced, we would waste some of weight. In the case the actual value is
509			// greater than the announced, we will miss opportunity to send a couple of messages.
510			weight += T::DbWeight::get().reads_writes(1, 1);
511			let hrmp_max_message_num_per_candidate = HostConfiguration::<T>::get()
512				.map(|cfg| cfg.hrmp_max_message_num_per_candidate)
513				.unwrap_or(0);
514			<AnnouncedHrmpMessagesPerCandidate<T>>::put(hrmp_max_message_num_per_candidate);
515
516			// NOTE that the actual weight consumed by `on_finalize` may turn out lower.
517			weight += T::DbWeight::get().reads_writes(
518				3 + hrmp_max_message_num_per_candidate as u64,
519				4 + hrmp_max_message_num_per_candidate as u64,
520			);
521
522			// Weight for updating the last relay chain block number in `on_finalize`.
523			weight += T::DbWeight::get().reads_writes(1, 1);
524
525			// Weight for adjusting the unincluded segment in `on_finalize`.
526			weight += T::DbWeight::get().reads_writes(6, 3);
527
528			// Always try to read `UpgradeGoAhead` in `on_finalize`.
529			weight += T::DbWeight::get().reads(1);
530
531			// We need to ensure that `CoreInfo` digest exists only once.
532			match CumulusDigestItem::core_info_exists_at_max_once(
533				&frame_system::Pallet::<T>::digest(),
534			) {
535				CoreInfoExistsAtMaxOnce::Once(core_info) => {
					// NOTE(review): `as u8` silently truncates a `RelayParentOffset` above
					// `u8::MAX` before the comparison — confirm such values cannot be
					// configured.
536					assert_eq!(
537						core_info.claim_queue_offset.0,
538						T::RelayParentOffset::get() as u8,
539						"Only {} is supported as valid claim queue offset",
540						T::RelayParentOffset::get()
541					);
542				},
543				CoreInfoExistsAtMaxOnce::NotFound => {},
544				CoreInfoExistsAtMaxOnce::MoreThanOnce => {
545					panic!("`CumulusDigestItem::CoreInfo` must exist at max once.");
546				},
547			}
548
549			weight
550		}
551	}
552
553	#[pallet::call]
554	impl<T: Config> Pallet<T> {
555		/// Set the current validation data.
556		///
557		/// This should be invoked exactly once per block. It will panic at the finalization
558		/// phase if the call was not invoked.
559		///
560		/// The dispatch origin for this call must be `Inherent`
561		///
562		/// As a side effect, this function upgrades the current validation function
563		/// if the appropriate time has come.
		///
		/// Invalid input is not reported as a dispatch error: the call panics if the validation
		/// data was already set in this block, if the relay chain state proof or the values read
		/// from it are invalid, or if the relay parent descendants cannot be verified.
564		#[pallet::call_index(0)]
565		#[pallet::weight((0, DispatchClass::Mandatory))]
566		// TODO: This weight should be corrected. Currently the weight is registered manually in the
567		// call with `register_extra_weight_unchecked`.
568		pub fn set_validation_data(
569			origin: OriginFor<T>,
570			data: BasicParachainInherentData,
571			inbound_messages_data: InboundMessagesData,
572		) -> DispatchResult {
573			ensure_none(origin)?;
574			assert!(
575				!<ValidationData<T>>::exists(),
576				"ValidationData must be updated only once in a block",
577			);
578
579			// TODO: This is more than zero, but will need benchmarking to figure out what.
580			let mut total_weight = Weight::zero();
581
582			// NOTE: the inherent data is expected to be unique, even if this block is built
583			// in the context of the same relay parent as the previous one. In particular,
584			// the inherent shouldn't contain messages that were already processed by any of the
585			// ancestors.
586			//
587			// This invariant should be upheld by the `ProvideInherent` implementation.
588			let BasicParachainInherentData {
589				validation_data: vfp,
590				relay_chain_state,
591				relay_parent_descendants,
592				collator_peer_id: _,
593			} = data;
594
595			// Check that the associated relay chain block number is as expected.
596			T::CheckAssociatedRelayNumber::check_associated_relay_number(
597				vfp.relay_parent_number,
598				LastRelayChainBlockNumber::<T>::get(),
599			);
600
			// The proof is cloned because the original is stored in `RelayStateProof` below.
601			let relay_state_proof = RelayChainStateProof::new(
602				T::SelfParaId::get(),
603				vfp.relay_parent_storage_root,
604				relay_chain_state.clone(),
605			)
606			.expect("Invalid relay chain state proof");
607
608			let expected_rp_descendants_num = T::RelayParentOffset::get();
609
			// Descendants are only verified when a non-zero relay parent offset is configured.
610			if expected_rp_descendants_num > 0 {
611				if let Err(err) = descendant_validation::verify_relay_parent_descendants(
612					&relay_state_proof,
613					relay_parent_descendants,
614					vfp.relay_parent_storage_root,
615					expected_rp_descendants_num,
616				) {
617					panic!(
618						"Unable to verify provided relay parent descendants. \
619						expected_rp_descendants_num: {expected_rp_descendants_num} \
620						error: {err:?}"
621					);
622				};
623			}
624
625			// Update the desired maximum capacity according to the consensus hook.
626			let (consensus_hook_weight, capacity) =
627				T::ConsensusHook::on_state_proof(&relay_state_proof);
628			total_weight += consensus_hook_weight;
629			total_weight += Self::maybe_drop_included_ancestors(&relay_state_proof, capacity);
630			// Deposit a log indicating the relay-parent storage root.
631			// TODO: remove this in favor of the relay-parent's hash after
632			// https://github.com/paritytech/cumulus/issues/303
633			frame_system::Pallet::<T>::deposit_log(
634				cumulus_primitives_core::rpsr_digest::relay_parent_storage_root_item(
635					vfp.relay_parent_storage_root,
636					vfp.relay_parent_number,
637				),
638			);
639
640			// initialization logic: we know that this runs exactly once every block,
641			// which means we can put the initialization logic here to remove the
642			// sequencing problem.
643			let upgrade_go_ahead_signal = relay_state_proof
644				.read_upgrade_go_ahead_signal()
645				.expect("Invalid upgrade go ahead signal");
646
647			let upgrade_signal_in_segment = AggregatedUnincludedSegment::<T>::get()
648				.as_ref()
649				.and_then(SegmentTracker::consumed_go_ahead_signal);
650			if let Some(signal_in_segment) = upgrade_signal_in_segment.as_ref() {
651				// Unincluded ancestor consuming upgrade signal is still within the segment,
652				// sanity check that it matches with the signal from relay chain.
653				assert_eq!(upgrade_go_ahead_signal, Some(*signal_in_segment));
654			}
655			match upgrade_go_ahead_signal {
656				Some(_signal) if upgrade_signal_in_segment.is_some() => {
657					// Do nothing, processing logic was executed by unincluded ancestor.
658				},
659				Some(relay_chain::UpgradeGoAhead::GoAhead) => {
660					assert!(
661						<PendingValidationCode<T>>::exists(),
662						"No new validation function found in storage, GoAhead signal is not expected",
663					);
					// `take` removes the pending code from storage, so it is applied only once.
664					let validation_code = <PendingValidationCode<T>>::take();
665
666					frame_system::Pallet::<T>::update_code_in_storage(&validation_code);
667					<T::OnSystemEvent as OnSystemEvent>::on_validation_code_applied();
668					Self::deposit_event(Event::ValidationFunctionApplied {
669						relay_chain_block_num: vfp.relay_parent_number,
670					});
671				},
672				Some(relay_chain::UpgradeGoAhead::Abort) => {
673					<PendingValidationCode<T>>::kill();
674					Self::deposit_event(Event::ValidationFunctionDiscarded);
675				},
676				None => {},
677			}
			// Both signals are stored for this block only; `on_finalize` removes them again.
678			<UpgradeRestrictionSignal<T>>::put(
679				relay_state_proof
680					.read_upgrade_restriction_signal()
681					.expect("Invalid upgrade restriction signal"),
682			);
683			<UpgradeGoAhead<T>>::put(upgrade_go_ahead_signal);
684
685			let host_config = relay_state_proof
686				.read_abridged_host_configuration()
687				.expect("Invalid host configuration in relay chain state proof");
688
689			let relevant_messaging_state = relay_state_proof
690				.read_messaging_state_snapshot(&host_config)
691				.expect("Invalid messaging state in relay chain state proof");
692
			// Persist everything that was read from the inherent; `on_finalize` relies on
			// `ValidationData`, `HostConfiguration` and `RelevantMessagingState` being set.
693			<ValidationData<T>>::put(&vfp);
694			<RelayStateProof<T>>::put(relay_chain_state);
695			<RelevantMessagingState<T>>::put(relevant_messaging_state.clone());
696			<HostConfiguration<T>>::put(host_config);
697
698			<T::OnSystemEvent as OnSystemEvent>::on_validation_data(&vfp);
699
700			total_weight.saturating_accrue(Self::enqueue_inbound_downward_messages(
701				relevant_messaging_state.dmq_mqc_head,
702				inbound_messages_data.downward_messages,
703			));
704			total_weight.saturating_accrue(Self::enqueue_inbound_horizontal_messages(
705				&relevant_messaging_state.ingress_channels,
706				inbound_messages_data.horizontal_messages,
707				vfp.relay_parent_number,
708			));
709
			// The declared call weight is zero, so the accumulated weight is registered here.
710			frame_system::Pallet::<T>::register_extra_weight_unchecked(
711				total_weight,
712				DispatchClass::Mandatory,
713			);
714
715			Ok(())
716		}
717
		/// Send `message` as an upward message.
		///
		/// The dispatch origin for this call must be root.
		///
		/// The result of `send_upward_message` is discarded, so this call returns `Ok(())` even
		/// if the message could not be sent.
718		#[pallet::call_index(1)]
719		#[pallet::weight((1_000, DispatchClass::Operational))]
720		pub fn sudo_send_upward_message(
721			origin: OriginFor<T>,
722			message: UpwardMessage,
723		) -> DispatchResult {
724			ensure_root(origin)?;
			// Deliberately ignore the outcome; see the doc comment above.
725			let _ = Self::send_upward_message(message);
726			Ok(())
727		}
728
729		// WARNING: call indices 2 and 3 were used in a former version of this pallet. Using them
730		// again will require to bump the transaction version of runtimes using this pallet.
731	}
732
	/// Events deposited by this pallet.
733	#[pallet::event]
734	#[pallet::generate_deposit(pub(super) fn deposit_event)]
735	pub enum Event<T: Config> {
736		/// The validation function has been scheduled to apply.
737		ValidationFunctionStored,
738		/// The validation function was applied as of the contained relay chain block number.
		///
		/// Deposited by `set_validation_data` when the relay chain signals `GoAhead`.
739		ValidationFunctionApplied { relay_chain_block_num: RelayChainBlockNumber },
740		/// The relay-chain aborted the upgrade process.
		///
		/// Deposited by `set_validation_data` when the relay chain signals `Abort`.
741		ValidationFunctionDiscarded,
742		/// Some downward messages have been received and will be processed.
743		DownwardMessagesReceived { count: u32 },
744		/// Downward messages were processed using the given weight.
745		DownwardMessagesProcessed { weight_used: Weight, dmq_head: relay_chain::Hash },
746		/// An upward message was sent to the relay chain.
747		UpwardMessageSent { message_hash: Option<XcmHash> },
748	}
749
	/// Errors of this pallet.
750	#[pallet::error]
751	pub enum Error<T> {
752		/// Attempt to upgrade validation function while existing upgrade pending.
753		OverlappingUpgrades,
754		/// Polkadot currently prohibits this parachain from upgrading its validation function.
755		ProhibitedByPolkadot,
756		/// The supplied validation function has compiled into a blob larger than Polkadot is
757		/// willing to run.
758		TooBig,
759		/// The inherent which supplies the validation data did not run this block.
760		ValidationDataNotAvailable,
761		/// The inherent which supplies the host configuration did not run this block.
762		HostConfigurationNotAvailable,
763		/// No validation function upgrade is currently scheduled.
764		NotScheduled,
765	}
766
767	/// Latest included block descendants the runtime accepted. In other words, these are
768	/// ancestors of the currently executing block which have not been included in the observed
769	/// relay-chain state.
770	///
771	/// The segment length is limited by the capacity returned from the [`ConsensusHook`] configured
772	/// in the pallet.
	///
	/// A new entry is appended in `on_finalize`; its para head hash is filled in by the
	/// `on_initialize` of the following block.
773	#[pallet::storage]
774	pub type UnincludedSegment<T: Config> = StorageValue<_, Vec<Ancestor<T::Hash>>, ValueQuery>;
775
776	/// Storage field that keeps track of bandwidth used by the unincluded segment along with the
777	/// latest HRMP watermark. Used for limiting the acceptance of new blocks with
778	/// respect to relay chain constraints.
779	#[pallet::storage]
780	pub type AggregatedUnincludedSegment<T: Config> =
781		StorageValue<_, SegmentTracker<T::Hash>, OptionQuery>;
782
	/// In case of a scheduled upgrade, this storage field contains the validation code to be
	/// applied.
	///
	/// As soon as the relay chain gives us the go-ahead signal, we will overwrite the
	/// [`:code`][sp_core::storage::well_known_keys::CODE], which will result in the next block
	/// being processed with the new validation code. This concludes the upgrade process.
	///
	/// `schedule_code_upgrade` refuses to schedule another upgrade while this value exists.
	#[pallet::storage]
	pub type PendingValidationCode<T: Config> = StorageValue<_, Vec<u8>, ValueQuery>;

	/// Validation code that is set by the parachain and is to be communicated to the collator
	/// and consequently the relay-chain.
	///
	/// This will be cleared in `on_initialize` of each new block if no other pallet already set
	/// the value.
	///
	/// It is written by `notify_polkadot_of_pending_upgrade` and reported to the collator via
	/// `collect_collation_info`.
	#[pallet::storage]
	pub type NewValidationCode<T: Config> = StorageValue<_, Vec<u8>, OptionQuery>;

	/// The [`PersistedValidationData`] set for this block.
	/// This value is expected to be set only once per block and it's never stored
	/// in the trie.
	#[pallet::storage]
	pub type ValidationData<T: Config> = StorageValue<_, PersistedValidationData>;

	/// Was the validation code set to notify the relay chain?
	///
	/// Set to `true` by `notify_polkadot_of_pending_upgrade` together with
	/// [`NewValidationCode`].
	#[pallet::storage]
	pub type DidSetValidationCode<T: Config> = StorageValue<_, bool, ValueQuery>;
809
	/// The relay chain block number associated with the last parachain block.
	///
	/// This is updated in `on_finalize`.
	///
	/// `do_create_inherent` uses it as the fallback `sent_at` when no inbound message has been
	/// recorded as processed yet.
	#[pallet::storage]
	pub type LastRelayChainBlockNumber<T: Config> =
		StorageValue<_, RelayChainBlockNumber, ValueQuery>;

	/// An option which indicates if the relay-chain restricts signalling a validation code upgrade.
	/// In other words, if this is `Some` and [`NewValidationCode`] is `Some` then the produced
	/// candidate will be invalid.
	///
	/// This storage item is a mirror of the corresponding value for the current parachain from the
	/// relay-chain. This value is ephemeral which means it doesn't hit the storage. This value is
	/// set after the inherent.
	///
	/// `schedule_code_upgrade` fails with `ProhibitedByPolkadot` while this is `Some`.
	#[pallet::storage]
	pub type UpgradeRestrictionSignal<T: Config> =
		StorageValue<_, Option<relay_chain::UpgradeRestriction>, ValueQuery>;

	/// Optional upgrade go-ahead signal from the relay-chain.
	///
	/// This storage item is a mirror of the corresponding value for the current parachain from the
	/// relay-chain. This value is ephemeral which means it doesn't hit the storage. This value is
	/// set after the inherent.
	#[pallet::storage]
	pub type UpgradeGoAhead<T: Config> =
		StorageValue<_, Option<relay_chain::UpgradeGoAhead>, ValueQuery>;
836
	/// The state proof for the last relay parent block.
	///
	/// This field is meant to be updated each block with the validation data inherent. Therefore,
	/// before processing of the inherent, e.g. in `on_initialize`, this data may be stale.
	///
	/// This data is also absent from the genesis.
	#[pallet::storage]
	pub type RelayStateProof<T: Config> = StorageValue<_, sp_trie::StorageProof>;

	/// The snapshot of some state related to messaging relevant to the current parachain as per
	/// the relay parent.
	///
	/// This field is meant to be updated each block with the validation data inherent. Therefore,
	/// before processing of the inherent, e.g. in `on_initialize`, this data may be stale.
	///
	/// This data is also absent from the genesis.
	///
	/// The egress channel list stored here backs the `GetChannelInfo` and `ListChannelInfos`
	/// implementations of this pallet.
	#[pallet::storage]
	pub type RelevantMessagingState<T: Config> = StorageValue<_, MessagingStateSnapshot>;

	/// The parachain host configuration that was obtained from the relay parent.
	///
	/// This field is meant to be updated each block with the validation data inherent. Therefore,
	/// before processing of the inherent, e.g. in `on_initialize`, this data may be stale.
	///
	/// This data is also absent from the genesis.
	#[pallet::storage]
	#[pallet::disable_try_decode_storage]
	pub type HostConfiguration<T: Config> = StorageValue<_, AbridgedHostConfiguration>;
865
	/// The last downward message queue chain head we have observed.
	///
	/// This value is loaded before and saved after processing inbound downward messages carried
	/// by the system inherent.
	#[pallet::storage]
	pub type LastDmqMqcHead<T: Config> = StorageValue<_, MessageQueueChain, ValueQuery>;

	/// The message queue chain heads we have observed for each incoming channel.
	///
	/// This value is loaded before and saved after processing inbound horizontal messages carried
	/// by the system inherent.
	#[pallet::storage]
	pub type LastHrmpMqcHeads<T: Config> =
		StorageValue<_, BTreeMap<ParaId, MessageQueueChain>, ValueQuery>;

	/// Number of downward messages processed in a block.
	///
	/// This will be cleared in `on_initialize` of each new block.
	#[pallet::storage]
	pub type ProcessedDownwardMessages<T: Config> = StorageValue<_, u32, ValueQuery>;

	/// The last processed downward message.
	///
	/// We need to keep track of this to filter the messages that have been already processed.
	#[pallet::storage]
	pub type LastProcessedDownwardMessage<T: Config> = StorageValue<_, InboundMessageId>;

	/// HRMP watermark that was set in a block.
	#[pallet::storage]
	pub type HrmpWatermark<T: Config> = StorageValue<_, relay_chain::BlockNumber, ValueQuery>;

	/// The last processed HRMP message.
	///
	/// We need to keep track of this to filter the messages that have been already processed.
	#[pallet::storage]
	pub type LastProcessedHrmpMessage<T: Config> = StorageValue<_, InboundMessageId>;
902
	/// HRMP messages that were sent in a block.
	///
	/// This will be cleared in `on_initialize` of each new block.
	#[pallet::storage]
	pub type HrmpOutboundMessages<T: Config> =
		StorageValue<_, Vec<OutboundHrmpMessage>, ValueQuery>;

	/// Upward messages that were sent in a block.
	///
	/// This will be cleared in `on_initialize` of each new block.
	#[pallet::storage]
	pub type UpwardMessages<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;

	/// Upward messages that are still pending and not yet sent to the relay chain.
	#[pallet::storage]
	pub type PendingUpwardMessages<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;

	/// The factor to multiply the base delivery fee by for UMP.
	///
	/// Exposed through the [`FeeTracker`] implementation of this pallet.
	#[pallet::storage]
	pub type UpwardDeliveryFeeFactor<T: Config> =
		StorageValue<_, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;

	/// The number of HRMP messages we observed in `on_initialize` and thus used that number for
	/// announcing the weight of `on_initialize` and `on_finalize`.
	#[pallet::storage]
	pub type AnnouncedHrmpMessagesPerCandidate<T: Config> = StorageValue<_, u32, ValueQuery>;
929
	/// The weight we reserve at the beginning of the block for processing XCMP messages. This
	/// overrides the amount set in the Config trait.
	///
	/// When unset, `T::ReservedXcmpWeight` is used instead.
	#[pallet::storage]
	pub type ReservedXcmpWeightOverride<T: Config> = StorageValue<_, Weight>;

	/// The weight we reserve at the beginning of the block for processing DMP messages. This
	/// overrides the amount set in the Config trait.
	#[pallet::storage]
	pub type ReservedDmpWeightOverride<T: Config> = StorageValue<_, Weight>;

	/// A custom head data that should be returned as result of `validate_block`.
	///
	/// See `Pallet::set_custom_validation_head_data` for more information.
	///
	/// When set, `collect_collation_info` reports this value instead of the encoded header.
	#[pallet::storage]
	pub type CustomValidationHeadData<T: Config> = StorageValue<_, Vec<u8>, OptionQuery>;
945
946	#[pallet::inherent]
947	impl<T: Config> ProvideInherent for Pallet<T> {
948		type Call = Call<T>;
949		type Error = sp_inherents::MakeFatalError<()>;
950		const INHERENT_IDENTIFIER: InherentIdentifier =
951			cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER;
952
953		fn create_inherent(data: &InherentData) -> Option<Self::Call> {
954			let data = match data
955				.get_data::<ParachainInherentData>(&Self::INHERENT_IDENTIFIER)
956				.ok()
957				.flatten()
958			{
959				None => {
960					// Key Self::INHERENT_IDENTIFIER is expected to contain versioned inherent
961					// data. Older nodes are unaware of the new format and might provide the
962					// legacy data format. We try to load it and transform it into the current
963					// version.
964					let data = data
965						.get_data::<v0::ParachainInherentData>(
966							&cumulus_primitives_parachain_inherent::PARACHAIN_INHERENT_IDENTIFIER_V0,
967						)
968						.ok()
969						.flatten()?;
970					data.into()
971				},
972				Some(data) => data,
973			};
974
975			Some(Self::do_create_inherent(data))
976		}
977
978		fn is_inherent(call: &Self::Call) -> bool {
979			matches!(call, Call::set_validation_data { .. })
980		}
981	}
982
	/// Genesis configuration of the pallet.
	///
	/// It carries no configurable values; the only field is a marker tying the struct to `T`,
	/// and it is skipped during (de)serialization.
	#[pallet::genesis_config]
	#[derive(frame_support::DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		#[serde(skip)]
		pub _config: core::marker::PhantomData<T>,
	}
989
	#[pallet::genesis_build]
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
		/// Writes an empty value under the raw storage key `:c` at genesis.
		fn build(&self) {
			// TODO: Remove after https://github.com/paritytech/cumulus/issues/479
			sp_io::storage::set(b":c", &[]);
		}
	}
997}
998
999impl<T: Config> Pallet<T> {
1000	/// Get the unincluded segment size after the given hash.
1001	///
1002	/// If the unincluded segment doesn't contain the given hash, this returns the
1003	/// length of the entire unincluded segment.
1004	///
1005	/// This is intended to be used for determining how long the unincluded segment _would be_
1006	/// in runtime APIs related to authoring.
1007	pub fn unincluded_segment_size_after(included_hash: T::Hash) -> u32 {
1008		let segment = UnincludedSegment::<T>::get();
1009		crate::unincluded_segment::size_after_included(included_hash, &segment)
1010	}
1011}
1012
1013impl<T: Config> FeeTracker for Pallet<T> {
1014	type Id = ();
1015
1016	fn get_fee_factor(_id: Self::Id) -> FixedU128 {
1017		UpwardDeliveryFeeFactor::<T>::get()
1018	}
1019
1020	fn set_fee_factor(_id: Self::Id, val: FixedU128) {
1021		UpwardDeliveryFeeFactor::<T>::set(val);
1022	}
1023}
1024
1025impl<T: Config> ListChannelInfos for Pallet<T> {
1026	fn outgoing_channels() -> Vec<ParaId> {
1027		let Some(state) = RelevantMessagingState::<T>::get() else { return Vec::new() };
1028		state.egress_channels.into_iter().map(|(id, _)| id).collect()
1029	}
1030}
1031
1032impl<T: Config> GetChannelInfo for Pallet<T> {
1033	fn get_channel_status(id: ParaId) -> ChannelStatus {
1034		// Note, that we are using `relevant_messaging_state` which may be from the previous
1035		// block, in case this is called from `on_initialize`, i.e. before the inherent with
1036		// fresh data is submitted.
1037		//
1038		// That shouldn't be a problem though because this is anticipated and already can
1039		// happen. This is because sending implies that a message is buffered until there is
1040		// space to send a message in the candidate. After a while waiting in a buffer, it may
1041		// be discovered that the channel to which a message were addressed is now closed.
1042		// Another possibility, is that the maximum message size was decreased so that a
1043		// message in the buffer doesn't fit. Should any of that happen the sender should be
1044		// notified about the message was discarded.
1045		//
1046		// Here it a similar case, with the difference that the realization that the channel is
1047		// closed came the same block.
1048		let channels = match RelevantMessagingState::<T>::get() {
1049			None => {
1050				log::warn!("calling `get_channel_status` with no RelevantMessagingState?!");
1051				return ChannelStatus::Closed
1052			},
1053			Some(d) => d.egress_channels,
1054		};
1055		// ^^^ NOTE: This storage field should carry over from the previous block. So if it's
1056		// None then it must be that this is an edge-case where a message is attempted to be
1057		// sent at the first block. It should be safe to assume that there are no channels
1058		// opened at all so early. At least, relying on this assumption seems to be a better
1059		// trade-off, compared to introducing an error variant that the clients should be
1060		// prepared to handle.
1061		let index = match channels.binary_search_by_key(&id, |item| item.0) {
1062			Err(_) => return ChannelStatus::Closed,
1063			Ok(i) => i,
1064		};
1065		let meta = &channels[index].1;
1066		if meta.msg_count + 1 > meta.max_capacity {
1067			// The channel is at its capacity. Skip it for now.
1068			return ChannelStatus::Full
1069		}
1070		let max_size_now = meta.max_total_size - meta.total_size;
1071		let max_size_ever = meta.max_message_size;
1072		ChannelStatus::Ready(max_size_now as usize, max_size_ever as usize)
1073	}
1074
1075	fn get_channel_info(id: ParaId) -> Option<ChannelInfo> {
1076		let channels = RelevantMessagingState::<T>::get()?.egress_channels;
1077		let index = channels.binary_search_by_key(&id, |item| item.0).ok()?;
1078		let info = ChannelInfo {
1079			max_capacity: channels[index].1.max_capacity,
1080			max_total_size: channels[index].1.max_total_size,
1081			max_message_size: channels[index].1.max_message_size,
1082			msg_count: channels[index].1.msg_count,
1083			total_size: channels[index].1.total_size,
1084		};
1085		Some(info)
1086	}
1087}
1088
1089impl<T: Config> Pallet<T> {
1090	/// The bandwidth limit per block that applies when receiving messages from the relay chain via
1091	/// DMP or XCMP.
1092	///
1093	/// The limit is per message passing mechanism (e.g. 1 MiB for DMP, 1 MiB for XCMP).
1094	///
1095	/// The purpose of this limit is to make sure that the total size of the messages received by
1096	/// the parachain from the relay chain doesn't exceed the block size. Currently each message
1097	/// passing mechanism can use 1/6 of the total block PoV which means that in total 1/3
1098	/// of the block PoV can be used for message passing.
1099	fn messages_collection_size_limit() -> usize {
1100		let max_block_weight = <T as frame_system::Config>::BlockWeights::get().max_block;
1101		let max_block_pov = max_block_weight.proof_size();
1102		(max_block_pov / 6).saturated_into()
1103	}
1104
1105	/// Updates inherent data to only include the messages that weren't already processed
1106	/// by the runtime and to compress (hash) the messages that exceed the allocated size.
1107	///
1108	/// This method doesn't check for mqc heads mismatch. If the MQC doesn't match after
1109	/// dropping messages, the runtime will panic when executing the inherent.
1110	fn do_create_inherent(data: ParachainInherentData) -> Call<T> {
1111		let (data, mut downward_messages, mut horizontal_messages) =
1112			deconstruct_parachain_inherent_data(data);
1113		let last_relay_block_number = LastRelayChainBlockNumber::<T>::get();
1114
1115		let messages_collection_size_limit = Self::messages_collection_size_limit();
1116		// DMQ.
1117		let last_processed_msg = LastProcessedDownwardMessage::<T>::get()
1118			.unwrap_or(InboundMessageId { sent_at: last_relay_block_number, reverse_idx: 0 });
1119		downward_messages.drop_processed_messages(&last_processed_msg);
1120		let mut size_limit = messages_collection_size_limit;
1121		let downward_messages = downward_messages.into_abridged(&mut size_limit);
1122
1123		// HRMP.
1124		let last_processed_msg = LastProcessedHrmpMessage::<T>::get()
1125			.unwrap_or(InboundMessageId { sent_at: last_relay_block_number, reverse_idx: 0 });
1126		horizontal_messages.drop_processed_messages(&last_processed_msg);
1127		size_limit = size_limit.saturating_add(messages_collection_size_limit);
1128		let horizontal_messages = horizontal_messages.into_abridged(&mut size_limit);
1129
1130		let inbound_messages_data =
1131			InboundMessagesData::new(downward_messages, horizontal_messages);
1132
1133		Call::set_validation_data { data, inbound_messages_data }
1134	}
1135
	/// Enqueue all inbound downward messages relayed by the collator into the MQ pallet.
	///
	/// Checks if the sequence of the messages is valid, hands them to `T::DmpQueue` and
	/// communicates the number of processed messages to the collator via a storage update
	/// (`ProcessedDownwardMessages`).
	///
	/// Returns the weight reported by `T::WeightInfo` for the number of full messages.
	///
	/// # Panics
	///
	/// If it turns out that after processing all messages the Message Queue Chain
	/// hash doesn't match the expected.
	fn enqueue_inbound_downward_messages(
		expected_dmq_mqc_head: relay_chain::Hash,
		downward_messages: AbridgedInboundDownwardMessages,
	) -> Weight {
		// NOTE(review): presumably panics if the collator included too few messages — confirm
		// against `check_enough_messages_included`.
		downward_messages.check_enough_messages_included("DMQ");

		let mut dmq_head = <LastDmqMqcHead<T>>::get();

		// The inherent carries full messages followed by messages that are only present in
		// hashed form.
		let (messages, hashed_messages) = downward_messages.messages();
		let message_count = messages.len() as u32;
		let weight_used = T::WeightInfo::enqueue_inbound_downward_messages(message_count);
		// Nothing is stored or enqueued unless at least one full message is present.
		if let Some(last_msg) = messages.last() {
			Self::deposit_event(Event::DownwardMessagesReceived { count: message_count });

			// Eagerly update the MQC head hash:
			for msg in messages {
				dmq_head.extend_downward(msg);
			}
			// The persisted head covers the full messages only; the hashed messages below are
			// applied to the local copy just for the final head check.
			<LastDmqMqcHead<T>>::put(&dmq_head);
			Self::deposit_event(Event::DownwardMessagesProcessed {
				weight_used,
				dmq_head: dmq_head.head(),
			});

			// Remember where processing stopped: the `sent_at` of the last full message, and
			// in `reverse_idx` how many hashed messages share that same `sent_at`.
			let mut last_processed_msg =
				InboundMessageId { sent_at: last_msg.sent_at, reverse_idx: 0 };
			for msg in hashed_messages {
				dmq_head.extend_with_hashed_msg(msg);

				if msg.sent_at == last_processed_msg.sent_at {
					last_processed_msg.reverse_idx += 1;
				}
			}
			LastProcessedDownwardMessage::<T>::put(last_processed_msg);

			T::DmpQueue::handle_messages(downward_messages.bounded_msgs_iter());
		}

		// After hashing each message in the message queue chain submitted by the collator, we
		// should arrive to the MQC head provided by the relay chain.
		//
		// A mismatch means that at least some of the submitted messages were altered, omitted or
		// added improperly.
		assert_eq!(dmq_head.head(), expected_dmq_mqc_head, "DMQ head mismatch");

		ProcessedDownwardMessages::<T>::put(message_count);

		weight_used
	}
1194
1195	fn check_hrmp_mcq_heads(
1196		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
1197		mqc_heads: &mut BTreeMap<ParaId, MessageQueueChain>,
1198	) {
1199		// Check that the MQC heads for each channel provided by the relay chain match the MQC
1200		// heads we have after processing all incoming messages.
1201		//
1202		// Along the way we also carry over the relevant entries from the `last_mqc_heads` to
1203		// `running_mqc_heads`. Otherwise, in a block where no messages were sent in a channel
1204		// it won't get into next block's `last_mqc_heads` and thus will be all zeros, which
1205		// would corrupt the message queue chain.
1206		for (sender, channel) in ingress_channels {
1207			let cur_head = mqc_heads.entry(*sender).or_default().head();
1208			let target_head = channel.mqc_head.unwrap_or_default();
1209			assert_eq!(cur_head, target_head, "HRMP head mismatch");
1210		}
1211	}
1212
1213	/// Performs some checks related to the sender and the `sent_at` field of an HRMP message.
1214	///
1215	/// **Panics** if the message submitted by the collator doesn't respect the expected order or if
1216	///            it was sent from a para which has no open channel to this parachain.
1217	fn check_hrmp_message_metadata(
1218		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
1219		maybe_prev_msg_metadata: &mut Option<(u32, ParaId)>,
1220		msg_metadata: (u32, ParaId),
1221	) {
1222		// Check that the message is properly ordered.
1223		if let Some(prev_msg) = maybe_prev_msg_metadata {
1224			assert!(&msg_metadata >= prev_msg, "[HRMP] Messages order violation");
1225		}
1226		*maybe_prev_msg_metadata = Some(msg_metadata);
1227
1228		// Check that the message is sent from an existing channel. The channel exists
1229		// if its MQC head is present in `vfp.hrmp_mqc_heads`.
1230		let sender = msg_metadata.1;
1231		let maybe_channel_idx =
1232			ingress_channels.binary_search_by_key(&sender, |&(channel_sender, _)| channel_sender);
1233		assert!(
1234			maybe_channel_idx.is_ok(),
1235			"One of the messages submitted by the collator was sent from a sender ({}) \
1236					that doesn't have a channel opened to this parachain",
1237			<ParaId as Into<u32>>::into(sender)
1238		);
1239	}
1240
	/// Process all inbound horizontal messages relayed by the collator.
	///
	/// This is similar to [`enqueue_inbound_downward_messages`], but works with multiple inbound
	/// channels. It immediately dispatches signals and queues all other XCMs. Blob messages are
	/// ignored.
	///
	/// Besides handing the messages to `T::XcmpMessageHandler`, this updates
	/// `LastHrmpMqcHeads`, `LastProcessedHrmpMessage` and `HrmpWatermark`.
	///
	/// Returns the weight used by the message handler plus the accounted storage accesses.
	///
	/// **Panics** if either any of horizontal messages submitted by the collator was sent from
	///            a para which has no open channel to this parachain or if after processing
	///            messages across all inbound channels MQCs were obtained which do not
	///            correspond to the ones found on the relay-chain.
	fn enqueue_inbound_horizontal_messages(
		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
		horizontal_messages: AbridgedInboundHrmpMessages,
		relay_parent_number: relay_chain::BlockNumber,
	) -> Weight {
		// First, check the HRMP advancement rule.
		horizontal_messages.check_enough_messages_included("HRMP");

		let (messages, hashed_messages) = horizontal_messages.messages();
		let mut mqc_heads = <LastHrmpMqcHeads<T>>::get();

		// Without any full message there is nothing to enqueue: verify the heads and move both
		// the last processed message and the watermark to the relay parent.
		if messages.is_empty() {
			Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads);
			let last_processed_msg =
				InboundMessageId { sent_at: relay_parent_number, reverse_idx: 0 };
			LastProcessedHrmpMessage::<T>::put(last_processed_msg);
			HrmpWatermark::<T>::put(relay_parent_number);
			return T::DbWeight::get().reads_writes(1, 2);
		}

		let mut prev_msg_metadata = None;
		let mut last_processed_block = HrmpWatermark::<T>::get();
		let mut last_processed_msg = InboundMessageId { sent_at: 0, reverse_idx: 0 };
		for (sender, msg) in messages {
			Self::check_hrmp_message_metadata(
				ingress_channels,
				&mut prev_msg_metadata,
				(msg.sent_at, *sender),
			);
			mqc_heads.entry(*sender).or_default().extend_hrmp(msg);

			// A strictly larger `sent_at` means that the block of the previous message is
			// complete, so the watermark candidate can move up to it.
			if msg.sent_at > last_processed_msg.sent_at && last_processed_msg.sent_at > 0 {
				last_processed_block = last_processed_msg.sent_at;
			}
			last_processed_msg.sent_at = msg.sent_at;
		}
		// The persisted heads cover the full messages only; the hashed messages below are
		// applied to the local copy just for the final head check.
		<LastHrmpMqcHeads<T>>::put(&mqc_heads);
		for (sender, msg) in hashed_messages {
			Self::check_hrmp_message_metadata(
				ingress_channels,
				&mut prev_msg_metadata,
				(msg.sent_at, *sender),
			);
			mqc_heads.entry(*sender).or_default().extend_with_hashed_msg(msg);

			// Count the hashed messages that share the `sent_at` of the last full message.
			if msg.sent_at == last_processed_msg.sent_at {
				last_processed_msg.reverse_idx += 1;
			}
		}
		// If no hashed message shares the `sent_at` of the last full message, that block is
		// complete as well and the watermark candidate can include it.
		if last_processed_msg.sent_at > 0 && last_processed_msg.reverse_idx == 0 {
			last_processed_block = last_processed_msg.sent_at;
		}
		LastProcessedHrmpMessage::<T>::put(&last_processed_msg);
		Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads);

		// The configured reservation applies unless an override is stored.
		let max_weight =
			<ReservedXcmpWeightOverride<T>>::get().unwrap_or_else(T::ReservedXcmpWeight::get);
		let weight_used = T::XcmpMessageHandler::handle_xcmp_messages(
			horizontal_messages.flat_msgs_iter(),
			max_weight,
		);

		// Update watermark
		HrmpWatermark::<T>::put(last_processed_block);

		weight_used.saturating_add(T::DbWeight::get().reads_writes(2, 3))
	}
1318
	/// Drop blocks from the unincluded segment with respect to the latest parachain head.
	///
	/// The included head is read from `relay_state_proof`. If the proof does not contain it,
	/// the parent hash is used instead, but only when `capacity` expects the parent to be
	/// included. Every block up to and including the one matching the included head is removed
	/// from `UnincludedSegment` and subtracted from `AggregatedUnincludedSegment`.
	///
	/// Returns the weight of the storage accesses performed.
	///
	/// # Panics
	///
	/// - If the parent is expected to be included but the proven included head differs from
	///   the parent hash.
	/// - If the included head is absent from the proof while the parent is not expected to be
	///   included.
	/// - If the remaining segment leaves no space for the current block.
	fn maybe_drop_included_ancestors(
		relay_state_proof: &RelayChainStateProof,
		capacity: consensus_hook::UnincludedSegmentCapacity,
	) -> Weight {
		let mut weight_used = Weight::zero();
		// If the unincluded segment length is nonzero, then the parachain head must be present.
		let para_head =
			relay_state_proof.read_included_para_head().ok().map(|h| T::Hashing::hash(&h.0));

		// Only the length is decoded here; the segment itself is loaded in the `mutate` below.
		let unincluded_segment_len = <UnincludedSegment<T>>::decode_len().unwrap_or(0);
		weight_used += T::DbWeight::get().reads(1);

		// Clean up unincluded segment if nonempty.
		let included_head = match (para_head, capacity.is_expecting_included_parent()) {
			(Some(h), true) => {
				assert_eq!(
					h,
					frame_system::Pallet::<T>::parent_hash(),
					"expected parent to be included"
				);

				h
			},
			(Some(h), false) => h,
			(None, true) => {
				// All this logic is essentially a workaround to support collators which
				// might still not provide the included block with the state proof.
				frame_system::Pallet::<T>::parent_hash()
			},
			(None, false) => panic!("included head not present in relay storage proof"),
		};

		let new_len = {
			let para_head_hash = included_head;
			let dropped: Vec<Ancestor<T::Hash>> = <UnincludedSegment<T>>::mutate(|chain| {
				// Drop everything up to (inclusive) the block with an included para head, if
				// present.
				let idx = chain
					.iter()
					.position(|block| {
						let head_hash = block
							.para_head_hash()
							.expect("para head hash is updated during block initialization; qed");
						head_hash == &para_head_hash
					})
					.map_or(0, |idx| idx + 1); // inclusive.

				chain.drain(..idx).collect()
			});
			weight_used += T::DbWeight::get().reads_writes(1, 1);

			let new_len = unincluded_segment_len - dropped.len();
			// Keep the aggregated tracker in sync with the blocks that just left the segment.
			if !dropped.is_empty() {
				<AggregatedUnincludedSegment<T>>::mutate(|agg| {
					let agg = agg.as_mut().expect(
						"dropped part of the segment wasn't empty, hence value exists; qed",
					);
					for block in dropped {
						agg.subtract(&block);
					}
				});
				weight_used += T::DbWeight::get().reads_writes(1, 1);
			}

			new_len as u32
		};

		// Current block validity check: ensure there is space in the unincluded segment.
		//
		// If this fails, the parachain needs to wait for ancestors to be included before
		// a new block is allowed.
		assert!(new_len < capacity.get(), "no space left for the block in the unincluded segment");
		weight_used
	}
1394
1395	/// This adjusts the `RelevantMessagingState` according to the bandwidth limits in the
1396	/// unincluded segment.
1397	//
1398	// Reads: 2
1399	// Writes: 1
1400	fn adjust_egress_bandwidth_limits() {
1401		let unincluded_segment = match AggregatedUnincludedSegment::<T>::get() {
1402			None => return,
1403			Some(s) => s,
1404		};
1405
1406		<RelevantMessagingState<T>>::mutate(|messaging_state| {
1407			let messaging_state = match messaging_state {
1408				None => return,
1409				Some(s) => s,
1410			};
1411
1412			let used_bandwidth = unincluded_segment.used_bandwidth();
1413
1414			let channels = &mut messaging_state.egress_channels;
1415			for (para_id, used) in used_bandwidth.hrmp_outgoing.iter() {
1416				let i = match channels.binary_search_by_key(para_id, |item| item.0) {
1417					Ok(i) => i,
1418					Err(_) => continue, // indicates channel closed.
1419				};
1420
1421				let c = &mut channels[i].1;
1422
1423				c.total_size = (c.total_size + used.total_bytes).min(c.max_total_size);
1424				c.msg_count = (c.msg_count + used.msg_count).min(c.max_capacity);
1425			}
1426
1427			let upward_capacity = &mut messaging_state.relay_dispatch_queue_remaining_capacity;
1428			upward_capacity.remaining_count =
1429				upward_capacity.remaining_count.saturating_sub(used_bandwidth.ump_msg_count);
1430			upward_capacity.remaining_size =
1431				upward_capacity.remaining_size.saturating_sub(used_bandwidth.ump_total_bytes);
1432		});
1433	}
1434
1435	/// Put a new validation function into a particular location where polkadot
1436	/// monitors for updates. Calling this function notifies polkadot that a new
1437	/// upgrade has been scheduled.
1438	fn notify_polkadot_of_pending_upgrade(code: &[u8]) {
1439		NewValidationCode::<T>::put(code);
1440		<DidSetValidationCode<T>>::put(true);
1441	}
1442
1443	/// The maximum code size permitted, in bytes.
1444	///
1445	/// Returns `None` if the relay chain parachain host configuration hasn't been submitted yet.
1446	pub fn max_code_size() -> Option<u32> {
1447		<HostConfiguration<T>>::get().map(|cfg| cfg.max_code_size)
1448	}
1449
1450	/// The implementation of the runtime upgrade functionality for parachains.
1451	pub fn schedule_code_upgrade(validation_function: Vec<u8>) -> DispatchResult {
1452		// Ensure that `ValidationData` exists. We do not care about the validation data per se,
1453		// but we do care about the [`UpgradeRestrictionSignal`] which arrives with the same
1454		// inherent.
1455		ensure!(<ValidationData<T>>::exists(), Error::<T>::ValidationDataNotAvailable,);
1456		ensure!(<UpgradeRestrictionSignal<T>>::get().is_none(), Error::<T>::ProhibitedByPolkadot);
1457
1458		ensure!(!<PendingValidationCode<T>>::exists(), Error::<T>::OverlappingUpgrades);
1459		let cfg = HostConfiguration::<T>::get().ok_or(Error::<T>::HostConfigurationNotAvailable)?;
1460		ensure!(validation_function.len() <= cfg.max_code_size as usize, Error::<T>::TooBig);
1461
1462		// When a code upgrade is scheduled, it has to be applied in two
1463		// places, synchronized: both polkadot and the individual parachain
1464		// have to upgrade on the same relay chain block.
1465		//
1466		// `notify_polkadot_of_pending_upgrade` notifies polkadot; the `PendingValidationCode`
1467		// storage keeps track locally for the parachain upgrade, which will
1468		// be applied later: when the relay-chain communicates go-ahead signal to us.
1469		Self::notify_polkadot_of_pending_upgrade(&validation_function);
1470		<PendingValidationCode<T>>::put(validation_function);
1471		Self::deposit_event(Event::ValidationFunctionStored);
1472
1473		Ok(())
1474	}
1475
1476	/// Returns the [`CollationInfo`] of the current active block.
1477	///
1478	/// The given `header` is the header of the built block we are collecting the collation info
1479	/// for.
1480	///
1481	/// This is expected to be used by the
1482	/// [`CollectCollationInfo`](cumulus_primitives_core::CollectCollationInfo) runtime api.
1483	pub fn collect_collation_info(header: &HeaderFor<T>) -> CollationInfo {
1484		CollationInfo {
1485			hrmp_watermark: HrmpWatermark::<T>::get(),
1486			horizontal_messages: HrmpOutboundMessages::<T>::get(),
1487			upward_messages: UpwardMessages::<T>::get(),
1488			processed_downward_messages: ProcessedDownwardMessages::<T>::get(),
1489			new_validation_code: NewValidationCode::<T>::get().map(Into::into),
1490			// Check if there is a custom header that will also be returned by the validation phase.
1491			// If so, we need to also return it here.
1492			head_data: CustomValidationHeadData::<T>::get()
1493				.map_or_else(|| header.encode(), |v| v)
1494				.into(),
1495		}
1496	}
1497
1498	/// Set a custom head data that should be returned as result of `validate_block`.
1499	///
1500	/// This will overwrite the head data that is returned as result of `validate_block` while
1501	/// validating a `PoV` on the relay chain. Normally the head data that is being returned
1502	/// by `validate_block` is the header of the block that is validated, thus it can be
1503	/// enacted as the new best block. However, for features like forking it can be useful
1504	/// to overwrite the head data with a custom header.
1505	///
1506	/// # Attention
1507	///
1508	/// This should only be used when you are sure what you are doing as this can brick
1509	/// your Parachain.
1510	pub fn set_custom_validation_head_data(head_data: Vec<u8>) {
1511		CustomValidationHeadData::<T>::put(head_data);
1512	}
1513
/// Appends the UMP signals to the upward messages.
///
/// When the digest of the current block carries core info, `UMP_SEPARATOR` and the encoded
/// `UMPSignal::SelectCore` signal are pushed to `UpwardMessages`. Without core info the
/// upward messages are left as they are.
#[cfg(feature = "experimental-ump-signals")]
fn send_ump_signal() {
	use cumulus_primitives_core::relay_chain::{UMPSignal, UMP_SEPARATOR};

	UpwardMessages::<T>::mutate(|upward_messages| {
		let digest = frame_system::Pallet::<T>::digest();
		let Some(core_info) = CumulusDigestItem::find_core_info(&digest) else {
			// No core info in the digest: there is no signal to send.
			return;
		};

		// The separator goes first, the core selector signal follows it.
		upward_messages.push(UMP_SEPARATOR);
		let select_core = UMPSignal::SelectCore(core_info.selector, core_info.claim_queue_offset);
		upward_messages.push(select_core.encode());
	});
}
1533
/// Open HRMP channel for using it in benchmarks or tests.
///
/// The caller assumes that the pallet will accept regular outbound message to the sibling
/// `target_parachain` after this call. No other assumptions are made.
///
/// The channel is half full (5 of 10 messages, 5_000_000 of 10_000_000 bytes). It is
/// installed through
/// [`Self::open_custom_outbound_hrmp_channel_for_benchmarks_or_tests`], so the snapshot
/// construction lives in a single place.
#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
pub fn open_outbound_hrmp_channel_for_benchmarks_or_tests(target_parachain: ParaId) {
	let channel = cumulus_primitives_core::AbridgedHrmpChannel {
		max_capacity: 10,
		max_total_size: 10_000_000_u32,
		max_message_size: 10_000_000_u32,
		msg_count: 5,
		total_size: 5_000_000_u32,
		mqc_head: None,
	};

	Self::open_custom_outbound_hrmp_channel_for_benchmarks_or_tests(target_parachain, channel)
}
1557
/// Open an HRMP channel with caller-provided parameters for using it in benchmarks or tests.
///
/// `RelevantMessagingState` is overwritten with a snapshot whose only egress channel is
/// `channel` towards `target_parachain`; all other fields of the snapshot are set to their
/// default values.
///
/// The caller assumes that the pallet will accept regular outbound message to the sibling
/// `target_parachain` after this call. No other assumptions are made.
#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
pub fn open_custom_outbound_hrmp_channel_for_benchmarks_or_tests(
	target_parachain: ParaId,
	channel: cumulus_primitives_core::AbridgedHrmpChannel,
) {
	let snapshot = MessagingStateSnapshot {
		dmq_mqc_head: Default::default(),
		relay_dispatch_queue_remaining_capacity: Default::default(),
		ingress_channels: Default::default(),
		egress_channels: vec![(target_parachain, channel)],
	};

	RelevantMessagingState::<T>::put(snapshot);
}
1574
/// Prepare/insert relevant data for `schedule_code_upgrade` for benchmarks.
///
/// Stores a dummy `ValidationData` and a dummy `HostConfiguration` whose `max_code_size`
/// is the given `max_code_size`.
#[cfg(feature = "runtime-benchmarks")]
pub fn initialize_for_set_code_benchmark(max_code_size: u32) {
	// Dummy validation data.
	let validation_data: PersistedValidationData = PersistedValidationData {
		parent_head: polkadot_parachain_primitives::primitives::HeadData(Default::default()),
		relay_parent_number: 1,
		relay_parent_storage_root: Default::default(),
		max_pov_size: 1_000,
	};
	ValidationData::<T>::put(validation_data);

	// Dummy host configuration with the requested `max_code_size`.
	HostConfiguration::<T>::put(AbridgedHostConfiguration {
		max_code_size,
		max_head_data_size: 32 * 1024,
		max_upward_queue_count: 8,
		max_upward_queue_size: 1024 * 1024,
		max_upward_message_size: 4 * 1024,
		max_upward_message_num_per_candidate: 2,
		hrmp_max_message_num_per_candidate: 2,
		validation_upgrade_cooldown: 2,
		validation_upgrade_delay: 2,
		async_backing_params: relay_chain::AsyncBackingParams {
			allowed_ancestry_len: 0,
			max_candidate_depth: 0,
		},
	});
}
1605}
1606
1607/// Type that implements `SetCode`.
1608pub struct ParachainSetCode<T>(core::marker::PhantomData<T>);
1609impl<T: Config> frame_system::SetCode<T> for ParachainSetCode<T> {
1610	fn set_code(code: Vec<u8>) -> DispatchResult {
1611		Pallet::<T>::schedule_code_upgrade(code)
1612	}
1613}
1614
1615impl<T: Config> Pallet<T> {
1616	/// Puts a message in the `PendingUpwardMessages` storage item.
1617	/// The message will be later sent in `on_finalize`.
1618	/// Checks host configuration to see if message is too big.
1619	/// Increases the delivery fee factor if the queue is sufficiently (see
1620	/// [`ump_constants::THRESHOLD_FACTOR`]) congested.
1621	pub fn send_upward_message(message: UpwardMessage) -> Result<(u32, XcmHash), MessageSendError> {
1622		let message_len = message.len();
1623		// Check if the message fits into the relay-chain constraints.
1624		//
1625		// Note, that we are using `host_configuration` here which may be from the previous
1626		// block, in case this is called from `on_initialize`, i.e. before the inherent with fresh
1627		// data is submitted.
1628		//
1629		// That shouldn't be a problem since this is a preliminary check and the actual check would
1630		// be performed just before submitting the message from the candidate, and it already can
1631		// happen that during the time the message is buffered for sending the relay-chain setting
1632		// may change so that the message is no longer valid.
1633		//
1634		// However, changing this setting is expected to be rare.
1635		if let Some(cfg) = HostConfiguration::<T>::get() {
1636			if message_len > cfg.max_upward_message_size as usize {
1637				return Err(MessageSendError::TooBig);
1638			}
1639			let threshold =
1640				cfg.max_upward_queue_size.saturating_div(ump_constants::THRESHOLD_FACTOR);
1641			// We check the threshold against total size and not number of messages since messages
1642			// could be big or small.
1643			<PendingUpwardMessages<T>>::append(message.clone());
1644			let pending_messages = PendingUpwardMessages::<T>::get();
1645			let total_size: usize = pending_messages.iter().map(UpwardMessage::len).sum();
1646			if total_size > threshold as usize {
1647				// We increase the fee factor by a factor based on the new message's size in KB
1648				Self::increase_fee_factor((), message_len as u128);
1649			}
1650		} else {
1651			// This storage field should carry over from the previous block. So if it's None
1652			// then it must be that this is an edge-case where a message is attempted to be
1653			// sent at the first block.
1654			//
1655			// Let's pass this message through. I think it's not unreasonable to expect that
1656			// the message is not huge and it comes through, but if it doesn't it can be
1657			// returned back to the sender.
1658			//
1659			// Thus fall through here.
1660			<PendingUpwardMessages<T>>::append(message.clone());
1661		};
1662
1663		// The relay ump does not use using_encoded
1664		// We apply the same this to use the same hash
1665		let hash = sp_io::hashing::blake2_256(&message);
1666		Self::deposit_event(Event::UpwardMessageSent { message_hash: Some(hash) });
1667		Ok((0, hash))
1668	}
1669
1670	/// Get the relay chain block number which was used as an anchor for the last block in this
1671	/// chain.
1672	pub fn last_relay_block_number() -> RelayChainBlockNumber {
1673		LastRelayChainBlockNumber::<T>::get()
1674	}
1675}
1676
1677impl<T: Config> UpwardMessageSender for Pallet<T> {
1678	fn send_upward_message(message: UpwardMessage) -> Result<(u32, XcmHash), MessageSendError> {
1679		Self::send_upward_message(message)
1680	}
1681
1682	fn can_send_upward_message(message: &UpwardMessage) -> Result<(), MessageSendError> {
1683		let max_upward_message_size = HostConfiguration::<T>::get()
1684			.map(|cfg| cfg.max_upward_message_size)
1685			.ok_or(MessageSendError::Other)?;
1686		if message.len() > max_upward_message_size as usize {
1687			Err(MessageSendError::TooBig)
1688		} else {
1689			Ok(())
1690		}
1691	}
1692
1693	#[cfg(any(feature = "std", feature = "runtime-benchmarks", test))]
1694	fn ensure_successful_delivery() {
1695		const MAX_UPWARD_MESSAGE_SIZE: u32 = 65_531 * 3;
1696		const MAX_CODE_SIZE: u32 = 3 * 1024 * 1024;
1697		HostConfiguration::<T>::mutate(|cfg| match cfg {
1698			Some(cfg) => cfg.max_upward_message_size = MAX_UPWARD_MESSAGE_SIZE,
1699			None =>
1700				*cfg = Some(AbridgedHostConfiguration {
1701					max_code_size: MAX_CODE_SIZE,
1702					max_head_data_size: 32 * 1024,
1703					max_upward_queue_count: 8,
1704					max_upward_queue_size: 1024 * 1024,
1705					max_upward_message_size: MAX_UPWARD_MESSAGE_SIZE,
1706					max_upward_message_num_per_candidate: 2,
1707					hrmp_max_message_num_per_candidate: 2,
1708					validation_upgrade_cooldown: 2,
1709					validation_upgrade_delay: 2,
1710					async_backing_params: relay_chain::AsyncBackingParams {
1711						allowed_ancestry_len: 0,
1712						max_candidate_depth: 0,
1713					},
1714				}),
1715		})
1716	}
1717}
1718
1719impl<T: Config> InspectMessageQueues for Pallet<T> {
1720	fn clear_messages() {
1721		PendingUpwardMessages::<T>::kill();
1722	}
1723
1724	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1725		use xcm::prelude::*;
1726
1727		let messages: Vec<VersionedXcm<()>> = PendingUpwardMessages::<T>::get()
1728			.iter()
1729			.map(|encoded_message| {
1730				VersionedXcm::<()>::decode_all_with_depth_limit(
1731					MAX_XCM_DECODE_DEPTH,
1732					&mut &encoded_message[..],
1733				)
1734				.unwrap()
1735			})
1736			.collect();
1737
1738		if messages.is_empty() {
1739			vec![]
1740		} else {
1741			vec![(VersionedLocation::from(Location::parent()), messages)]
1742		}
1743	}
1744}
1745
#[cfg(feature = "runtime-benchmarks")]
impl<T: Config> polkadot_runtime_parachains::EnsureForParachain for Pallet<T> {
	/// Opens the benchmark/test outbound HRMP channel to `para_id` if the channel status is
	/// currently `Closed`; any other status is left untouched.
	fn ensure(para_id: ParaId) {
		if matches!(Self::get_channel_status(para_id), ChannelStatus::Closed) {
			Self::open_outbound_hrmp_channel_for_benchmarks_or_tests(para_id);
		}
	}
}
1754
1755/// Something that can check the inherents of a block.
1756#[deprecated(note = "This trait is deprecated and will be removed by September 2024. \
1757		Consider switching to `cumulus-pallet-parachain-system::ConsensusHook`")]
1758pub trait CheckInherents<Block: BlockT> {
1759	/// Check all inherents of the block.
1760	///
1761	/// This function gets passed all the extrinsics of the block, so it is up to the callee to
1762	/// identify the inherents. The `validation_data` can be used to access the
1763	fn check_inherents(
1764		block: &Block,
1765		validation_data: &RelayChainStateProof,
1766	) -> frame_support::inherent::CheckInherentsResult;
1767}
1768
1769/// Struct that always returns `Ok` on inherents check, needed for backwards-compatibility.
1770#[doc(hidden)]
1771pub struct DummyCheckInherents<Block>(core::marker::PhantomData<Block>);
1772
1773#[allow(deprecated)]
1774impl<Block: BlockT> CheckInherents<Block> for DummyCheckInherents<Block> {
1775	fn check_inherents(
1776		_: &Block,
1777		_: &RelayChainStateProof,
1778	) -> frame_support::inherent::CheckInherentsResult {
1779		sp_inherents::CheckInherentsResult::new()
1780	}
1781}
1782
1783/// Something that should be informed about system related events.
1784///
1785/// This includes events like [`on_validation_data`](Self::on_validation_data) that is being
1786/// called when the parachain inherent is executed that contains the validation data.
1787/// Or like [`on_validation_code_applied`](Self::on_validation_code_applied) that is called
1788/// when the new validation is written to the state. This means that
1789/// from the next block the runtime is being using this new code.
1790#[impl_trait_for_tuples::impl_for_tuples(30)]
1791pub trait OnSystemEvent {
1792	/// Called in each blocks once when the validation data is set by the inherent.
1793	fn on_validation_data(data: &PersistedValidationData);
1794	/// Called when the validation code is being applied, aka from the next block on this is the new
1795	/// runtime.
1796	fn on_validation_code_applied();
1797}
1798
1799/// Holds the most recent relay-parent state root and block number of the current parachain block.
1800#[derive(PartialEq, Eq, Clone, Encode, Decode, TypeInfo, Default, RuntimeDebug)]
1801pub struct RelayChainState {
1802	/// Current relay chain height.
1803	pub number: relay_chain::BlockNumber,
1804	/// State root for current relay chain height.
1805	pub state_root: relay_chain::Hash,
1806}
1807
1808/// This exposes the [`RelayChainState`] to other runtime modules.
1809///
1810/// Enables parachains to read relay chain state via state proofs.
1811pub trait RelaychainStateProvider {
1812	/// May be called by any runtime module to obtain the current state of the relay chain.
1813	///
1814	/// **NOTE**: This is not guaranteed to return monotonically increasing relay parents.
1815	fn current_relay_chain_state() -> RelayChainState;
1816
1817	/// Utility function only to be used in benchmarking scenarios, to be implemented optionally,
1818	/// else a noop.
1819	///
1820	/// It allows for setting a custom RelayChainState.
1821	#[cfg(feature = "runtime-benchmarks")]
1822	fn set_current_relay_chain_state(_state: RelayChainState) {}
1823}
1824
1825/// Implements [`BlockNumberProvider`] that returns relay chain block number fetched from validation
1826/// data.
1827///
1828/// When validation data is not available (e.g. within `on_initialize`), it will fallback to use
1829/// [`Pallet::last_relay_block_number()`].
1830///
1831/// **NOTE**: This has been deprecated, please use [`RelaychainDataProvider`]
1832#[deprecated = "Use `RelaychainDataProvider` instead"]
1833pub type RelaychainBlockNumberProvider<T> = RelaychainDataProvider<T>;
1834
/// Implements [`BlockNumberProvider`] and [`RelaychainStateProvider`], returning the relevant
/// relay chain data fetched from validation data.
///
/// NOTE: When validation data is not available (e.g. within `on_initialize`):
///
/// - [`current_relay_chain_state`](Self::current_relay_chain_state): Will return the default value
///   of [`RelayChainState`].
/// - [`current_block_number`](Self::current_block_number): Will return
///   [`Pallet::last_relay_block_number()`].
pub struct RelaychainDataProvider<T>(core::marker::PhantomData<T>);
1845
1846impl<T: Config> BlockNumberProvider for RelaychainDataProvider<T> {
1847	type BlockNumber = relay_chain::BlockNumber;
1848
1849	fn current_block_number() -> relay_chain::BlockNumber {
1850		ValidationData::<T>::get()
1851			.map(|d| d.relay_parent_number)
1852			.unwrap_or_else(|| Pallet::<T>::last_relay_block_number())
1853	}
1854
1855	#[cfg(any(feature = "std", feature = "runtime-benchmarks", test))]
1856	fn set_block_number(block: Self::BlockNumber) {
1857		let mut validation_data = ValidationData::<T>::get().unwrap_or_else(||
1858			// PersistedValidationData does not impl default in non-std
1859			PersistedValidationData {
1860				parent_head: vec![].into(),
1861				relay_parent_number: Default::default(),
1862				max_pov_size: Default::default(),
1863				relay_parent_storage_root: Default::default(),
1864			});
1865		validation_data.relay_parent_number = block;
1866		ValidationData::<T>::put(validation_data)
1867	}
1868}
1869
1870impl<T: Config> RelaychainStateProvider for RelaychainDataProvider<T> {
1871	fn current_relay_chain_state() -> RelayChainState {
1872		ValidationData::<T>::get()
1873			.map(|d| RelayChainState {
1874				number: d.relay_parent_number,
1875				state_root: d.relay_parent_storage_root,
1876			})
1877			.unwrap_or_default()
1878	}
1879
1880	#[cfg(feature = "runtime-benchmarks")]
1881	fn set_current_relay_chain_state(state: RelayChainState) {
1882		let mut validation_data = ValidationData::<T>::get().unwrap_or_else(||
1883			// PersistedValidationData does not impl default in non-std
1884			PersistedValidationData {
1885				parent_head: vec![].into(),
1886				relay_parent_number: Default::default(),
1887				max_pov_size: Default::default(),
1888				relay_parent_storage_root: Default::default(),
1889			});
1890		validation_data.relay_parent_number = state.number;
1891		validation_data.relay_parent_storage_root = state.state_root;
1892		ValidationData::<T>::put(validation_data)
1893	}
1894}