cumulus_pallet_parachain_system/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg_attr(not(feature = "std"), no_std)]

//! `cumulus-pallet-parachain-system` is a base pallet for Cumulus-based parachains.
//!
//! This pallet handles low-level details of being a parachain. Its responsibilities include:
//!
//! - ingestion of the parachain validation data;
//! - ingestion and dispatch of incoming downward and lateral messages;
//! - coordinating upgrades with the Relay Chain; and
//! - communication of parachain outputs, such as sent messages, signaling an upgrade, etc.
//!
//! Users must ensure that they register this pallet as an inherent provider.

extern crate alloc;

use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
use codec::{Decode, DecodeLimit, Encode};
use core::cmp;
use cumulus_primitives_core::{
	relay_chain::{self, UMPSignal, UMP_SEPARATOR},
	AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo, CumulusDigestItem,
	GetChannelInfo, ListChannelInfos, MessageSendError, OutboundHrmpMessage, ParaId,
	PersistedValidationData, UpwardMessage, UpwardMessageSender, XcmpMessageHandler,
	XcmpMessageSource,
};
use cumulus_primitives_parachain_inherent::{v0, MessageQueueChain, ParachainInherentData};
use frame_support::{
	dispatch::{DispatchClass, DispatchResult},
	ensure,
	inherent::{InherentData, InherentIdentifier, ProvideInherent},
	traits::{Get, HandleMessage},
	weights::Weight,
};
use frame_system::{ensure_none, ensure_root, pallet_prelude::HeaderFor};
use parachain_inherent::{
	deconstruct_parachain_inherent_data, AbridgedInboundDownwardMessages,
	AbridgedInboundHrmpMessages, BasicParachainInherentData, InboundMessageId, InboundMessagesData,
};
use polkadot_parachain_primitives::primitives::RelayChainBlockNumber;
use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
use scale_info::TypeInfo;
use sp_runtime::{
	traits::{BlockNumberProvider, Hash},
	Debug, FixedU128, SaturatedConversion,
};
use xcm::{latest::XcmHash, VersionedLocation, VersionedXcm, MAX_XCM_DECODE_DEPTH};
use xcm_builder::InspectMessageQueues;

mod benchmarking;
pub mod migration;
mod mock;
#[cfg(test)]
mod tests;
pub mod weights;

pub use weights::WeightInfo;

mod unincluded_segment;

pub mod consensus_hook;
pub mod relay_state_snapshot;
#[macro_use]
pub mod validate_block;
mod descendant_validation;
pub mod parachain_inherent;

use unincluded_segment::{
	HrmpChannelUpdate, HrmpWatermarkUpdate, OutboundBandwidthLimits, SegmentTracker,
};

pub use consensus_hook::{ConsensusHook, ExpectParentIncluded};
/// Register the `validate_block` function that is used by parachains to validate blocks on a
/// validator.
///
/// Does *nothing* when the `std` feature is enabled.
///
/// Expects as parameters the runtime, a block executor and an inherent checker.
///
/// # Example
///
/// ```
///     struct BlockExecutor;
///     struct Runtime;
///
///     cumulus_pallet_parachain_system::register_validate_block! {
///         Runtime = Runtime,
///         BlockExecutor = BlockExecutor,
///     }
///
/// # fn main() {}
/// ```
pub use cumulus_pallet_parachain_system_proc_macro::register_validate_block;
pub use relay_state_snapshot::{MessagingStateSnapshot, RelayChainStateProof};
pub use unincluded_segment::{Ancestor, UsedBandwidth};

pub use pallet::*;

const LOG_TARGET: &str = "parachain-system";

/// Something that can check the associated relay block number.
///
/// Each Parachain block is built in the context of a relay chain block; this trait allows us
/// to validate the given relay chain block number. With async backing it is legal to build
/// multiple Parachain blocks per relay chain parent. With this trait it is possible for the
/// Parachain to ensure that still only one Parachain block is built per relay chain parent.
///
/// By default [`RelayNumberStrictlyIncreases`], [`RelayNumberMonotonicallyIncreases`] and
/// [`AnyRelayNumber`] are provided.
pub trait CheckAssociatedRelayNumber {
	/// Check the current relay number versus the previous relay number.
	///
	/// The implementation should panic when there is something wrong.
	fn check_associated_relay_number(
		current: RelayChainBlockNumber,
		previous: RelayChainBlockNumber,
	);
}

/// Provides an implementation of [`CheckAssociatedRelayNumber`].
///
/// It will ensure that the associated relay block number strictly increases between Parachain
/// blocks. This should be used by production Parachains when in doubt.
pub struct RelayNumberStrictlyIncreases;

impl CheckAssociatedRelayNumber for RelayNumberStrictlyIncreases {
	fn check_associated_relay_number(
		current: RelayChainBlockNumber,
		previous: RelayChainBlockNumber,
	) {
		if current <= previous {
			panic!("Relay chain block number needs to strictly increase between Parachain blocks!")
		}
	}
}

/// Provides an implementation of [`CheckAssociatedRelayNumber`].
///
/// This will accept any relay chain block number combination. This is mainly useful for
/// test parachains.
pub struct AnyRelayNumber;

impl CheckAssociatedRelayNumber for AnyRelayNumber {
	fn check_associated_relay_number(_: RelayChainBlockNumber, _: RelayChainBlockNumber) {}
}

/// Provides an implementation of [`CheckAssociatedRelayNumber`].
///
/// It will ensure that the associated relay block number monotonically increases between Parachain
/// blocks. This should be used when asynchronous backing is enabled.
pub struct RelayNumberMonotonicallyIncreases;

impl CheckAssociatedRelayNumber for RelayNumberMonotonicallyIncreases {
	fn check_associated_relay_number(
		current: RelayChainBlockNumber,
		previous: RelayChainBlockNumber,
	) {
		if current < previous {
			panic!("Relay chain block number needs to monotonically increase between Parachain blocks!")
		}
	}
}
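
// Illustrative choice of checker (a sketch; `Runtime` is a hypothetical runtime, not
// something defined in this crate): a chain with asynchronous backing enabled would
// typically wire
//
//   type CheckAssociatedRelayNumber = RelayNumberMonotonicallyIncreases;
//
// into its `Config` implementation, a strictly synchronous production chain would use
// `RelayNumberStrictlyIncreases`, and test chains can use `AnyRelayNumber`.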

/// The max length of a DMP message.
pub type MaxDmpMessageLenOf<T> = <<T as Config>::DmpQueue as HandleMessage>::MaxMessageLen;

pub mod ump_constants {
	/// `host_config.max_upward_queue_size / THRESHOLD_FACTOR` is the threshold after which delivery
	/// starts getting exponentially more expensive.
	/// `2` means the price starts to increase when the queue is half full.
	pub const THRESHOLD_FACTOR: u32 = 2;
}
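
// Worked example (illustrative numbers, not from any real configuration): with
// `host_config.max_upward_queue_size == 8192` bytes and `THRESHOLD_FACTOR == 2`, the
// threshold is 8192 / 2 == 4096 bytes. While more than 4096 bytes of upward messages
// are pending, delivery keeps getting more expensive; once `on_finalize` observes the
// pending total at or below 4096 bytes, `decrease_fee_factor` is called and delivery
// becomes cheaper again.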

#[frame_support::pallet]
pub mod pallet {
	use super::*;
	use cumulus_primitives_core::CoreInfoExistsAtMaxOnce;
	use frame_support::pallet_prelude::*;
	use frame_system::pallet_prelude::*;

	#[pallet::pallet]
	#[pallet::storage_version(migration::STORAGE_VERSION)]
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);

	#[pallet::config]
	pub trait Config: frame_system::Config<OnSetCode = ParachainSetCode<Self>> {
		/// The overarching event type.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// Something which can be notified when the validation data is set.
		type OnSystemEvent: OnSystemEvent;

		/// Returns the parachain ID we are running with.
		#[pallet::constant]
		type SelfParaId: Get<ParaId>;

		/// The place where outbound XCMP messages come from. This is queried in `finalize_block`.
		type OutboundXcmpMessageSource: XcmpMessageSource;

		/// Queues inbound downward messages for delayed processing.
		///
		/// All inbound DMP messages from the relay are pushed into this. The handler is expected to
		/// eventually process all the messages that are pushed to it.
		type DmpQueue: HandleMessage;

		/// The weight we reserve at the beginning of the block for processing DMP messages.
		type ReservedDmpWeight: Get<Weight>;

		/// The message handler that will be invoked when messages are received via XCMP.
		///
		/// This should normally link to the XCMP Queue pallet.
		type XcmpMessageHandler: XcmpMessageHandler;

		/// The weight we reserve at the beginning of the block for processing XCMP messages.
		type ReservedXcmpWeight: Get<Weight>;

		/// Something that can check the associated relay parent block number.
		type CheckAssociatedRelayNumber: CheckAssociatedRelayNumber;

		/// Weight info for functions and calls.
		type WeightInfo: WeightInfo;

		/// An entry-point for higher-level logic to manage the backlog of unincluded parachain
		/// blocks and authorship rights for those blocks.
		///
		/// Typically, this should be a hook tailored to the collator-selection/consensus mechanism
		/// that is used for this chain.
		///
		/// However, to maintain the same behavior as prior to asynchronous backing, provide the
		/// [`consensus_hook::ExpectParentIncluded`] here. This is only necessary in the case
		/// that collators aren't expected to have node versions that supply the included block
		/// in the relay-chain state proof.
		type ConsensusHook: ConsensusHook;

		/// The offset between the tip of the relay chain and the relay block used as parent
		/// when authoring a parachain block.
		///
		/// This setting directly impacts the number of descendant headers that are expected in the
		/// `set_validation_data` inherent.
		///
		/// For any setting `N` larger than zero, the inherent is expected to include the relay
		/// parent plus `N` descendants. These headers are required to validate that new
		/// parachain blocks are authored at the correct offset.
		///
		/// While this helps to reduce forks on the parachain side, it increases the delay for
		/// processing XCM messages. So, the value should be chosen wisely.
		///
		/// If set to 0, this config has no impact.
		type RelayParentOffset: Get<u32>;
	}
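
	// Illustrative `Config` wiring (a sketch; every right-hand side below is a
	// hypothetical placeholder resembling a typical runtime, not something defined in
	// this crate):
	//
	//   impl cumulus_pallet_parachain_system::Config for Runtime {
	//       type RuntimeEvent = RuntimeEvent;
	//       type OnSystemEvent = ();
	//       type SelfParaId = parachain_info::Pallet<Runtime>;
	//       type OutboundXcmpMessageSource = XcmpQueue;
	//       type DmpQueue = EnqueueWithOrigin<MessageQueue, RelayOrigin>;
	//       type ReservedDmpWeight = ReservedDmpWeight;
	//       type XcmpMessageHandler = XcmpQueue;
	//       type ReservedXcmpWeight = ReservedXcmpWeight;
	//       type CheckAssociatedRelayNumber = RelayNumberMonotonicallyIncreases;
	//       type WeightInfo = weights::WeightInfo<Runtime>;
	//       type ConsensusHook = ConsensusHook;
	//       type RelayParentOffset = ConstU32<0>;
	//   }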

	#[pallet::hooks]
	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
		/// Handles actually sending upward messages by moving them from `PendingUpwardMessages` to
		/// `UpwardMessages`. Decreases the delivery fee factor if, after sending messages, the queue
		/// total size is less than the threshold (see [`ump_constants::THRESHOLD_FACTOR`]).
		/// Also does the sending for HRMP messages it takes from `OutboundXcmpMessageSource`.
		fn on_finalize(_: BlockNumberFor<T>) {
			<DidSetValidationCode<T>>::kill();
			<UpgradeRestrictionSignal<T>>::kill();
			let relay_upgrade_go_ahead = <UpgradeGoAhead<T>>::take();

			let vfp = <ValidationData<T>>::get().expect(
				r"Missing required set_validation_data inherent. This inherent must be
				present in every block. This error typically occurs when the set_validation_data
				execution failed and was rejected by the block builder. Check earlier log entries
				for the specific cause of the failure.",
			);

			LastRelayChainBlockNumber::<T>::put(vfp.relay_parent_number);

			let host_config = match HostConfiguration::<T>::get() {
				Some(ok) => ok,
				None => {
					debug_assert!(
						false,
						"host configuration is promised to be set until `on_finalize`; qed",
					);
					return
				},
			};

			// Before updating the relevant messaging state, we need to extract
			// the total bandwidth limits for the purpose of updating the unincluded
			// segment.
			let total_bandwidth_out = match RelevantMessagingState::<T>::get() {
				Some(s) => OutboundBandwidthLimits::from_relay_chain_state(&s),
				None => {
					debug_assert!(
						false,
						"relevant messaging state is promised to be set until `on_finalize`; \
							qed",
					);
					return
				},
			};

			// After this point, the `RelevantMessagingState` in storage reflects the
			// unincluded segment.
			Self::adjust_egress_bandwidth_limits();

			let (ump_msg_count, ump_total_bytes) = <PendingUpwardMessages<T>>::mutate(|up| {
				let (available_capacity, available_size) = match RelevantMessagingState::<T>::get()
				{
					Some(limits) => (
						limits.relay_dispatch_queue_remaining_capacity.remaining_count,
						limits.relay_dispatch_queue_remaining_capacity.remaining_size,
					),
					None => {
						debug_assert!(
							false,
							"relevant messaging state is promised to be set until `on_finalize`; \
								qed",
						);
						return (0, 0)
					},
				};

				let available_capacity =
					cmp::min(available_capacity, host_config.max_upward_message_num_per_candidate);

				// Count the number of messages we can possibly fit in the given constraints, i.e.
				// available_capacity and available_size.
				let (num, total_size) = up
					.iter()
					.scan((0u32, 0u32), |state, msg| {
						let (cap_used, size_used) = *state;
						let new_cap = cap_used.saturating_add(1);
						let new_size = size_used.saturating_add(msg.len() as u32);
						match available_capacity
							.checked_sub(new_cap)
							.and(available_size.checked_sub(new_size))
						{
							Some(_) => {
								*state = (new_cap, new_size);
								Some(*state)
							},
							_ => None,
						}
					})
					.last()
					.unwrap_or_default();
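
				// Worked example of the scan above (illustrative numbers): with
				// `available_capacity == 2`, `available_size == 100` and pending
				// messages of 60, 30 and 50 bytes, the scan yields (1, 60) and then
				// (2, 90) and stops, since a third message would exceed both budgets.
				// Hence `num == 2` and `total_size == 90`.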

				// TODO: #274 Return messages that no longer fit into the queue.

				UpwardMessages::<T>::put(&up[..num as usize]);
				*up = up.split_off(num as usize);

				if let Some(core_info) =
					CumulusDigestItem::find_core_info(&frame_system::Pallet::<T>::digest())
				{
					PendingUpwardSignals::<T>::append(
						UMPSignal::SelectCore(core_info.selector, core_info.claim_queue_offset)
							.encode(),
					);
				}

				// Send the pending UMP signals.
				Self::send_ump_signals();

				// If the total size of the pending messages is less than the threshold,
				// we decrease the fee factor, since the queue is less congested.
				// This makes delivery of new messages cheaper.
				let threshold = host_config
					.max_upward_queue_size
					.saturating_div(ump_constants::THRESHOLD_FACTOR);
				let remaining_total_size: usize = up.iter().map(UpwardMessage::len).sum();
				if remaining_total_size <= threshold as usize {
					Self::decrease_fee_factor(());
				}

				(num, total_size)
			});

			// Sending HRMP messages is a little bit more involved. There are the following
			// constraints:
			//
			// - a channel should exist (and it can be closed while a message is buffered),
			// - at most one message can be sent in a channel,
			// - the sent out messages should be ordered by ascending recipient para id,
			// - the capacity and total size of the channel is limited,
			// - the maximum size of a message is limited (and can potentially be changed).

			let maximum_channels = host_config
				.hrmp_max_message_num_per_candidate
				.min(<AnnouncedHrmpMessagesPerCandidate<T>>::take())
				as usize;

			// Note: this internally calls the `GetChannelInfo` implementation for this
			// pallet, which draws on the `RelevantMessagingState`. That in turn has
			// been adjusted above to reflect the correct limits in all channels.
			let outbound_messages =
				T::OutboundXcmpMessageSource::take_outbound_messages(maximum_channels)
					.into_iter()
					.map(|(recipient, data)| OutboundHrmpMessage { recipient, data })
					.collect::<Vec<_>>();

			// Update the unincluded segment length; capacity checks were done previously in
			// `set_validation_data`, so this can be done unconditionally.
			{
				let hrmp_outgoing = outbound_messages
					.iter()
					.map(|msg| {
						(
							msg.recipient,
							HrmpChannelUpdate { msg_count: 1, total_bytes: msg.data.len() as u32 },
						)
					})
					.collect();
				let used_bandwidth =
					UsedBandwidth { ump_msg_count, ump_total_bytes, hrmp_outgoing };

				let mut aggregated_segment =
					AggregatedUnincludedSegment::<T>::get().unwrap_or_default();
				let consumed_go_ahead_signal =
					if aggregated_segment.consumed_go_ahead_signal().is_some() {
						// Some ancestor within the segment already processed this signal --
						// validated during inherent creation.
						None
					} else {
						relay_upgrade_go_ahead
					};
				// The bandwidth constructed was ensured to satisfy relay chain constraints.
				let ancestor = Ancestor::new_unchecked(used_bandwidth, consumed_go_ahead_signal);

				let watermark = HrmpWatermark::<T>::get();
				let watermark_update = HrmpWatermarkUpdate::new(watermark, vfp.relay_parent_number);

				aggregated_segment
					.append(&ancestor, watermark_update, &total_bandwidth_out)
					.expect("unincluded segment limits exceeded");
				AggregatedUnincludedSegment::<T>::put(aggregated_segment);
				// Check in `on_initialize` guarantees there's space for this block.
				UnincludedSegment::<T>::append(ancestor);
			}
			HrmpOutboundMessages::<T>::put(outbound_messages);
		}

		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
			let mut weight = Weight::zero();

			// To prevent removing `NewValidationCode` that was set by another `on_initialize`,
			// for example by the scheduler, we only kill the storage entry if it was not yet
			// updated in the current block.
			if !<DidSetValidationCode<T>>::get() {
				// NOTE: Killing here is required to at least include the trie nodes down to the key
				// in the proof. Because this value will be read in `validate_block`, it
				// needs to be reachable by the proof.
				NewValidationCode::<T>::kill();
				weight += T::DbWeight::get().writes(1);
			}

			// The parent hash was unknown during block finalization. Update it here.
			{
				<UnincludedSegment<T>>::mutate(|chain| {
					if let Some(ancestor) = chain.last_mut() {
						let parent = frame_system::Pallet::<T>::parent_hash();
						// Ancestor is the latest finalized block, thus the current parent is
						// its output head.
						ancestor.replace_para_head_hash(parent);
					}
				});
				weight += T::DbWeight::get().reads_writes(1, 1);

				// Weight used during finalization.
				weight += T::DbWeight::get().reads_writes(3, 2);
			}

			// Remove the validation data from the old block.
			ValidationData::<T>::kill();
			// NOTE: Killing here is required to at least include the trie nodes down to the keys
			// in the proof. Because these values will be read in `validate_block`, they
			// need to be reachable by the proof.
			ProcessedDownwardMessages::<T>::kill();
			UpwardMessages::<T>::kill();
			HrmpOutboundMessages::<T>::kill();
			CustomValidationHeadData::<T>::kill();
			// The same as above. Reading here to make sure that the key is included in the proof.
			HrmpWatermark::<T>::get();
			weight += T::DbWeight::get().reads_writes(1, 5);

			// Here, in `on_initialize` we must report the weight for both `on_initialize` and
			// `on_finalize`.
			//
			// One complication here is that the `host_configuration` is updated by an inherent,
			// and those are processed after the block initialization phase. Therefore, we have to
			// be content with the configuration as per the previous block. That means the
			// configuration can be stale (or absent altogether at the beginning of the chain).
			//
			// In order to mitigate this, we do the following. At this time, we are only concerned
			// with `hrmp_max_message_num_per_candidate`. We reserve the amount of weight to
			// process the number of HRMP messages according to the potentially stale
			// configuration. In `on_finalize` we will process only the minimum of the
			// announced number of messages and the limit in the fresh configuration.
			//
			// In the common case, they will be the same. In case the actual value is smaller
			// than the announced one, we will waste some weight. In case the actual value is
			// greater than the announced one, we will miss the opportunity to send a couple of
			// messages.
			weight += T::DbWeight::get().reads_writes(1, 1);
			let hrmp_max_message_num_per_candidate = HostConfiguration::<T>::get()
				.map(|cfg| cfg.hrmp_max_message_num_per_candidate)
				.unwrap_or(0);
			<AnnouncedHrmpMessagesPerCandidate<T>>::put(hrmp_max_message_num_per_candidate);

			// NOTE that the actual weight consumed by `on_finalize` may turn out to be lower.
			weight += T::DbWeight::get().reads_writes(
				3 + hrmp_max_message_num_per_candidate as u64,
				4 + hrmp_max_message_num_per_candidate as u64,
			);

			// Weight for updating the last relay chain block number in `on_finalize`.
			weight += T::DbWeight::get().reads_writes(1, 1);

			// Weight for adjusting the unincluded segment in `on_finalize`.
			weight += T::DbWeight::get().reads_writes(6, 3);

			// Always try to read `UpgradeGoAhead` in `on_finalize`.
			weight += T::DbWeight::get().reads(1);

			// We need to ensure that the `CoreInfo` digest exists at most once.
			match CumulusDigestItem::core_info_exists_at_max_once(
				&frame_system::Pallet::<T>::digest(),
			) {
				CoreInfoExistsAtMaxOnce::Once(core_info) => {
					assert_eq!(
						core_info.claim_queue_offset.0,
						T::RelayParentOffset::get() as u8,
						"Only {} is supported as valid claim queue offset",
						T::RelayParentOffset::get()
					);
				},
				CoreInfoExistsAtMaxOnce::NotFound => {},
				CoreInfoExistsAtMaxOnce::MoreThanOnce => {
					panic!("`CumulusDigestItem::CoreInfo` must exist at most once.");
				},
			}

			weight
		}
	}

	#[pallet::call]
	impl<T: Config> Pallet<T> {
		/// Set the current validation data.
		///
		/// This should be invoked exactly once per block. It will panic at the finalization
		/// phase if the call was not invoked.
		///
		/// The dispatch origin for this call must be `Inherent`.
		///
		/// As a side effect, this function upgrades the current validation function
		/// if the appropriate time has come.
		#[pallet::call_index(0)]
		#[pallet::weight((0, DispatchClass::Mandatory))]
		// TODO: This weight should be corrected. Currently the weight is registered manually in the
		// call with `register_extra_weight_unchecked`.
		pub fn set_validation_data(
			origin: OriginFor<T>,
			data: BasicParachainInherentData,
			inbound_messages_data: InboundMessagesData,
		) -> DispatchResult {
			ensure_none(origin)?;
			assert!(
				!<ValidationData<T>>::exists(),
				"ValidationData must be updated only once in a block",
			);

			// TODO: This is more than zero, but will need benchmarking to figure out what.
			let mut total_weight = Weight::zero();

			// NOTE: the inherent data is expected to be unique, even if this block is built
			// in the context of the same relay parent as the previous one. In particular,
			// the inherent shouldn't contain messages that were already processed by any of the
			// ancestors.
			//
			// This invariant should be upheld by the `ProvideInherent` implementation.
			let BasicParachainInherentData {
				validation_data: vfp,
				relay_chain_state,
				relay_parent_descendants,
				collator_peer_id,
			} = data;

			// Check that the associated relay chain block number is as expected.
			T::CheckAssociatedRelayNumber::check_associated_relay_number(
				vfp.relay_parent_number,
				LastRelayChainBlockNumber::<T>::get(),
			);

			let relay_state_proof = RelayChainStateProof::new(
				T::SelfParaId::get(),
				vfp.relay_parent_storage_root,
				relay_chain_state.clone(),
			)
			.expect("Invalid relay chain state proof");

			let expected_rp_descendants_num = T::RelayParentOffset::get();

			if expected_rp_descendants_num > 0 {
				if let Err(err) = descendant_validation::verify_relay_parent_descendants(
					&relay_state_proof,
					relay_parent_descendants,
					vfp.relay_parent_storage_root,
					expected_rp_descendants_num,
				) {
					panic!(
						"Unable to verify provided relay parent descendants. \
						expected_rp_descendants_num: {expected_rp_descendants_num} \
						error: {err:?}"
					);
				};
			}

			// Update the desired maximum capacity according to the consensus hook.
			let (consensus_hook_weight, capacity) =
				T::ConsensusHook::on_state_proof(&relay_state_proof);
			total_weight += consensus_hook_weight;
			total_weight += Self::maybe_drop_included_ancestors(&relay_state_proof, capacity);
			// Deposit a log indicating the relay-parent storage root.
			// TODO: remove this in favor of the relay-parent's hash after
			// https://github.com/paritytech/cumulus/issues/303
			frame_system::Pallet::<T>::deposit_log(
				cumulus_primitives_core::rpsr_digest::relay_parent_storage_root_item(
					vfp.relay_parent_storage_root,
					vfp.relay_parent_number,
				),
			);

			// Initialization logic: we know that this runs exactly once every block,
			// which means we can put the initialization logic here to remove the
			// sequencing problem.
			let upgrade_go_ahead_signal = relay_state_proof
				.read_upgrade_go_ahead_signal()
				.expect("Invalid upgrade go ahead signal");

			let upgrade_signal_in_segment = AggregatedUnincludedSegment::<T>::get()
				.as_ref()
				.and_then(SegmentTracker::consumed_go_ahead_signal);
			if let Some(signal_in_segment) = upgrade_signal_in_segment.as_ref() {
				// The unincluded ancestor that consumed the upgrade signal is still within the
				// segment; sanity check that it matches the signal from the relay chain.
				assert_eq!(upgrade_go_ahead_signal, Some(*signal_in_segment));
			}
			match upgrade_go_ahead_signal {
				Some(_signal) if upgrade_signal_in_segment.is_some() => {
					// Do nothing, the processing logic was executed by an unincluded ancestor.
				},
				Some(relay_chain::UpgradeGoAhead::GoAhead) => {
					assert!(
						<PendingValidationCode<T>>::exists(),
						"No new validation function found in storage, GoAhead signal is not expected",
					);
					let validation_code = <PendingValidationCode<T>>::take();

					frame_system::Pallet::<T>::update_code_in_storage(&validation_code);
					<T::OnSystemEvent as OnSystemEvent>::on_validation_code_applied();
					Self::deposit_event(Event::ValidationFunctionApplied {
						relay_chain_block_num: vfp.relay_parent_number,
					});
				},
				Some(relay_chain::UpgradeGoAhead::Abort) => {
					<PendingValidationCode<T>>::kill();
					Self::deposit_event(Event::ValidationFunctionDiscarded);
				},
				None => {},
			}
			<UpgradeRestrictionSignal<T>>::put(
				relay_state_proof
					.read_upgrade_restriction_signal()
					.expect("Invalid upgrade restriction signal"),
			);
			<UpgradeGoAhead<T>>::put(upgrade_go_ahead_signal);

			let host_config = relay_state_proof
				.read_abridged_host_configuration()
				.expect("Invalid host configuration in relay chain state proof");

			let relevant_messaging_state = relay_state_proof
				.read_messaging_state_snapshot(&host_config)
				.expect("Invalid messaging state in relay chain state proof");

			<ValidationData<T>>::put(&vfp);
			<RelayStateProof<T>>::put(relay_chain_state);
			<RelevantMessagingState<T>>::put(relevant_messaging_state.clone());
			<HostConfiguration<T>>::put(host_config);

			<T::OnSystemEvent as OnSystemEvent>::on_validation_data(&vfp);

			if let Some(collator_peer_id) = collator_peer_id {
				PendingUpwardSignals::<T>::append(
					UMPSignal::ApprovedPeer(collator_peer_id).encode(),
				);
			}

			total_weight.saturating_accrue(Self::enqueue_inbound_downward_messages(
				relevant_messaging_state.dmq_mqc_head,
				inbound_messages_data.downward_messages,
			));
			total_weight.saturating_accrue(Self::enqueue_inbound_horizontal_messages(
				&relevant_messaging_state.ingress_channels,
				inbound_messages_data.horizontal_messages,
				vfp.relay_parent_number,
			));

			frame_system::Pallet::<T>::register_extra_weight_unchecked(
				total_weight,
				DispatchClass::Mandatory,
			);

			Ok(())
		}

		#[pallet::call_index(1)]
		#[pallet::weight((1_000, DispatchClass::Operational))]
		pub fn sudo_send_upward_message(
			origin: OriginFor<T>,
			message: UpwardMessage,
		) -> DispatchResult {
			ensure_root(origin)?;
			let _ = Self::send_upward_message(message);
			Ok(())
		}

		// WARNING: call indices 2 and 3 were used in a former version of this pallet. Using them
		// again will require bumping the transaction version of runtimes using this pallet.
	}

	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// The validation function has been scheduled to apply.
		ValidationFunctionStored,
		/// The validation function was applied as of the contained relay chain block number.
		ValidationFunctionApplied { relay_chain_block_num: RelayChainBlockNumber },
		/// The relay-chain aborted the upgrade process.
		ValidationFunctionDiscarded,
		/// Some downward messages have been received and will be processed.
		DownwardMessagesReceived { count: u32 },
		/// Downward messages were processed using the given weight.
		DownwardMessagesProcessed { weight_used: Weight, dmq_head: relay_chain::Hash },
		/// An upward message was sent to the relay chain.
		UpwardMessageSent { message_hash: Option<XcmHash> },
	}

	#[pallet::error]
	pub enum Error<T> {
		/// Attempt to upgrade validation function while existing upgrade pending.
		OverlappingUpgrades,
		/// Polkadot currently prohibits this parachain from upgrading its validation function.
		ProhibitedByPolkadot,
		/// The supplied validation function has compiled into a blob larger than Polkadot is
		/// willing to run.
		TooBig,
		/// The inherent which supplies the validation data did not run this block.
		ValidationDataNotAvailable,
		/// The inherent which supplies the host configuration did not run this block.
		HostConfigurationNotAvailable,
		/// No validation function upgrade is currently scheduled.
		NotScheduled,
	}

	/// Latest included block descendants the runtime accepted. In other words, these are
	/// ancestors of the currently executing block which have not been included in the observed
	/// relay-chain state.
	///
	/// The segment length is limited by the capacity returned from the [`ConsensusHook`] configured
	/// in the pallet.
	#[pallet::storage]
	pub type UnincludedSegment<T: Config> = StorageValue<_, Vec<Ancestor<T::Hash>>, ValueQuery>;

	/// Storage field that keeps track of the bandwidth used by the unincluded segment along with
	/// the latest HRMP watermark. Used for limiting the acceptance of new blocks with
	/// respect to relay chain constraints.
	#[pallet::storage]
	pub type AggregatedUnincludedSegment<T: Config> =
		StorageValue<_, SegmentTracker<T::Hash>, OptionQuery>;

	/// In case of a scheduled upgrade, this storage field contains the validation code to be
	/// applied.
	///
	/// As soon as the relay chain gives us the go-ahead signal, we will overwrite the
	/// [`:code`][sp_core::storage::well_known_keys::CODE] storage item, which will result in the
	/// next block being processed with the new validation code. This concludes the upgrade process.
	#[pallet::storage]
	pub type PendingValidationCode<T: Config> = StorageValue<_, Vec<u8>, ValueQuery>;

	/// Validation code that is set by the parachain and is to be communicated to the collator and
	/// consequently the relay-chain.
	///
	/// This will be cleared in `on_initialize` of each new block if no other pallet already set
	/// the value.
	#[pallet::storage]
	pub type NewValidationCode<T: Config> = StorageValue<_, Vec<u8>, OptionQuery>;

	/// The [`PersistedValidationData`] set for this block.
	///
	/// This value is expected to be set only once by the [`Pallet::set_validation_data`] inherent.
	#[pallet::storage]
	pub type ValidationData<T: Config> = StorageValue<_, PersistedValidationData>;

	/// Was the validation code set to notify the relay chain?
	#[pallet::storage]
	pub type DidSetValidationCode<T: Config> = StorageValue<_, bool, ValueQuery>;

	/// The relay chain block number associated with the last parachain block.
	///
	/// This is updated in `on_finalize`.
	#[pallet::storage]
	pub type LastRelayChainBlockNumber<T: Config> =
		StorageValue<_, RelayChainBlockNumber, ValueQuery>;

	/// An option which indicates if the relay-chain restricts signalling a validation code upgrade.
	/// In other words, if this is `Some` and [`NewValidationCode`] is `Some` then the produced
	/// candidate will be invalid.
	///
	/// This storage item is a mirror of the corresponding value for the current parachain from the
	/// relay-chain. This value is ephemeral, which means it doesn't hit the storage. This value is
	/// set after the inherent.
	#[pallet::storage]
	pub type UpgradeRestrictionSignal<T: Config> =
		StorageValue<_, Option<relay_chain::UpgradeRestriction>, ValueQuery>;

	/// Optional upgrade go-ahead signal from the relay-chain.
	///
	/// This storage item is a mirror of the corresponding value for the current parachain from the
	/// relay-chain. This value is ephemeral, which means it doesn't hit the storage. This value is
	/// set after the inherent.
	#[pallet::storage]
	pub type UpgradeGoAhead<T: Config> =
		StorageValue<_, Option<relay_chain::UpgradeGoAhead>, ValueQuery>;

	/// The state proof for the last relay parent block.
	///
	/// This field is meant to be updated each block with the validation data inherent. Therefore,
	/// before the inherent is processed, e.g. in `on_initialize`, this data may be stale.
	///
	/// This data is also absent at genesis.
	#[pallet::storage]
	pub type RelayStateProof<T: Config> = StorageValue<_, sp_trie::StorageProof>;

	/// The snapshot of some state related to messaging relevant to the current parachain as per
	/// the relay parent.
	///
	/// This field is meant to be updated each block with the validation data inherent. Therefore,
	/// before the inherent is processed, e.g. in `on_initialize`, this data may be stale.
	///
	/// This data is also absent at genesis.
	#[pallet::storage]
	pub type RelevantMessagingState<T: Config> = StorageValue<_, MessagingStateSnapshot>;

	/// The parachain host configuration that was obtained from the relay parent.
	///
	/// This field is meant to be updated each block with the validation data inherent. Therefore,
	/// before the inherent is processed, e.g. in `on_initialize`, this data may be stale.
	///
	/// This data is also absent at genesis.
	#[pallet::storage]
	#[pallet::disable_try_decode_storage]
	pub type HostConfiguration<T: Config> = StorageValue<_, AbridgedHostConfiguration>;

	/// The last downward message queue chain head we have observed.
	///
	/// This value is loaded before and saved after processing inbound downward messages carried
	/// by the system inherent.
	#[pallet::storage]
	pub type LastDmqMqcHead<T: Config> = StorageValue<_, MessageQueueChain, ValueQuery>;

	/// The message queue chain heads we have observed for each incoming channel.
	///
	/// This value is loaded before and saved after processing inbound HRMP messages carried
	/// by the system inherent.
	#[pallet::storage]
	pub type LastHrmpMqcHeads<T: Config> =
		StorageValue<_, BTreeMap<ParaId, MessageQueueChain>, ValueQuery>;

	/// Number of downward messages processed in a block.
	///
	/// This will be cleared in `on_initialize` of each new block.
	#[pallet::storage]
	pub type ProcessedDownwardMessages<T: Config> = StorageValue<_, u32, ValueQuery>;

	/// The last processed downward message.
	///
	/// We need to keep track of this to filter the messages that have already been processed.
	#[pallet::storage]
	pub type LastProcessedDownwardMessage<T: Config> = StorageValue<_, InboundMessageId>;

	/// HRMP watermark that was set in a block.
	#[pallet::storage]
	pub type HrmpWatermark<T: Config> = StorageValue<_, relay_chain::BlockNumber, ValueQuery>;

	/// The last processed HRMP message.
	///
	/// We need to keep track of this to filter the messages that have already been processed.
	#[pallet::storage]
	pub type LastProcessedHrmpMessage<T: Config> = StorageValue<_, InboundMessageId>;

	/// HRMP messages that were sent in a block.
	///
	/// This will be cleared in `on_initialize` of each new block.
	#[pallet::storage]
	pub type HrmpOutboundMessages<T: Config> =
		StorageValue<_, Vec<OutboundHrmpMessage>, ValueQuery>;

	/// Upward messages that were sent in a block.
	///
	/// This will be cleared in `on_initialize` of each new block.
	#[pallet::storage]
	pub type UpwardMessages<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;

	/// Upward messages that are still pending and not yet sent to the relay chain.
	#[pallet::storage]
	pub type PendingUpwardMessages<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;

	/// Upward signals that are still pending and not yet sent to the relay chain.
	///
	/// This will be cleared in `on_finalize` of each block.
	#[pallet::storage]
	pub type PendingUpwardSignals<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;

	/// The factor to multiply the base delivery fee by for UMP.
	#[pallet::storage]
	pub type UpwardDeliveryFeeFactor<T: Config> =
		StorageValue<_, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;

	/// The number of HRMP messages we observed in `on_initialize` and therefore used for
	/// announcing the weights of `on_initialize` and `on_finalize`.
	#[pallet::storage]
	pub type AnnouncedHrmpMessagesPerCandidate<T: Config> = StorageValue<_, u32, ValueQuery>;

	/// The weight we reserve at the beginning of the block for processing XCMP messages. This
	/// overrides the amount set in the Config trait.
	#[pallet::storage]
	pub type ReservedXcmpWeightOverride<T: Config> = StorageValue<_, Weight>;

	/// The weight we reserve at the beginning of the block for processing DMP messages. This
	/// overrides the amount set in the Config trait.
	#[pallet::storage]
	pub type ReservedDmpWeightOverride<T: Config> = StorageValue<_, Weight>;

	/// A custom head data that should be returned as result of `validate_block`.
	///
	/// See `Pallet::set_custom_validation_head_data` for more information.
	#[pallet::storage]
	pub type CustomValidationHeadData<T: Config> = StorageValue<_, Vec<u8>, OptionQuery>;

	#[pallet::inherent]
	impl<T: Config> ProvideInherent for Pallet<T> {
		type Call = Call<T>;
		type Error = sp_inherents::MakeFatalError<()>;
		const INHERENT_IDENTIFIER: InherentIdentifier =
			cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER;

		fn create_inherent(data: &InherentData) -> Option<Self::Call> {
			let data = match data
				.get_data::<ParachainInherentData>(&Self::INHERENT_IDENTIFIER)
				.ok()
				.flatten()
			{
				None => {
					// The key Self::INHERENT_IDENTIFIER is expected to contain versioned inherent
					// data. Older nodes are unaware of the new format and might provide the
					// legacy data format. We try to load it and transform it into the current
					// version.
					let data = data
						.get_data::<v0::ParachainInherentData>(
							&cumulus_primitives_parachain_inherent::PARACHAIN_INHERENT_IDENTIFIER_V0,
						)
						.ok()
						.flatten()?;
					data.into()
				},
				Some(data) => data,
			};

			Some(Self::do_create_inherent(data))
		}

		fn is_inherent(call: &Self::Call) -> bool {
			matches!(call, Call::set_validation_data { .. })
		}
	}

	#[pallet::genesis_config]
	#[derive(frame_support::DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		#[serde(skip)]
		pub _config: core::marker::PhantomData<T>,
	}

	#[pallet::genesis_build]
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
		fn build(&self) {
			// TODO: Remove after https://github.com/paritytech/cumulus/issues/479
			sp_io::storage::set(b":c", &[]);
		}
	}
}

impl<T: Config> Pallet<T> {
	/// Get the unincluded segment size after the given hash.
	///
	/// If the unincluded segment doesn't contain the given hash, this returns the
	/// length of the entire unincluded segment.
	///
	/// This is intended to be used for determining how long the unincluded segment _would be_
	/// in runtime APIs related to authoring.
	pub fn unincluded_segment_size_after(included_hash: T::Hash) -> u32 {
		let segment = UnincludedSegment::<T>::get();
		crate::unincluded_segment::size_after_included(included_hash, &segment)
	}
}
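
// Worked example for `unincluded_segment_size_after` (hypothetical hashes): if the
// unincluded segment is `[A, B, C]`, passing `A` returns 2 (blocks `B` and `C` remain
// unincluded), passing `C` returns 0, and passing a hash not in the segment returns 3,
// the full segment length.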

impl<T: Config> FeeTracker for Pallet<T> {
	type Id = ();

	fn get_fee_factor(_id: Self::Id) -> FixedU128 {
		UpwardDeliveryFeeFactor::<T>::get()
	}

	fn set_fee_factor(_id: Self::Id, val: FixedU128) {
		UpwardDeliveryFeeFactor::<T>::set(val);
	}
}
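
// Note: the `decrease_fee_factor` call used in `on_finalize` above is a helper
// provided by the `FeeTracker` trait itself (from `polkadot_runtime_parachains`);
// this impl only supplies the backing storage. The net effect is that the UMP
// delivery fee factor grows while the upward queue is congested and decays again
// once the queue drains below the `ump_constants::THRESHOLD_FACTOR` threshold.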

impl<T: Config> ListChannelInfos for Pallet<T> {
	fn outgoing_channels() -> Vec<ParaId> {
		let Some(state) = RelevantMessagingState::<T>::get() else { return Vec::new() };
		state.egress_channels.into_iter().map(|(id, _)| id).collect()
	}
}

impl<T: Config> GetChannelInfo for Pallet<T> {
	fn get_channel_status(id: ParaId) -> ChannelStatus {
		// Note that we are using `relevant_messaging_state`, which may be from the previous
		// block, in case this is called from `on_initialize`, i.e. before the inherent with
		// fresh data is submitted.
		//
		// That shouldn't be a problem though, because this is anticipated and can already
		// happen. This is because sending implies that a message is buffered until there is
		// space to send a message in the candidate. After a while waiting in a buffer, it may
		// be discovered that the channel to which the message was addressed is now closed.
		// Another possibility is that the maximum message size was decreased so that a
		// message in the buffer doesn't fit. Should any of that happen, the sender should be
		// notified that the message was discarded.
		//
		// Here it is a similar case, with the difference that the realization that the channel
		// is closed came in the same block.
		let channels = match RelevantMessagingState::<T>::get() {
			None => {
				log::warn!("calling `get_channel_status` with no RelevantMessagingState?!");
				return ChannelStatus::Closed
			},
			Some(d) => d.egress_channels,
		};
		// ^^^ NOTE: This storage field should carry over from the previous block. So if it's
		// None then it must be that this is an edge-case where a message is attempted to be
		// sent at the first block. It should be safe to assume that there are no channels
		// opened at all so early. At least, relying on this assumption seems to be a better
		// trade-off, compared to introducing an error variant that the clients should be
		// prepared to handle.
		let index = match channels.binary_search_by_key(&id, |item| item.0) {
			Err(_) => return ChannelStatus::Closed,
			Ok(i) => i,
		};
		let meta = &channels[index].1;
		if meta.msg_count + 1 > meta.max_capacity {
			// The channel is at its capacity. Skip it for now.
			return ChannelStatus::Full
		}
		let max_size_now = meta.max_total_size - meta.total_size;
		let max_size_ever = meta.max_message_size;
		ChannelStatus::Ready(max_size_now as usize, max_size_ever as usize)
	}

	fn get_channel_info(id: ParaId) -> Option<ChannelInfo> {
		let channels = RelevantMessagingState::<T>::get()?.egress_channels;
		let index = channels.binary_search_by_key(&id, |item| item.0).ok()?;
		let info = ChannelInfo {
			max_capacity: channels[index].1.max_capacity,
			max_total_size: channels[index].1.max_total_size,
			max_message_size: channels[index].1.max_message_size,
			msg_count: channels[index].1.msg_count,
			total_size: channels[index].1.total_size,
		};
		Some(info)
	}
}
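
// Illustrative use of `get_channel_status` (a sketch, not code from this crate): an
// outbound XCMP queue implementation might branch on the returned status before
// draining a channel:
//
//   match <Pallet<Runtime> as GetChannelInfo>::get_channel_status(recipient) {
//       // At most `now` bytes fit right now; `ever` is the hard per-message cap.
//       ChannelStatus::Ready(now, ever) => { /* page out a message of up to `now` bytes */ },
//       ChannelStatus::Full => { /* keep the message buffered and retry next block */ },
//       ChannelStatus::Closed => { /* drop the message and notify the sender */ },
//   }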

impl<T: Config> Pallet<T> {
	/// The bandwidth limit per block that applies when receiving messages from the relay chain via
	/// DMP or XCMP.
	///
	/// The limit is per message passing mechanism (e.g. 1 MiB for DMP, 1 MiB for XCMP).
	///
	/// The purpose of this limit is to make sure that the total size of the messages received by
	/// the parachain from the relay chain doesn't exceed the block size. Currently each message
	/// passing mechanism can use 1/6 of the total block PoV, which means that in total 1/3
	/// of the block PoV can be used for message passing.
	fn messages_collection_size_limit() -> usize {
		let max_block_weight = <T as frame_system::Config>::BlockWeights::get().max_block;
		let max_block_pov = max_block_weight.proof_size();
		(max_block_pov / 6).saturated_into()
	}
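
	// Worked example (illustrative numbers): with a maximum block PoV size of
	// 5 MiB (5 * 1024 * 1024 bytes), this limit is 5 MiB / 6 ≈ 0.83 MiB per
	// mechanism, so DMP and HRMP together may initially claim up to ≈ 1.66 MiB,
	// i.e. one third of the PoV.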

	/// Updates inherent data to only include the messages that weren't already processed
	/// by the runtime and to compress (hash) the messages that exceed the allocated size.
	///
	/// This method doesn't check for MQC head mismatches. If the MQC doesn't match after
	/// dropping messages, the runtime will panic when executing the inherent.
	fn do_create_inherent(data: ParachainInherentData) -> Call<T> {
		let (data, mut downward_messages, mut horizontal_messages) =
			deconstruct_parachain_inherent_data(data);
		let last_relay_block_number = LastRelayChainBlockNumber::<T>::get();

		let messages_collection_size_limit = Self::messages_collection_size_limit();
		// DMQ.
		let last_processed_msg = LastProcessedDownwardMessage::<T>::get()
			.unwrap_or(InboundMessageId { sent_at: last_relay_block_number, reverse_idx: 0 });
		downward_messages.drop_processed_messages(&last_processed_msg);
		let mut size_limit = messages_collection_size_limit;
		let downward_messages = downward_messages.into_abridged(&mut size_limit);

		// HRMP.
		let last_processed_msg = LastProcessedHrmpMessage::<T>::get()
			.unwrap_or(InboundMessageId { sent_at: last_relay_block_number, reverse_idx: 0 });
		horizontal_messages.drop_processed_messages(&last_processed_msg);
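		// Whatever DMP left unused of its budget carries over to HRMP: `size_limit`
		// still holds the leftover after `into_abridged` above, and we top it up with
		// another full per-mechanism allowance. E.g. (illustrative numbers) if the
		// limit is 100 KiB per mechanism and downward messages consumed 60 KiB, HRMP
		// may use 40 KiB + 100 KiB = 140 KiB.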
		size_limit = size_limit.saturating_add(messages_collection_size_limit);
		let horizontal_messages = horizontal_messages.into_abridged(&mut size_limit);

		let inbound_messages_data =
			InboundMessagesData::new(downward_messages, horizontal_messages);

		Call::set_validation_data { data, inbound_messages_data }
	}
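
	// Note on Message Queue Chains (MQCs): each inbound queue is committed to by a
	// rolling hash kept in `cumulus_primitives_parachain_inherent::MessageQueueChain`.
	// Conceptually (a sketch of the scheme; see that type for the authoritative
	// definition), appending a message updates the head roughly as
	//
	//   new_head = blake2_256(SCALE(prev_head, msg.sent_at, blake2_256(msg.data)))
	//
	// The `extend_downward`/`extend_hrmp` calls below re-derive this head locally so
	// that it can be asserted against the head committed to on the relay chain.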

	/// Enqueue all inbound downward messages relayed by the collator into the MQ pallet.
	///
	/// Checks if the sequence of the messages is valid, dispatches them and communicates the
	/// number of processed messages to the collator via a storage update.
	///
	/// # Panics
	///
	/// If it turns out that after processing all messages the Message Queue Chain
	/// hash doesn't match the expected one.
	fn enqueue_inbound_downward_messages(
		expected_dmq_mqc_head: relay_chain::Hash,
		downward_messages: AbridgedInboundDownwardMessages,
	) -> Weight {
		downward_messages.check_enough_messages_included("DMQ");

		let mut dmq_head = <LastDmqMqcHead<T>>::get();

		let (messages, hashed_messages) = downward_messages.messages();
		let message_count = messages.len() as u32;
		let weight_used = T::WeightInfo::enqueue_inbound_downward_messages(message_count);
		if let Some(last_msg) = messages.last() {
			Self::deposit_event(Event::DownwardMessagesReceived { count: message_count });

			// Eagerly update the MQC head hash:
			for msg in messages {
				dmq_head.extend_downward(msg);
			}
			<LastDmqMqcHead<T>>::put(&dmq_head);
			Self::deposit_event(Event::DownwardMessagesProcessed {
				weight_used,
				dmq_head: dmq_head.head(),
			});

			let mut last_processed_msg =
				InboundMessageId { sent_at: last_msg.sent_at, reverse_idx: 0 };
			for msg in hashed_messages {
				dmq_head.extend_with_hashed_msg(msg);

				if msg.sent_at == last_processed_msg.sent_at {
					last_processed_msg.reverse_idx += 1;
				}
			}
			LastProcessedDownwardMessage::<T>::put(last_processed_msg);

			T::DmpQueue::handle_messages(downward_messages.bounded_msgs_iter());
		}

		// After hashing each message in the message queue chain submitted by the collator, we
		// should arrive at the MQC head provided by the relay chain.
		//
		// A mismatch means that at least some of the submitted messages were altered, omitted or
		// added improperly.
		assert_eq!(dmq_head.head(), expected_dmq_mqc_head, "DMQ head mismatch");

		ProcessedDownwardMessages::<T>::put(message_count);

		weight_used
	}

	fn check_hrmp_mcq_heads(
		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
		mqc_heads: &mut BTreeMap<ParaId, MessageQueueChain>,
	) {
		// Check that the MQC heads for each channel provided by the relay chain match the MQC
		// heads we have after processing all incoming messages.
		//
		// `mqc_heads` is seeded from the previous block's heads, so a channel that saw no
		// messages in this block keeps its carried-over head. Without that carry-over the
		// head would start from all zeros, which would corrupt the message queue chain.
		for (sender, channel) in ingress_channels {
			let cur_head = mqc_heads.entry(*sender).or_default().head();
			let target_head = channel.mqc_head.unwrap_or_default();
			assert_eq!(cur_head, target_head, "HRMP head mismatch");
		}
	}

	/// Performs some checks related to the sender and the `sent_at` field of an HRMP message.
	///
	/// **Panics** if the message submitted by the collator doesn't respect the expected order or if
	///            it was sent from a para which has no open channel to this parachain.
	fn check_hrmp_message_metadata(
		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
		maybe_prev_msg_metadata: &mut Option<(u32, ParaId)>,
		msg_metadata: (u32, ParaId),
	) {
		// Check that the message is properly ordered.
		if let Some(prev_msg) = maybe_prev_msg_metadata {
			assert!(&msg_metadata >= prev_msg, "[HRMP] Messages order violation");
		}
		*maybe_prev_msg_metadata = Some(msg_metadata);

		// Check that the message is sent from an existing channel. The channel exists
		// if its MQC head is present in `vfp.hrmp_mqc_heads`.
		let sender = msg_metadata.1;
		let maybe_channel_idx =
			ingress_channels.binary_search_by_key(&sender, |&(channel_sender, _)| channel_sender);
		assert!(
			maybe_channel_idx.is_ok(),
			"One of the messages submitted by the collator was sent from a sender ({}) \
					that doesn't have a channel opened to this parachain",
			<ParaId as Into<u32>>::into(sender)
		);
	}

	/// Process all inbound horizontal messages relayed by the collator.
	///
	/// This is similar to [`enqueue_inbound_downward_messages`], but works with multiple inbound
	/// channels. It immediately dispatches signals and queues all other XCMs. Blob messages are
	/// ignored.
	///
	/// **Panics** if any of the horizontal messages submitted by the collator was sent from
	///            a para which has no open channel to this parachain, or if after processing
	///            messages across all inbound channels MQCs were obtained which do not
	///            correspond to the ones found on the relay-chain.
	fn enqueue_inbound_horizontal_messages(
		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
		horizontal_messages: AbridgedInboundHrmpMessages,
		relay_parent_number: relay_chain::BlockNumber,
	) -> Weight {
		// First, check the HRMP advancement rule.
		horizontal_messages.check_enough_messages_included("HRMP");

		let (messages, hashed_messages) = horizontal_messages.messages();
		let mut mqc_heads = <LastHrmpMqcHeads<T>>::get();

		if messages.is_empty() {
			Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads);
			let last_processed_msg =
				InboundMessageId { sent_at: relay_parent_number, reverse_idx: 0 };
			LastProcessedHrmpMessage::<T>::put(last_processed_msg);
			HrmpWatermark::<T>::put(relay_parent_number);
			return T::DbWeight::get().reads_writes(1, 2);
		}

		let mut prev_msg_metadata = None;
		let mut last_processed_block = HrmpWatermark::<T>::get();
		let mut last_processed_msg = InboundMessageId { sent_at: 0, reverse_idx: 0 };
		for (sender, msg) in messages {
			Self::check_hrmp_message_metadata(
				ingress_channels,
				&mut prev_msg_metadata,
				(msg.sent_at, *sender),
			);
			mqc_heads.entry(*sender).or_default().extend_hrmp(msg);

			if msg.sent_at > last_processed_msg.sent_at && last_processed_msg.sent_at > 0 {
				last_processed_block = last_processed_msg.sent_at;
			}
			last_processed_msg.sent_at = msg.sent_at;
		}
		<LastHrmpMqcHeads<T>>::put(&mqc_heads);
		for (sender, msg) in hashed_messages {
			Self::check_hrmp_message_metadata(
				ingress_channels,
				&mut prev_msg_metadata,
				(msg.sent_at, *sender),
			);
			mqc_heads.entry(*sender).or_default().extend_with_hashed_msg(msg);

			if msg.sent_at == last_processed_msg.sent_at {
				last_processed_msg.reverse_idx += 1;
			}
		}
		if last_processed_msg.sent_at > 0 && last_processed_msg.reverse_idx == 0 {
			last_processed_block = last_processed_msg.sent_at;
		}
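
		// Worked example of the watermark bookkeeping above (illustrative numbers):
		// suppose plain messages arrive with `sent_at` values 5, 5 and 7. While
		// iterating, block 5 is marked fully processed once the first message from
		// block 7 is seen. If none of block 7's messages were included only as
		// hashes (`reverse_idx == 0`), block 7 is fully processed too and the
		// watermark can advance to 7; otherwise the watermark stays at 5.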
1321		LastProcessedHrmpMessage::<T>::put(&last_processed_msg);
1322		Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads);
1323
1324		let max_weight =
1325			<ReservedXcmpWeightOverride<T>>::get().unwrap_or_else(T::ReservedXcmpWeight::get);
1326		let weight_used = T::XcmpMessageHandler::handle_xcmp_messages(
1327			horizontal_messages.flat_msgs_iter(),
1328			max_weight,
1329		);
1330
1331		// Update watermark
1332		HrmpWatermark::<T>::put(last_processed_block);
1333
1334		weight_used.saturating_add(T::DbWeight::get().reads_writes(2, 3))
1335	}
1336
1337	/// Drop blocks from the unincluded segment with respect to the latest parachain head.
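	///
	/// Illustration of the capacity rule enforced at the end of this function (numbers are
	/// hypothetical): with a capacity of `3`, a new block is allowed while at most two
	/// ancestors remain unincluded after pruning (`2 < 3`); otherwise block production
	/// must wait for ancestors to be included.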
1338	fn maybe_drop_included_ancestors(
1339		relay_state_proof: &RelayChainStateProof,
1340		capacity: consensus_hook::UnincludedSegmentCapacity,
1341	) -> Weight {
1342		let mut weight_used = Weight::zero();
1343		// If the unincluded segment length is nonzero, then the parachain head must be present.
1344		let para_head =
1345			relay_state_proof.read_included_para_head().ok().map(|h| T::Hashing::hash(&h.0));
1346
1347		let unincluded_segment_len = <UnincludedSegment<T>>::decode_len().unwrap_or(0);
1348		weight_used += T::DbWeight::get().reads(1);
1349
1350		// Clean up unincluded segment if nonempty.
1351		let included_head = match (para_head, capacity.is_expecting_included_parent()) {
1352			(Some(h), true) => {
1353				assert_eq!(
1354					h,
1355					frame_system::Pallet::<T>::parent_hash(),
1356					"expected parent to be included"
1357				);
1358
1359				h
1360			},
1361			(Some(h), false) => h,
1362			(None, true) => {
1363				// All this logic is essentially a workaround to support collators which
1364				// might still not provide the included block with the state proof.
1365				frame_system::Pallet::<T>::parent_hash()
1366			},
1367			(None, false) => panic!("included head not present in relay storage proof"),
1368		};
1369
1370		let new_len = {
1371			let para_head_hash = included_head;
1372			let dropped: Vec<Ancestor<T::Hash>> = <UnincludedSegment<T>>::mutate(|chain| {
1373				// Drop everything up to (inclusive) the block with an included para head, if
1374				// present.
1375				let idx = chain
1376					.iter()
1377					.position(|block| {
1378						let head_hash = block
1379							.para_head_hash()
1380							.expect("para head hash is updated during block initialization; qed");
1381						head_hash == &para_head_hash
1382					})
1383					.map_or(0, |idx| idx + 1); // inclusive.
1384
1385				chain.drain(..idx).collect()
1386			});
1387			weight_used += T::DbWeight::get().reads_writes(1, 1);
1388
1389			let new_len = unincluded_segment_len - dropped.len();
1390			if !dropped.is_empty() {
1391				<AggregatedUnincludedSegment<T>>::mutate(|agg| {
1392					let agg = agg.as_mut().expect(
1393						"dropped part of the segment wasn't empty, hence value exists; qed",
1394					);
1395					for block in dropped {
1396						agg.subtract(&block);
1397					}
1398				});
1399				weight_used += T::DbWeight::get().reads_writes(1, 1);
1400			}
1401
1402			new_len as u32
1403		};
1404
1405		// Current block validity check: ensure there is space in the unincluded segment.
1406		//
1407		// If this fails, the parachain needs to wait for ancestors to be included before
1408		// a new block is allowed.
1409		assert!(new_len < capacity.get(), "no space left for the block in the unincluded segment");
1410		weight_used
1411	}
1412
1413	/// This adjusts the `RelevantMessagingState` according to the bandwidth limits in the
1414	/// unincluded segment.
1415	//
1416	// Reads: 2
1417	// Writes: 1
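	//
	// Illustrative arithmetic (hypothetical values): for an egress channel with
	// `total_size = 100`, `max_total_size = 1_000`, and `950` bytes already used by the
	// unincluded segment, the adjusted size is `min(100 + 950, 1_000) = 1_000`, i.e. the
	// channel is treated as full until ancestors are included.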
1418	fn adjust_egress_bandwidth_limits() {
1419		let unincluded_segment = match AggregatedUnincludedSegment::<T>::get() {
1420			None => return,
1421			Some(s) => s,
1422		};
1423
1424		<RelevantMessagingState<T>>::mutate(|messaging_state| {
1425			let messaging_state = match messaging_state {
1426				None => return,
1427				Some(s) => s,
1428			};
1429
1430			let used_bandwidth = unincluded_segment.used_bandwidth();
1431
1432			let channels = &mut messaging_state.egress_channels;
1433			for (para_id, used) in used_bandwidth.hrmp_outgoing.iter() {
1434				let i = match channels.binary_search_by_key(para_id, |item| item.0) {
1435					Ok(i) => i,
1436					Err(_) => continue, // indicates channel closed.
1437				};
1438
1439				let c = &mut channels[i].1;
1440
1441				c.total_size = (c.total_size + used.total_bytes).min(c.max_total_size);
1442				c.msg_count = (c.msg_count + used.msg_count).min(c.max_capacity);
1443			}
1444
1445			let upward_capacity = &mut messaging_state.relay_dispatch_queue_remaining_capacity;
1446			upward_capacity.remaining_count =
1447				upward_capacity.remaining_count.saturating_sub(used_bandwidth.ump_msg_count);
1448			upward_capacity.remaining_size =
1449				upward_capacity.remaining_size.saturating_sub(used_bandwidth.ump_total_bytes);
1450		});
1451	}
1452
1453	/// Put a new validation function into a particular location where polkadot
1454	/// monitors for updates. Calling this function notifies polkadot that a new
1455	/// upgrade has been scheduled.
1456	fn notify_polkadot_of_pending_upgrade(code: &[u8]) {
1457		NewValidationCode::<T>::put(code);
1458		<DidSetValidationCode<T>>::put(true);
1459	}
1460
1461	/// The maximum code size permitted, in bytes.
1462	///
1463	/// Returns `None` if the relay chain parachain host configuration hasn't been submitted yet.
1464	pub fn max_code_size() -> Option<u32> {
1465		<HostConfiguration<T>>::get().map(|cfg| cfg.max_code_size)
1466	}
1467
1468	/// The implementation of the runtime upgrade functionality for parachains.
1469	pub fn schedule_code_upgrade(validation_function: Vec<u8>) -> DispatchResult {
1470		// Ensure that `ValidationData` exists. We do not care about the validation data per se,
1471		// but we do care about the [`UpgradeRestrictionSignal`] which arrives with the same
1472		// inherent.
1473		ensure!(<ValidationData<T>>::exists(), Error::<T>::ValidationDataNotAvailable);
1474		ensure!(<UpgradeRestrictionSignal<T>>::get().is_none(), Error::<T>::ProhibitedByPolkadot);
1475
1476		ensure!(!<PendingValidationCode<T>>::exists(), Error::<T>::OverlappingUpgrades);
1477		let cfg = HostConfiguration::<T>::get().ok_or(Error::<T>::HostConfigurationNotAvailable)?;
1478		ensure!(validation_function.len() <= cfg.max_code_size as usize, Error::<T>::TooBig);
1479
1480		// When a code upgrade is scheduled, it has to be applied in two
1481		// places, synchronized: both polkadot and the individual parachain
1482		// have to upgrade on the same relay chain block.
1483		//
1484		// `notify_polkadot_of_pending_upgrade` notifies polkadot; the `PendingValidationCode`
1485		// storage item keeps track of the upgrade locally for the parachain; it will
1486		// be applied later, when the relay chain communicates the go-ahead signal to us.
1487		Self::notify_polkadot_of_pending_upgrade(&validation_function);
1488		<PendingValidationCode<T>>::put(validation_function);
1489		Self::deposit_event(Event::ValidationFunctionStored);
1490
1491		Ok(())
1492	}
1493
1494	/// Returns the [`CollationInfo`] of the current active block.
1495	///
1496	/// The given `header` is the header of the built block we are collecting the collation info
1497	/// for.
1498	///
1499	/// This is expected to be used by the
1500	/// [`CollectCollationInfo`](cumulus_primitives_core::CollectCollationInfo) runtime api.
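	///
	/// A minimal sketch of that wiring inside `impl_runtime_apis!`, assuming the pallet is
	/// registered as `ParachainSystem` in `construct_runtime!`:
	///
	/// ```ignore
	/// impl cumulus_primitives_core::CollectCollationInfo<Block> for Runtime {
	///     fn collect_collation_info(header: &<Block as BlockT>::Header) -> CollationInfo {
	///         ParachainSystem::collect_collation_info(header)
	///     }
	/// }
	/// ```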
1501	pub fn collect_collation_info(header: &HeaderFor<T>) -> CollationInfo {
1502		CollationInfo {
1503			hrmp_watermark: HrmpWatermark::<T>::get(),
1504			horizontal_messages: HrmpOutboundMessages::<T>::get(),
1505			upward_messages: UpwardMessages::<T>::get(),
1506			processed_downward_messages: ProcessedDownwardMessages::<T>::get(),
1507			new_validation_code: NewValidationCode::<T>::get().map(Into::into),
1508			// Check if there is a custom header that will also be returned by the validation phase.
1509			// If so, we need to also return it here.
1510			head_data: CustomValidationHeadData::<T>::get()
1511				.map_or_else(|| header.encode(), |v| v)
1512				.into(),
1513		}
1514	}
1515
1516	/// Set custom head data that should be returned as the result of `validate_block`.
1517	///
1518	/// This will overwrite the head data that is returned as the result of `validate_block`
1519	/// while validating a `PoV` on the relay chain. Normally the head data returned by
1520	/// `validate_block` is the header of the block being validated, so that it can be
1521	/// enacted as the new best block. However, for features like forking it can be useful
1522	/// to overwrite the head data with a custom header.
1523	///
1524	/// # Attention
1525	///
1526	/// This should only be used when you know what you are doing, as it can brick
1527	/// your parachain.
1528	pub fn set_custom_validation_head_data(head_data: Vec<u8>) {
1529		CustomValidationHeadData::<T>::put(head_data);
1530	}
1531
1532	/// Send the pending UMP signals.
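	///
	/// After this runs, `UpwardMessages` is laid out as
	/// `[msg_1, ..., msg_n, UMP_SEPARATOR, signal_1, ..., signal_m]`: regular messages
	/// first, then the separator, then the signals, which is the layout the relay chain
	/// expects when extracting `UMPSignal`s from a candidate.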
1533	fn send_ump_signals() {
1534		let ump_signals = PendingUpwardSignals::<T>::take();
1535		if !ump_signals.is_empty() {
1536			UpwardMessages::<T>::append(UMP_SEPARATOR);
1537			ump_signals.into_iter().for_each(|s| UpwardMessages::<T>::append(s));
1538		}
1539	}
1540
1541	/// Open an HRMP channel for use in benchmarks or tests.
1542	///
1543	/// The caller may assume that the pallet will accept regular outbound messages to the
1544	/// sibling `target_parachain` after this call. No other assumptions should be made.
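	///
	/// A minimal sketch of the intended use in a test (assuming the pallet is named
	/// `ParachainSystem` in the runtime; `2000` is an arbitrary para id):
	///
	/// ```ignore
	/// ParachainSystem::open_outbound_hrmp_channel_for_benchmarks_or_tests(2000.into());
	/// // Outbound XCMs to sibling para 2000 are now accepted by the pallet.
	/// ```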
1545	#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
1546	pub fn open_outbound_hrmp_channel_for_benchmarks_or_tests(target_parachain: ParaId) {
1547		RelevantMessagingState::<T>::put(MessagingStateSnapshot {
1548			dmq_mqc_head: Default::default(),
1549			relay_dispatch_queue_remaining_capacity: Default::default(),
1550			ingress_channels: Default::default(),
1551			egress_channels: vec![(
1552				target_parachain,
1553				cumulus_primitives_core::AbridgedHrmpChannel {
1554					max_capacity: 10,
1555					max_total_size: 10_000_000_u32,
1556					max_message_size: 10_000_000_u32,
1557					msg_count: 5,
1558					total_size: 5_000_000_u32,
1559					mqc_head: None,
1560				},
1561			)],
1562		})
1563	}
1564
1565	/// Open a custom HRMP channel for use in benchmarks or tests.
1566	///
1567	/// The caller may assume that the pallet will accept regular outbound messages to the
1568	/// sibling `target_parachain` after this call. No other assumptions should be made.
1569	#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
1570	pub fn open_custom_outbound_hrmp_channel_for_benchmarks_or_tests(
1571		target_parachain: ParaId,
1572		channel: cumulus_primitives_core::AbridgedHrmpChannel,
1573	) {
1574		RelevantMessagingState::<T>::put(MessagingStateSnapshot {
1575			dmq_mqc_head: Default::default(),
1576			relay_dispatch_queue_remaining_capacity: Default::default(),
1577			ingress_channels: Default::default(),
1578			egress_channels: vec![(target_parachain, channel)],
1579		})
1580	}
1581
1582	/// Prepare/insert relevant data for `schedule_code_upgrade` for benchmarks.
1583	#[cfg(feature = "runtime-benchmarks")]
1584	pub fn initialize_for_set_code_benchmark(max_code_size: u32) {
1585		// insert dummy ValidationData
1586		let vfp = PersistedValidationData {
1587			parent_head: polkadot_parachain_primitives::primitives::HeadData(Default::default()),
1588			relay_parent_number: 1,
1589			relay_parent_storage_root: Default::default(),
1590			max_pov_size: 1_000,
1591		};
1592		<ValidationData<T>>::put(&vfp);
1593
1594		// insert dummy HostConfiguration with the given max_code_size
1595		let host_config = AbridgedHostConfiguration {
1596			max_code_size,
1597			max_head_data_size: 32 * 1024,
1598			max_upward_queue_count: 8,
1599			max_upward_queue_size: 1024 * 1024,
1600			max_upward_message_size: 4 * 1024,
1601			max_upward_message_num_per_candidate: 2,
1602			hrmp_max_message_num_per_candidate: 2,
1603			validation_upgrade_cooldown: 2,
1604			validation_upgrade_delay: 2,
1605			async_backing_params: relay_chain::AsyncBackingParams {
1606				allowed_ancestry_len: 0,
1607				max_candidate_depth: 0,
1608			},
1609		};
1610		<HostConfiguration<T>>::put(host_config);
1611	}
1612}
1613
1614/// Type that implements `SetCode` by routing runtime upgrades through [`Pallet::schedule_code_upgrade`].
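///
/// A minimal sketch of the usual runtime wiring (type names are illustrative):
///
/// ```ignore
/// impl frame_system::Config for Runtime {
///     // Route `set_code` through the parachain system pallet so that upgrades are
///     // coordinated with the relay chain instead of being applied immediately.
///     type OnSetCode = cumulus_pallet_parachain_system::ParachainSetCode<Self>;
///     // ...
/// }
/// ```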
1615pub struct ParachainSetCode<T>(core::marker::PhantomData<T>);
1616impl<T: Config> frame_system::SetCode<T> for ParachainSetCode<T> {
1617	fn set_code(code: Vec<u8>) -> DispatchResult {
1618		Pallet::<T>::schedule_code_upgrade(code)
1619	}
1620}
1621
1622impl<T: Config> Pallet<T> {
1623	/// Puts a message into the `PendingUpwardMessages` storage item.
1624	/// The message will be sent later, in `on_finalize`.
1625	/// Checks the host configuration to see if the message is too big.
1626	/// Increases the delivery fee factor if the queue is sufficiently congested
1627	/// (see [`ump_constants::THRESHOLD_FACTOR`]).
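	///
	/// Illustrative arithmetic under assumed values (not the actual constants): with
	/// `max_upward_queue_size = 1_048_576` and `THRESHOLD_FACTOR = 2`, the fee factor
	/// starts increasing once the pending upward messages exceed `524_288` bytes in total.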
1628	pub fn send_upward_message(message: UpwardMessage) -> Result<(u32, XcmHash), MessageSendError> {
1629		let message_len = message.len();
1630		// Check if the message fits into the relay-chain constraints.
1631		//
1632		// Note that we are using `host_configuration` here which may be from the previous
1633		// block, in case this is called from `on_initialize`, i.e. before the inherent with fresh
1634		// data is submitted.
1635		//
1636		// That shouldn't be a problem since this is a preliminary check and the actual check would
1637		// be performed just before submitting the message from the candidate, and it already can
1638		// happen that during the time the message is buffered for sending the relay-chain setting
1639		// may change so that the message is no longer valid.
1640		//
1641		// However, changing this setting is expected to be rare.
1642		if let Some(cfg) = HostConfiguration::<T>::get() {
1643			if message_len > cfg.max_upward_message_size as usize {
1644				return Err(MessageSendError::TooBig);
1645			}
1646			let threshold =
1647				cfg.max_upward_queue_size.saturating_div(ump_constants::THRESHOLD_FACTOR);
1648			// We check the threshold against total size and not number of messages since messages
1649			// could be big or small.
1650			<PendingUpwardMessages<T>>::append(message.clone());
1651			let pending_messages = PendingUpwardMessages::<T>::get();
1652			let total_size: usize = pending_messages.iter().map(UpwardMessage::len).sum();
1653			if total_size > threshold as usize {
1654				// We increase the fee factor by a factor based on the new message's size in KB
1655				Self::increase_fee_factor((), message_len as u128);
1656			}
1657		} else {
1658			// This storage field should carry over from the previous block. So if it's None
1659			// then it must be that this is an edge-case where a message is attempted to be
1660			// sent at the first block.
1661			//
1662			// Let's pass this message through. It's not unreasonable to expect that the
1663			// message is not huge and will go through, but if it doesn't it can be
1664			// returned to the sender.
1665			//
1666			// Thus fall through here.
1667			<PendingUpwardMessages<T>>::append(message.clone());
1668		};
1669
1670		// The relay chain UMP does not use `using_encoded`;
1671		// we hash the raw message bytes here so that we arrive at the same hash.
1672		let hash = sp_io::hashing::blake2_256(&message);
1673		Self::deposit_event(Event::UpwardMessageSent { message_hash: Some(hash) });
1674		Ok((0, hash))
1675	}
1676
1677	/// Get the relay chain block number which was used as an anchor for the last block in this
1678	/// chain.
1679	pub fn last_relay_block_number() -> RelayChainBlockNumber {
1680		LastRelayChainBlockNumber::<T>::get()
1681	}
1682}
1683
1684impl<T: Config> UpwardMessageSender for Pallet<T> {
1685	fn send_upward_message(message: UpwardMessage) -> Result<(u32, XcmHash), MessageSendError> {
1686		Self::send_upward_message(message)
1687	}
1688
1689	fn can_send_upward_message(message: &UpwardMessage) -> Result<(), MessageSendError> {
1690		let max_upward_message_size = HostConfiguration::<T>::get()
1691			.map(|cfg| cfg.max_upward_message_size)
1692			.ok_or(MessageSendError::Other)?;
1693		if message.len() > max_upward_message_size as usize {
1694			Err(MessageSendError::TooBig)
1695		} else {
1696			Ok(())
1697		}
1698	}
1699
1700	#[cfg(any(feature = "std", feature = "runtime-benchmarks", test))]
1701	fn ensure_successful_delivery() {
1702		const MAX_UPWARD_MESSAGE_SIZE: u32 = 65_531 * 3;
1703		const MAX_CODE_SIZE: u32 = 3 * 1024 * 1024;
1704		HostConfiguration::<T>::mutate(|cfg| match cfg {
1705			Some(cfg) => cfg.max_upward_message_size = MAX_UPWARD_MESSAGE_SIZE,
1706			None =>
1707				*cfg = Some(AbridgedHostConfiguration {
1708					max_code_size: MAX_CODE_SIZE,
1709					max_head_data_size: 32 * 1024,
1710					max_upward_queue_count: 8,
1711					max_upward_queue_size: 1024 * 1024,
1712					max_upward_message_size: MAX_UPWARD_MESSAGE_SIZE,
1713					max_upward_message_num_per_candidate: 2,
1714					hrmp_max_message_num_per_candidate: 2,
1715					validation_upgrade_cooldown: 2,
1716					validation_upgrade_delay: 2,
1717					async_backing_params: relay_chain::AsyncBackingParams {
1718						allowed_ancestry_len: 0,
1719						max_candidate_depth: 0,
1720					},
1721				}),
1722		})
1723	}
1724}
1725
1726impl<T: Config> InspectMessageQueues for Pallet<T> {
1727	fn clear_messages() {
1728		PendingUpwardMessages::<T>::kill();
1729	}
1730
1731	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1732		use xcm::prelude::*;
1733
1734		let messages: Vec<VersionedXcm<()>> = PendingUpwardMessages::<T>::get()
1735			.iter()
1736			.map(|encoded_message| {
1737				VersionedXcm::<()>::decode_all_with_depth_limit(
1738					MAX_XCM_DECODE_DEPTH,
1739					&mut &encoded_message[..],
1740				)
1741				.unwrap()
1742			})
1743			.collect();
1744
1745		if messages.is_empty() {
1746			vec![]
1747		} else {
1748			vec![(VersionedLocation::from(Location::parent()), messages)]
1749		}
1750	}
1751}
1752
1753#[cfg(feature = "runtime-benchmarks")]
1754impl<T: Config> polkadot_runtime_parachains::EnsureForParachain for Pallet<T> {
1755	fn ensure(para_id: ParaId) {
1756		if let ChannelStatus::Closed = Self::get_channel_status(para_id) {
1757			Self::open_outbound_hrmp_channel_for_benchmarks_or_tests(para_id)
1758		}
1759	}
1760}
1761
1762/// Something that should be informed about system-related events.
1763///
1764/// This includes events like [`on_validation_data`](Self::on_validation_data), which is
1765/// called when the parachain inherent containing the validation data is executed,
1766/// or [`on_validation_code_applied`](Self::on_validation_code_applied), which is called
1767/// when the new validation code is written to the state, meaning that from the
1768/// next block on the runtime will be using this new code.
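///
/// A minimal sketch of a custom handler (thanks to the tuple implementation, `()` can be
/// used when no handler is needed):
///
/// ```ignore
/// struct LogSystemEvents;
/// impl OnSystemEvent for LogSystemEvents {
///     fn on_validation_data(data: &PersistedValidationData) {
///         log::debug!("validation data set at relay parent {}", data.relay_parent_number);
///     }
///     fn on_validation_code_applied() {
///         log::info!("new validation code applied");
///     }
/// }
/// ```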
1769#[impl_trait_for_tuples::impl_for_tuples(30)]
1770pub trait OnSystemEvent {
1771	/// Called once per block, when the validation data is set by the inherent.
1772	fn on_validation_data(data: &PersistedValidationData);
1773	/// Called when the validation code is applied, i.e. from the next block on this is
1774	/// the new runtime.
1775	fn on_validation_code_applied();
1776}
1777
1778/// Holds the most recent relay-parent state root and block number of the current parachain block.
1779#[derive(PartialEq, Eq, Clone, Encode, Decode, TypeInfo, Default, Debug)]
1780pub struct RelayChainState {
1781	/// Current relay chain height.
1782	pub number: relay_chain::BlockNumber,
1783	/// State root for current relay chain height.
1784	pub state_root: relay_chain::Hash,
1785}
1786
1787/// This exposes the [`RelayChainState`] to other runtime modules.
1788///
1789/// Enables parachains to read relay chain state via state proofs.
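///
/// A minimal sketch of reading the state, assuming [`RelaychainDataProvider`] (defined
/// below) as the implementation:
///
/// ```ignore
/// let state = RelaychainDataProvider::<Runtime>::current_relay_chain_state();
/// let (number, root) = (state.number, state.state_root);
/// ```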
1790pub trait RelaychainStateProvider {
1791	/// May be called by any runtime module to obtain the current state of the relay chain.
1792	///
1793	/// **NOTE**: This is not guaranteed to return monotonically increasing relay parents.
1794	fn current_relay_chain_state() -> RelayChainState;
1795
1796	/// Utility function only to be used in benchmarking scenarios, to be implemented optionally,
1797	/// else a noop.
1798	///
1799	/// It allows for setting a custom RelayChainState.
1800	#[cfg(feature = "runtime-benchmarks")]
1801	fn set_current_relay_chain_state(_state: RelayChainState) {}
1802}
1803
1804/// Implements [`BlockNumberProvider`], returning the relay chain block number fetched from
1805/// the validation data.
1806///
1807/// When validation data is not available (e.g. within `on_initialize`), it will fall back
1808/// to [`Pallet::last_relay_block_number()`].
1809///
1810/// **NOTE**: This has been deprecated, please use [`RelaychainDataProvider`]
1811#[deprecated = "Use `RelaychainDataProvider` instead"]
1812pub type RelaychainBlockNumberProvider<T> = RelaychainDataProvider<T>;
1813
1814/// Implements [`BlockNumberProvider`] and [`RelaychainStateProvider`], returning relevant
1815/// relay chain data fetched from the validation data.
1816///
1817/// NOTE: When validation data is not available (e.g. within `on_initialize`):
1818///
1819/// - [`current_relay_chain_state`](Self::current_relay_chain_state): Will return the default value
1820///   of [`RelayChainState`].
1821/// - [`current_block_number`](Self::current_block_number): Will return
1822///   [`Pallet::last_relay_block_number()`].
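///
/// A hedged sketch of using it as the clock for a pallet that is generic over its block
/// number source (`some_pallet` is hypothetical):
///
/// ```ignore
/// impl some_pallet::Config for Runtime {
///     // Track time in relay chain blocks rather than parachain blocks.
///     type BlockNumberProvider =
///         cumulus_pallet_parachain_system::RelaychainDataProvider<Runtime>;
/// }
/// ```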
1823pub struct RelaychainDataProvider<T>(core::marker::PhantomData<T>);
1824
1825impl<T: Config> BlockNumberProvider for RelaychainDataProvider<T> {
1826	type BlockNumber = relay_chain::BlockNumber;
1827
1828	fn current_block_number() -> relay_chain::BlockNumber {
1829		ValidationData::<T>::get()
1830			.map(|d| d.relay_parent_number)
1831			.unwrap_or_else(|| Pallet::<T>::last_relay_block_number())
1832	}
1833
1834	#[cfg(any(feature = "std", feature = "runtime-benchmarks", test))]
1835	fn set_block_number(block: Self::BlockNumber) {
1836		let mut validation_data = ValidationData::<T>::get().unwrap_or_else(||
1837			// PersistedValidationData does not impl default in non-std
1838			PersistedValidationData {
1839				parent_head: vec![].into(),
1840				relay_parent_number: Default::default(),
1841				max_pov_size: Default::default(),
1842				relay_parent_storage_root: Default::default(),
1843			});
1844		validation_data.relay_parent_number = block;
1845		ValidationData::<T>::put(validation_data)
1846	}
1847}
1848
1849impl<T: Config> RelaychainStateProvider for RelaychainDataProvider<T> {
1850	fn current_relay_chain_state() -> RelayChainState {
1851		ValidationData::<T>::get()
1852			.map(|d| RelayChainState {
1853				number: d.relay_parent_number,
1854				state_root: d.relay_parent_storage_root,
1855			})
1856			.unwrap_or_default()
1857	}
1858
1859	#[cfg(feature = "runtime-benchmarks")]
1860	fn set_current_relay_chain_state(state: RelayChainState) {
1861		let mut validation_data = ValidationData::<T>::get().unwrap_or_else(||
1862			// PersistedValidationData does not impl default in non-std
1863			PersistedValidationData {
1864				parent_head: vec![].into(),
1865				relay_parent_number: Default::default(),
1866				max_pov_size: Default::default(),
1867				relay_parent_storage_root: Default::default(),
1868			});
1869		validation_data.relay_parent_number = state.number;
1870		validation_data.relay_parent_storage_root = state.state_root;
1871		ValidationData::<T>::put(validation_data)
1872	}
1873}