referrerpolicy=no-referrer-when-downgrade

cumulus_pallet_parachain_system/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#![cfg_attr(not(feature = "std"), no_std)]
18
19//! `cumulus-pallet-parachain-system` is a base pallet for Cumulus-based parachains.
20//!
21//! This pallet handles low-level details of being a parachain. Its responsibilities include:
22//!
23//! - ingestion of the parachain validation data;
24//! - ingestion and dispatch of incoming downward and lateral messages;
25//! - coordinating upgrades with the Relay Chain; and
26//! - communication of parachain outputs, such as sent messages, signaling an upgrade, etc.
27//!
28//! Users must ensure that they register this pallet as an inherent provider.
29
30extern crate alloc;
31
32use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
33use codec::{Decode, DecodeLimit, Encode};
34use core::cmp;
35use cumulus_primitives_core::{
36	relay_chain::{self, UMPSignal, UMP_SEPARATOR},
37	AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo, CoreInfo,
38	CumulusDigestItem, GetChannelInfo, ListChannelInfos, MessageSendError, OutboundHrmpMessage,
39	ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender, VerifySchedulingSignature,
40	XcmpMessageHandler, XcmpMessageSource,
41};
42use cumulus_primitives_parachain_inherent::{v0, MessageQueueChain, ParachainInherentData};
43use frame_support::{
44	dispatch::{DispatchClass, DispatchResult},
45	ensure,
46	inherent::{InherentData, InherentIdentifier, ProvideInherent},
47	traits::{Get, HandleMessage},
48	weights::Weight,
49};
50use frame_system::{ensure_none, ensure_root, pallet_prelude::HeaderFor};
51use parachain_inherent::{
52	deconstruct_parachain_inherent_data, AbridgedInboundDownwardMessages,
53	AbridgedInboundHrmpMessages, BasicParachainInherentData, InboundMessageId, InboundMessagesData,
54};
55use polkadot_parachain_primitives::primitives::RelayChainBlockNumber;
56use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
57use scale_info::TypeInfo;
58use sp_runtime::{
59	traits::{BlockNumberProvider, Hash},
60	Debug, FixedU128, SaturatedConversion,
61};
62use xcm::{latest::XcmHash, VersionedLocation, VersionedXcm, MAX_XCM_DECODE_DEPTH};
63use xcm_builder::InspectMessageQueues;
64
65mod benchmarking;
66pub mod block_weight;
67pub mod consensus_hook;
68pub mod migration;
69mod mock;
70pub mod relay_state_snapshot;
71#[cfg(test)]
72mod tests;
73mod unincluded_segment;
74pub mod weights;
75#[macro_use]
76pub mod validate_block;
77mod descendant_validation;
78pub mod parachain_inherent;
79
80use unincluded_segment::{
81	HrmpChannelUpdate, HrmpWatermarkUpdate, OutboundBandwidthLimits, SegmentTracker,
82};
83
84pub use consensus_hook::{ConsensusHook, ExpectParentIncluded};
85/// Register the `validate_block` function that is used by parachains to validate blocks on a
86/// validator.
87///
88/// Does *nothing* when `std` feature is enabled.
89///
90/// Expects as parameters the runtime, a block executor and an inherent checker.
91///
92/// # Example
93///
94/// ```
95///     struct BlockExecutor;
96///     struct Runtime;
97///
98///     cumulus_pallet_parachain_system::register_validate_block! {
99///         Runtime = Runtime,
100///         BlockExecutor = Executive,
101///     }
102///
103/// # fn main() {}
104/// ```
105pub use cumulus_pallet_parachain_system_proc_macro::register_validate_block;
106pub use relay_state_snapshot::{MessagingStateSnapshot, RelayChainStateProof};
107pub use unincluded_segment::{Ancestor, UsedBandwidth};
108pub use weights::WeightInfo;
109
110use crate::parachain_inherent::AbridgedInboundMessagesSizeInfo;
111pub use pallet::*;
112
113const LOG_TARGET: &str = "runtime::parachain-system";
114
115/// Tracks cumulative UMP and HRMP message counts sent across blocks within a single PoV.
116#[derive(Encode, Decode, Clone, Debug, TypeInfo, Default)]
117pub struct PoVMessages {
118	/// Relay parent storage root of the current PoV.
119	pub relay_storage_root_or_hash: relay_chain::Hash,
120	/// The core selector of the current Pov.
121	pub core_selector: u8,
122	/// The bundle index of the current PoV. `None` when `BundleInfo` digest is absent.
123	pub bundle_index: u8,
124	/// Cumulative count of UMP messages sent in this PoV.
125	pub ump_msg_count: u32,
126	/// Cumulative count of HRMP outbound messages sent in this PoV.
127	pub hrmp_outbound_count: u32,
128	/// Recipients already used for HRMP outbound messages in this PoV.
129	pub hrmp_outbound_recipients: Vec<ParaId>,
130}
131
132/// Something that can check the associated relay block number.
133///
134/// Each Parachain block is built in the context of a relay chain block, this trait allows us
135/// to validate the given relay chain block number. With async backing it is legal to build
136/// multiple Parachain blocks per relay chain parent. With this trait it is possible for the
137/// Parachain to ensure that still only one Parachain block is build per relay chain parent.
138///
139/// By default [`RelayNumberStrictlyIncreases`] and [`AnyRelayNumber`] are provided.
140pub trait CheckAssociatedRelayNumber {
141	/// Check the current relay number versus the previous relay number.
142	///
143	/// The implementation should panic when there is something wrong.
144	fn check_associated_relay_number(
145		current: RelayChainBlockNumber,
146		previous: RelayChainBlockNumber,
147	);
148}
149
150/// Provides an implementation of [`CheckAssociatedRelayNumber`].
151///
152/// It will ensure that the associated relay block number strictly increases between Parachain
153/// blocks. This should be used by production Parachains when in doubt.
154pub struct RelayNumberStrictlyIncreases;
155
156impl CheckAssociatedRelayNumber for RelayNumberStrictlyIncreases {
157	fn check_associated_relay_number(
158		current: RelayChainBlockNumber,
159		previous: RelayChainBlockNumber,
160	) {
161		if current <= previous {
162			panic!("Relay chain block number needs to strictly increase between Parachain blocks!")
163		}
164	}
165}
166
167/// Provides an implementation of [`CheckAssociatedRelayNumber`].
168///
169/// This will accept any relay chain block number combination. This is mainly useful for
170/// test parachains.
171pub struct AnyRelayNumber;
172
173impl CheckAssociatedRelayNumber for AnyRelayNumber {
174	fn check_associated_relay_number(_: RelayChainBlockNumber, _: RelayChainBlockNumber) {}
175}
176
177/// Provides an implementation of [`CheckAssociatedRelayNumber`].
178///
179/// It will ensure that the associated relay block number monotonically increases between Parachain
180/// blocks. This should be used when asynchronous backing is enabled.
181pub struct RelayNumberMonotonicallyIncreases;
182
183impl CheckAssociatedRelayNumber for RelayNumberMonotonicallyIncreases {
184	fn check_associated_relay_number(
185		current: RelayChainBlockNumber,
186		previous: RelayChainBlockNumber,
187	) {
188		if current < previous {
189			panic!(
190				"Relay chain block number needs to monotonically increase between Parachain blocks!"
191			)
192		}
193	}
194}
195
196/// The max length of a DMP message.
197pub type MaxDmpMessageLenOf<T> = <<T as Config>::DmpQueue as HandleMessage>::MaxMessageLen;
198
199pub mod ump_constants {
200	/// `host_config.max_upward_queue_size / THRESHOLD_FACTOR` is the threshold after which delivery
201	/// starts getting exponentially more expensive.
202	/// `2` means the price starts to increase when queue is half full.
203	pub const THRESHOLD_FACTOR: u32 = 2;
204}
205
206#[frame_support::pallet]
207pub mod pallet {
208	use super::*;
209	use codec::Compact;
210	use cumulus_primitives_core::CoreInfoExistsAtMaxOnce;
211	use frame_support::pallet_prelude::{ValueQuery, *};
212	use frame_system::pallet_prelude::*;
213
214	#[pallet::pallet]
215	#[pallet::storage_version(migration::STORAGE_VERSION)]
216	#[pallet::without_storage_info]
217	pub struct Pallet<T>(_);
218
219	#[pallet::config]
220	pub trait Config: frame_system::Config<OnSetCode = ParachainSetCode<Self>> {
221		/// The overarching event type.
222		#[allow(deprecated)]
223		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
224
225		/// Something which can be notified when the validation data is set.
226		type OnSystemEvent: OnSystemEvent;
227
228		/// Returns the parachain ID we are running with.
229		#[pallet::constant]
230		type SelfParaId: Get<ParaId>;
231
232		/// The place where outbound XCMP messages come from. This is queried in `finalize_block`.
233		type OutboundXcmpMessageSource: XcmpMessageSource;
234
235		/// Queues inbound downward messages for delayed processing.
236		///
237		/// All inbound DMP messages from the relay are pushed into this. The handler is expected to
238		/// eventually process all the messages that are pushed to it.
239		type DmpQueue: HandleMessage;
240
241		/// The weight we reserve at the beginning of the block for processing DMP messages.
242		type ReservedDmpWeight: Get<Weight>;
243
244		/// The message handler that will be invoked when messages are received via XCMP.
245		///
246		/// This should normally link to the XCMP Queue pallet.
247		type XcmpMessageHandler: XcmpMessageHandler;
248
249		/// The weight we reserve at the beginning of the block for processing XCMP messages.
250		type ReservedXcmpWeight: Get<Weight>;
251
252		/// Something that can check the associated relay parent block number.
253		type CheckAssociatedRelayNumber: CheckAssociatedRelayNumber;
254
255		/// Weight info for functions and calls.
256		type WeightInfo: WeightInfo;
257
258		/// An entry-point for higher-level logic to manage the backlog of unincluded parachain
259		/// blocks and authorship rights for those blocks.
260		///
261		/// Typically, this should be a hook tailored to the collator-selection/consensus mechanism
262		/// that is used for this chain.
263		///
264		/// However, to maintain the same behavior as prior to asynchronous backing, provide the
265		/// [`consensus_hook::ExpectParentIncluded`] here. This is only necessary in the case
266		/// that collators aren't expected to have node versions that supply the included block
267		/// in the relay-chain state proof.
268		type ConsensusHook: ConsensusHook;
269
270		/// The offset between the tip of the relay chain and the parent relay block used as parent
271		/// when authoring a parachain block.
272		///
273		/// This setting directly impacts the number of descendant headers that are expected in the
274		/// `set_validation_data` inherent.
275		///
276		/// For any setting `N` larger than zero, the inherent expects that the inherent includes
277		/// the relay parent plus `N` descendants. These headers are required to validate that new
278		/// parachain blocks are authored at the correct offset.
279		///
280		/// While this helps to reduce forks on the parachain side, it increases the delay for
281		/// processing XCM messages. So, the value should be chosen wisely.
282		///
283		/// If set to 0, this config has no impact.
284		type RelayParentOffset: Get<u32>;
285
286		/// Verifier for V3 scheduling proofs.
287		///
288		/// Reports whether V3 scheduling validation is enabled and supplies the
289		/// verification logic for the proof itself. Use `()` to keep V3 scheduling
290		/// disabled.
291		///
292		/// When enabled, this changes how building on older relay parents is enforced:
293		/// - The old `relay_parent_descendants` validation in the inherent is disabled
294		/// - V3 scheduling validation is used instead, with the header chain provided via PVF
295		///   parameters
296		///
297		/// # Migration Guide
298		///
299		/// v3 scheduling is work in progress, and for the moment this should be left as
300		/// `()`. If V3 is wrongfully enabled, the parachain will stall.
301		///
302		/// Before enabling this:
303		/// 1. Ensure all collators are updated to a version that supports V3 candidates
304		/// 2. Ensure the relay chain has `CandidateReceiptV3` node feature enabled
305		/// 3. Swap the verifier for one whose `V3_SCHEDULING_ENABLED` const is `true`, via a
306		///    runtime upgrade.
307		///
308		/// Once enabled, collators will:
309		/// - Stop providing `relay_parent_descendants` in the inherent (empty vec)
310		/// - Provide the header chain via V3 extension in PVF parameters
311		///
312		/// The `RelayParentOffset` config continues to define the header chain length.
313		type SchedulingSignatureVerifier: cumulus_primitives_core::VerifySchedulingSignature;
314	}
315
316	#[pallet::hooks]
317	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
318		/// Handles actually sending upward messages by moving them from `PendingUpwardMessages` to
319		/// `UpwardMessages`. Decreases the delivery fee factor if after sending messages, the queue
320		/// total size is less than the threshold (see [`ump_constants::THRESHOLD_FACTOR`]).
321		/// Also does the sending for HRMP messages it takes from `OutboundXcmpMessageSource`.
322		fn on_finalize(_: BlockNumberFor<T>) {
323			<DidSetValidationCode<T>>::kill();
324			<UpgradeRestrictionSignal<T>>::kill();
325			let relay_upgrade_go_ahead = <UpgradeGoAhead<T>>::take();
326
327			let vfp = <ValidationData<T>>::get().expect(
328				r"Missing required set_validation_data inherent. This inherent must be
329				present in every block. This error typically occurs when the set_validation_data
330				execution failed and was rejected by the block builder. Check earlier log entries
331				for the specific cause of the failure.",
332			);
333
334			LastRelayChainBlockNumber::<T>::put(vfp.relay_parent_number);
335
336			let host_config = match HostConfiguration::<T>::get() {
337				Some(ok) => ok,
338				None => {
339					debug_assert!(
340						false,
341						"host configuration is promised to set until `on_finalize`; qed",
342					);
343					return;
344				},
345			};
346
347			// Before updating the relevant messaging state, we need to extract
348			// the total bandwidth limits for the purpose of updating the unincluded
349			// segment.
350			let total_bandwidth_out = match RelevantMessagingState::<T>::get() {
351				Some(s) => OutboundBandwidthLimits::from_relay_chain_state(&s),
352				None => {
353					debug_assert!(
354						false,
355						"relevant messaging state is promised to be set until `on_finalize`; \
356							qed",
357					);
358					return;
359				},
360			};
361
362			// After this point, the `RelevantMessagingState` in storage reflects the
363			// unincluded segment.
364			Self::adjust_egress_bandwidth_limits();
365
366			let current_core_selector =
367				CumulusDigestItem::find_core_info(&frame_system::Pallet::<T>::digest())
368					.map_or(0, |ci| ci.selector.0);
369
370			let current_bundle_index =
371				CumulusDigestItem::find_block_bundle_info(&frame_system::Pallet::<T>::digest())
372					.map_or(0, |bi| bi.index);
373
374			let mut pov_tracker = PoVMessagesTracker::<T>::get()
375				.filter(|tracker| {
376					// If the relay parent changes, this is for sure a different `PoV`.
377					tracker.relay_storage_root_or_hash == vfp.relay_parent_storage_root &&
378					// A different core selector also means we are on a different `PoV`.
379					tracker.core_selector == current_core_selector &&
380					// The bundle index needs to increase, or we are in a different `PoV`.
381					current_bundle_index > tracker.bundle_index
382				})
383				.unwrap_or_default();
384
385			pov_tracker.bundle_index = current_bundle_index;
386			pov_tracker.core_selector = current_core_selector;
387			pov_tracker.relay_storage_root_or_hash = vfp.relay_parent_storage_root;
388
389			let (ump_msg_count, ump_total_bytes) = <PendingUpwardMessages<T>>::mutate(|up| {
390				let (available_capacity, available_size) = match RelevantMessagingState::<T>::get()
391				{
392					Some(limits) => (
393						limits.relay_dispatch_queue_remaining_capacity.remaining_count,
394						limits.relay_dispatch_queue_remaining_capacity.remaining_size,
395					),
396					None => {
397						debug_assert!(
398							false,
399							"relevant messaging state is promised to be set until `on_finalize`; \
400								qed",
401						);
402						return (0, 0);
403					},
404				};
405
406				let available_capacity = cmp::min(
407					available_capacity,
408					host_config
409						.max_upward_message_num_per_candidate
410						.saturating_sub(pov_tracker.ump_msg_count),
411				);
412
413				// Count the number of messages we can possibly fit in the given constraints, i.e.
414				// available_capacity and available_size.
415				let (num, total_size) = up
416					.iter()
417					.scan((0u32, 0u32), |state, msg| {
418						let (cap_used, size_used) = *state;
419						let new_cap = cap_used.saturating_add(1);
420						let new_size = size_used.saturating_add(msg.len() as u32);
421						match available_capacity
422							.checked_sub(new_cap)
423							.and(available_size.checked_sub(new_size))
424						{
425							Some(_) => {
426								*state = (new_cap, new_size);
427								Some(*state)
428							},
429							_ => None,
430						}
431					})
432					.last()
433					.unwrap_or_default();
434
435				// TODO: #274 Return back messages that do not longer fit into the queue.
436
437				UpwardMessages::<T>::put(&up[..num as usize]);
438				*up = up.split_off(num as usize);
439
440				pov_tracker.ump_msg_count = pov_tracker.ump_msg_count.saturating_add(num);
441
442				let digest = frame_system::Pallet::<T>::digest();
443
444				let core_info = CumulusDigestItem::find_core_info(&digest);
445				PreviousCoreCount::<T>::put(
446					core_info.as_ref().map_or(Compact(1u16), |ci| ci.number_of_cores),
447				);
448
449				// Only send UMP signals on the last block of a PoV.
450				// For single-block PoVs (no BlockBundleInfo), always send signals.
451				if CumulusDigestItem::is_last_block_in_core(&digest).unwrap_or(true) {
452					Self::send_ump_signals(core_info);
453				}
454
455				// If the total size of the pending messages is less than the threshold,
456				// we decrease the fee factor, since the queue is less congested.
457				// This makes delivery of new messages cheaper.
458				let threshold = host_config
459					.max_upward_queue_size
460					.saturating_div(ump_constants::THRESHOLD_FACTOR);
461				let remaining_total_size: usize = up.iter().map(UpwardMessage::len).sum();
462				if remaining_total_size <= threshold as usize {
463					Self::decrease_fee_factor(());
464				}
465
466				(num, total_size)
467			});
468
469			// Sending HRMP messages is a little bit more involved. There are the following
470			// constraints:
471			//
472			// - a channel should exist (and it can be closed while a message is buffered),
473			// - at most one message can be sent in a channel,
474			// - the sent out messages should be ordered by ascension of recipient para id.
475			// - the capacity and total size of the channel is limited,
476			// - the maximum size of a message is limited (and can potentially be changed),
477
478			let maximum_channels = host_config
479				.hrmp_max_message_num_per_candidate
480				.min(<AnnouncedHrmpMessagesPerCandidate<T>>::take())
481				as usize;
482
483			let maximum_channels =
484				maximum_channels.saturating_sub(pov_tracker.hrmp_outbound_count as usize);
485
486			// Note: this internally calls the `GetChannelInfo` implementation for this
487			// pallet, which draws on the `RelevantMessagingState`. That in turn has
488			// been adjusted above to reflect the correct limits in all channels.
489			let outbound_messages = T::OutboundXcmpMessageSource::take_outbound_messages(
490				maximum_channels,
491				&pov_tracker.hrmp_outbound_recipients,
492			)
493			.into_iter()
494			.map(|(recipient, data)| OutboundHrmpMessage { recipient, data })
495			.collect::<Vec<_>>();
496
497			pov_tracker
498				.hrmp_outbound_recipients
499				.extend(outbound_messages.iter().map(|m| m.recipient));
500			pov_tracker.hrmp_outbound_count =
501				pov_tracker.hrmp_outbound_count.saturating_add(outbound_messages.len() as u32);
502			PoVMessagesTracker::<T>::put(pov_tracker);
503
504			// Update the unincluded segment length; capacity checks were done previously in
505			// `set_validation_data`, so this can be done unconditionally.
506			{
507				let hrmp_outgoing = outbound_messages
508					.iter()
509					.map(|msg| {
510						(
511							msg.recipient,
512							HrmpChannelUpdate { msg_count: 1, total_bytes: msg.data.len() as u32 },
513						)
514					})
515					.collect();
516				let used_bandwidth =
517					UsedBandwidth { ump_msg_count, ump_total_bytes, hrmp_outgoing };
518
519				let mut aggregated_segment =
520					AggregatedUnincludedSegment::<T>::get().unwrap_or_default();
521				let consumed_go_ahead_signal =
522					if aggregated_segment.consumed_go_ahead_signal().is_some() {
523						// Some ancestor within the segment already processed this signal --
524						// validated during inherent creation.
525						None
526					} else {
527						relay_upgrade_go_ahead
528					};
529				// The bandwidth constructed was ensured to satisfy relay chain constraints.
530				let ancestor = Ancestor::new_unchecked(used_bandwidth, consumed_go_ahead_signal);
531
532				let watermark = HrmpWatermark::<T>::get();
533				let watermark_update = HrmpWatermarkUpdate::new(watermark, vfp.relay_parent_number);
534
535				aggregated_segment
536					.append(&ancestor, watermark_update, &total_bandwidth_out)
537					.expect("unincluded segment limits exceeded");
538				AggregatedUnincludedSegment::<T>::put(aggregated_segment);
539				// Check in `on_initialize` guarantees there's space for this block.
540				UnincludedSegment::<T>::append(ancestor);
541			}
542
543			HrmpOutboundMessages::<T>::put(outbound_messages);
544		}
545
546		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
547			let mut weight = Weight::zero();
548
549			// To prevent removing `NewValidationCode` that was set by another `on_initialize`
550			// like for example from scheduler, we only kill the storage entry if it was not yet
551			// updated in the current block.
552			if !<DidSetValidationCode<T>>::get() {
553				// NOTE: Killing here is required to at least include the trie nodes down to the key
554				// in the proof. Because this value will be read in `validate_block` and thus,
555				// needs to be reachable by the proof.
556				NewValidationCode::<T>::kill();
557				weight += T::DbWeight::get().writes(1);
558			}
559
560			// The parent hash was unknown during block finalization. Update it here.
561			{
562				<UnincludedSegment<T>>::mutate(|chain| {
563					if let Some(ancestor) = chain.last_mut() {
564						let parent = frame_system::Pallet::<T>::parent_hash();
565						// Ancestor is the latest finalized block, thus current parent is
566						// its output head.
567						ancestor.replace_para_head_hash(parent);
568					}
569				});
570				weight += T::DbWeight::get().reads_writes(1, 1);
571
572				// Weight used during finalization.
573				weight += T::DbWeight::get().reads_writes(3, 2);
574			}
575
576			BlockWeightMode::<T>::kill();
577
578			// Remove the validation from the old block.
579			ValidationData::<T>::kill();
580			// NOTE: Killing here is required to at least include the trie nodes down to the keys
581			// in the proof. Because these values will be read in `validate_block` and thus,
582			// need to be reachable by the proof.
583			ProcessedDownwardMessages::<T>::kill();
584			UpwardMessages::<T>::kill();
585			HrmpOutboundMessages::<T>::kill();
586			CustomValidationHeadData::<T>::kill();
587			// The same as above. Reading here to make sure that the key is included in the proof.
588			HrmpWatermark::<T>::get();
589			weight += T::DbWeight::get().reads_writes(1, 5);
590
591			// Here, in `on_initialize` we must report the weight for both `on_initialize` and
592			// `on_finalize`.
593			//
594			// One complication here, is that the `host_configuration` is updated by an inherent
595			// and those are processed after the block initialization phase. Therefore, we have to
596			// be content only with the configuration as per the previous block. That means that
597			// the configuration can be either stale (or be absent altogether in case of the
598			// beginning of the chain).
599			//
600			// In order to mitigate this, we do the following. At the time, we are only concerned
601			// about `hrmp_max_message_num_per_candidate`. We reserve the amount of weight to
602			// process the number of HRMP messages according to the potentially stale
603			// configuration. In `on_finalize` we will process only the maximum between the
604			// announced number of messages and the actual received in the fresh configuration.
605			//
606			// In the common case, they will be the same. In the case the actual value is smaller
607			// than the announced, we would waste some of weight. In the case the actual value is
608			// greater than the announced, we will miss opportunity to send a couple of messages.
609			weight += T::DbWeight::get().reads_writes(1, 1);
610			let hrmp_max_message_num_per_candidate = HostConfiguration::<T>::get()
611				.map(|cfg| cfg.hrmp_max_message_num_per_candidate)
612				.unwrap_or(0);
613			<AnnouncedHrmpMessagesPerCandidate<T>>::put(hrmp_max_message_num_per_candidate);
614
615			// NOTE that the actual weight consumed by `on_finalize` may turn out lower.
616			weight += T::DbWeight::get().reads_writes(
617				3 + hrmp_max_message_num_per_candidate as u64,
618				4 + hrmp_max_message_num_per_candidate as u64,
619			);
620
621			// Weight for updating the last relay chain block number in `on_finalize`.
622			weight += T::DbWeight::get().reads_writes(1, 1);
623
624			// Weight for adjusting the unincluded segment in `on_finalize`.
625			weight += T::DbWeight::get().reads_writes(6, 3);
626
627			// Always try to read `UpgradeGoAhead` in `on_finalize`.
628			weight += T::DbWeight::get().reads(1);
629
630			// Ensure `CoreInfo` digest exists only once and validate claim_queue_offset.
631			//
632			// With V3: the collator looks up the claim queue at the scheduling parent
633			// (fresh tip), so the max offset is just the `max_claim_queue_offset()`.
634			// Without V3: the collator looks up at the relay parent which is offset
635			// behind the tip, so the effective max includes relay_parent_offset.
636			match CumulusDigestItem::core_info_exists_at_max_once(
637				&frame_system::Pallet::<T>::digest(),
638			) {
639				CoreInfoExistsAtMaxOnce::Once(core_info) => {
640					let mut max_allowed_offset = Self::max_claim_queue_offset();
641					if !T::SchedulingSignatureVerifier::V3_SCHEDULING_ENABLED {
642						max_allowed_offset = max_allowed_offset
643							.saturating_add(T::RelayParentOffset::get().saturated_into::<u8>())
644					}
645					assert!(
646						core_info.claim_queue_offset.0 <= max_allowed_offset,
647						"claim_queue_offset {} exceeds maximum allowed {}",
648						core_info.claim_queue_offset.0,
649						max_allowed_offset,
650					);
651				},
652				CoreInfoExistsAtMaxOnce::NotFound => {},
653				CoreInfoExistsAtMaxOnce::MoreThanOnce => {
654					panic!("`CumulusDigestItem::CoreInfo` must exist at max once.");
655				},
656			}
657
658			weight
659		}
660	}
661
662	#[pallet::call]
663	impl<T: Config> Pallet<T> {
664		/// Set the current validation data.
665		///
666		/// This should be invoked exactly once per block. It will panic at the finalization
667		/// phase if the call was not invoked.
668		///
669		/// The dispatch origin for this call must be `Inherent`
670		///
671		/// As a side effect, this function upgrades the current validation function
672		/// if the appropriate time has come.
673		#[pallet::call_index(0)]
674		#[pallet::weight((0, DispatchClass::Mandatory))]
675		// TODO: This weight should be corrected. Currently the weight is registered manually in the
676		// call with `register_extra_weight_unchecked`.
677		pub fn set_validation_data(
678			origin: OriginFor<T>,
679			data: BasicParachainInherentData,
680			inbound_messages_data: InboundMessagesData,
681		) -> DispatchResult {
682			ensure_none(origin)?;
683			assert!(
684				!<ValidationData<T>>::exists(),
685				"ValidationData must be updated only once in a block",
686			);
687
688			// TODO: This is more than zero, but will need benchmarking to figure out what.
689			let mut total_weight = Weight::zero();
690
691			// NOTE: the inherent data is expected to be unique, even if this block is build
692			// in the context of the same relay parent as the previous one. In particular,
693			// the inherent shouldn't contain messages that were already processed by any of the
694			// ancestors.
695			//
696			// This invariant should be upheld by the `ProvideInherent` implementation.
697			let BasicParachainInherentData {
698				validation_data: vfp,
699				relay_chain_state,
700				relay_parent_descendants,
701				collator_peer_id,
702			} = data;
703
704			// Check that the associated relay chain block number is as expected.
705			T::CheckAssociatedRelayNumber::check_associated_relay_number(
706				vfp.relay_parent_number,
707				LastRelayChainBlockNumber::<T>::get(),
708			);
709
710			let relay_state_proof = RelayChainStateProof::new(
711				T::SelfParaId::get(),
712				vfp.relay_parent_storage_root,
713				relay_chain_state.clone(),
714			)
715			.expect("Invalid relay chain state proof");
716
717			// Relay parent offset validation:
718			// When V3 scheduling is disabled: validate relay_parent_descendants (old mechanism)
719			// When V3 scheduling is enabled: skip this validation, V3 scheduling validation
720			// happens in validate_block with header chain from PVF params
721			let expected_rp_descendants_num = T::RelayParentOffset::get();
722			let v3_enabled = T::SchedulingSignatureVerifier::V3_SCHEDULING_ENABLED;
723
724			if expected_rp_descendants_num > 0 && !v3_enabled {
725				if let Err(err) = descendant_validation::verify_relay_parent_descendants(
726					&relay_state_proof,
727					relay_parent_descendants,
728					vfp.relay_parent_storage_root,
729					expected_rp_descendants_num,
730				) {
731					panic!(
732						"Unable to verify provided relay parent descendants. \
733						expected_rp_descendants_num: {expected_rp_descendants_num} \
734						error: {err:?}"
735					);
736				};
737			}
738
739			// Update the desired maximum capacity according to the consensus hook.
740			let (consensus_hook_weight, capacity) =
741				T::ConsensusHook::on_state_proof(&relay_state_proof);
742			total_weight += consensus_hook_weight;
743			total_weight += Self::maybe_drop_included_ancestors(&relay_state_proof, capacity);
744			// Deposit a log indicating the relay-parent storage root.
745			// TODO: remove this in favor of the relay-parent's hash after
746			// https://github.com/paritytech/cumulus/issues/303
747			frame_system::Pallet::<T>::deposit_log(
748				cumulus_primitives_core::rpsr_digest::relay_parent_storage_root_item(
749					vfp.relay_parent_storage_root,
750					vfp.relay_parent_number,
751				),
752			);
753
754			// Initialization logic: we know that this runs exactly once every block,
755			// which means we can put the initialization logic here to remove the
756			// sequencing problem.
757			let upgrade_go_ahead_signal = relay_state_proof
758				.read_upgrade_go_ahead_signal()
759				.expect("Invalid upgrade go ahead signal");
760
761			let upgrade_signal_in_segment = AggregatedUnincludedSegment::<T>::get()
762				.as_ref()
763				.and_then(SegmentTracker::consumed_go_ahead_signal);
764			if let Some(signal_in_segment) = upgrade_signal_in_segment.as_ref() {
765				// Unincluded ancestor consuming upgrade signal is still within the segment,
766				// sanity check that it matches with the signal from relay chain.
767				assert_eq!(upgrade_go_ahead_signal, Some(*signal_in_segment));
768			}
769			match upgrade_go_ahead_signal {
770				Some(_signal) if upgrade_signal_in_segment.is_some() => {
771					// Do nothing, processing logic was executed by unincluded ancestor.
772				},
773				Some(relay_chain::UpgradeGoAhead::GoAhead) => {
774					assert!(
775						<PendingValidationCode<T>>::exists(),
776						"No new validation function found in storage, GoAhead signal is not expected",
777					);
778					let validation_code = <PendingValidationCode<T>>::take();
779
780					frame_system::Pallet::<T>::update_code_in_storage(&validation_code);
781					<T::OnSystemEvent as OnSystemEvent>::on_validation_code_applied();
782					Self::deposit_event(Event::ValidationFunctionApplied {
783						relay_chain_block_num: vfp.relay_parent_number,
784					});
785				},
786				Some(relay_chain::UpgradeGoAhead::Abort) => {
787					<PendingValidationCode<T>>::kill();
788					Self::deposit_event(Event::ValidationFunctionDiscarded);
789				},
790				None => {},
791			}
792			<UpgradeRestrictionSignal<T>>::put(
793				relay_state_proof
794					.read_upgrade_restriction_signal()
795					.expect("Invalid upgrade restriction signal"),
796			);
797			<UpgradeGoAhead<T>>::put(upgrade_go_ahead_signal);
798
799			let host_config = relay_state_proof
800				.read_abridged_host_configuration()
801				.expect("Invalid host configuration in relay chain state proof");
802
803			let relevant_messaging_state = relay_state_proof
804				.read_messaging_state_snapshot(&host_config)
805				.expect("Invalid messaging state in relay chain state proof");
806
807			<ValidationData<T>>::put(&vfp);
808			<RelayStateProof<T>>::put(relay_chain_state);
809			<RelevantMessagingState<T>>::put(relevant_messaging_state.clone());
810			<HostConfiguration<T>>::put(host_config);
811
812			total_weight.saturating_accrue(
813				<T::OnSystemEvent as OnSystemEvent>::on_relay_state_proof(&relay_state_proof),
814			);
815
816			<T::OnSystemEvent as OnSystemEvent>::on_validation_data(&vfp);
817
818			match collator_peer_id {
819				Some(peer_id) => PendingApprovedPeer::<T>::put(peer_id),
820				None => PendingApprovedPeer::<T>::kill(),
821			}
822
823			total_weight.saturating_accrue(Self::enqueue_inbound_downward_messages(
824				relevant_messaging_state.dmq_mqc_head,
825				inbound_messages_data.downward_messages,
826			));
827			total_weight.saturating_accrue(Self::enqueue_inbound_horizontal_messages(
828				&relevant_messaging_state.ingress_channels,
829				inbound_messages_data.horizontal_messages,
830				vfp.relay_parent_number,
831			));
832
833			frame_system::Pallet::<T>::register_extra_weight_unchecked(
834				total_weight,
835				DispatchClass::Mandatory,
836			);
837
838			Ok(())
839		}
840
841		#[pallet::call_index(1)]
842		#[pallet::weight((1_000, DispatchClass::Operational))]
843		pub fn sudo_send_upward_message(
844			origin: OriginFor<T>,
845			message: UpwardMessage,
846		) -> DispatchResult {
847			ensure_root(origin)?;
848			let _ = Self::send_upward_message(message);
849			Ok(())
850		}
851
852		// WARNING: call indices 2 and 3 were used in a former version of this pallet. Using them
853		// again will require to bump the transaction version of runtimes using this pallet.
854	}
855
856	#[pallet::event]
857	#[pallet::generate_deposit(pub(super) fn deposit_event)]
858	pub enum Event<T: Config> {
859		/// The validation function has been scheduled to apply.
860		ValidationFunctionStored,
861		/// The validation function was applied as of the contained relay chain block number.
862		ValidationFunctionApplied { relay_chain_block_num: RelayChainBlockNumber },
863		/// The relay-chain aborted the upgrade process.
864		ValidationFunctionDiscarded,
865		/// Some downward messages have been received and will be processed.
866		DownwardMessagesReceived { count: u32 },
867		/// Downward messages were processed using the given weight.
868		DownwardMessagesProcessed { weight_used: Weight, dmq_head: relay_chain::Hash },
869		/// An upward message was sent to the relay chain.
870		UpwardMessageSent { message_hash: Option<XcmHash> },
871	}
872
873	#[pallet::error]
874	pub enum Error<T> {
875		/// Attempt to upgrade validation function while existing upgrade pending.
876		OverlappingUpgrades,
877		/// Polkadot currently prohibits this parachain from upgrading its validation function.
878		ProhibitedByPolkadot,
879		/// The supplied validation function has compiled into a blob larger than Polkadot is
880		/// willing to run.
881		TooBig,
882		/// The inherent which supplies the validation data did not run this block.
883		ValidationDataNotAvailable,
884		/// The inherent which supplies the host configuration did not run this block.
885		HostConfigurationNotAvailable,
886		/// No validation function upgrade is currently scheduled.
887		NotScheduled,
888	}
889
890	/// The current block weight mode.
891	///
892	/// This is used to determine what is the maximum allowed block weight, for more information see
893	/// [`block_weight`].
894	///
895	/// Killed in [`Self::on_initialize`] and set by the [`block_weight`] logic.
896	#[pallet::storage]
897	#[pallet::whitelist_storage]
898	pub type BlockWeightMode<T: Config> =
899		StorageValue<_, block_weight::BlockWeightMode<T>, OptionQuery>;
900
901	/// The core count available to the parachain in the previous block.
902	///
903	/// This is mainly used for offchain functionality to calculate the correct target block weight.
904	#[pallet::storage]
905	#[pallet::whitelist_storage]
906	pub type PreviousCoreCount<T: Config> = StorageValue<_, Compact<u16>, OptionQuery>;
907
908	/// Latest included block descendants the runtime accepted. In other words, these are
909	/// ancestors of the currently executing block which have not been included in the observed
910	/// relay-chain state.
911	///
912	/// The segment length is limited by the capacity returned from the [`ConsensusHook`] configured
913	/// in the pallet.
914	#[pallet::storage]
915	pub type UnincludedSegment<T: Config> = StorageValue<_, Vec<Ancestor<T::Hash>>, ValueQuery>;
916
917	/// Storage field that keeps track of bandwidth used by the unincluded segment along with the
918	/// latest HRMP watermark. Used for limiting the acceptance of new blocks with
919	/// respect to relay chain constraints.
920	#[pallet::storage]
921	pub type AggregatedUnincludedSegment<T: Config> =
922		StorageValue<_, SegmentTracker<T::Hash>, OptionQuery>;
923
924	/// In case of a scheduled upgrade, this storage field contains the validation code to be
925	/// applied.
926	///
927	/// As soon as the relay chain gives us the go-ahead signal, we will overwrite the
928	/// [`:pending_code`][sp_core::storage::well_known_keys::PENDING_CODE] which will result the
929	/// next block to be processed with the new validation code. This concludes the upgrade process.
930	#[pallet::storage]
931	pub type PendingValidationCode<T: Config> = StorageValue<_, Vec<u8>, ValueQuery>;
932
933	/// Validation code that is set by the parachain and is to be communicated to collator and
934	/// consequently the relay-chain.
935	///
936	/// This will be cleared in `on_initialize` of each new block if no other pallet already set
937	/// the value.
938	#[pallet::storage]
939	pub type NewValidationCode<T: Config> = StorageValue<_, Vec<u8>, OptionQuery>;
940
941	/// The [`PersistedValidationData`] set for this block.
942	///
943	/// This value is expected to be set only once by the [`Pallet::set_validation_data`] inherent.
944	#[pallet::storage]
945	pub type ValidationData<T: Config> = StorageValue<_, PersistedValidationData>;
946
947	/// Were the validation data set to notify the relay chain?
948	#[pallet::storage]
949	pub type DidSetValidationCode<T: Config> = StorageValue<_, bool, ValueQuery>;
950
951	/// The relay chain block number associated with the last parachain block.
952	///
953	/// This is updated in `on_finalize`.
954	#[pallet::storage]
955	pub type LastRelayChainBlockNumber<T: Config> =
956		StorageValue<_, RelayChainBlockNumber, ValueQuery>;
957
958	/// An option which indicates if the relay-chain restricts signalling a validation code upgrade.
959	/// In other words, if this is `Some` and [`NewValidationCode`] is `Some` then the produced
960	/// candidate will be invalid.
961	///
962	/// This storage item is a mirror of the corresponding value for the current parachain from the
963	/// relay-chain. This value is ephemeral which means it doesn't hit the storage. This value is
964	/// set after the inherent.
965	#[pallet::storage]
966	pub type UpgradeRestrictionSignal<T: Config> =
967		StorageValue<_, Option<relay_chain::UpgradeRestriction>, ValueQuery>;
968
969	/// Optional upgrade go-ahead signal from the relay-chain.
970	///
971	/// This storage item is a mirror of the corresponding value for the current parachain from the
972	/// relay-chain. This value is ephemeral which means it doesn't hit the storage. This value is
973	/// set after the inherent.
974	#[pallet::storage]
975	pub type UpgradeGoAhead<T: Config> =
976		StorageValue<_, Option<relay_chain::UpgradeGoAhead>, ValueQuery>;
977
978	/// The state proof for the last relay parent block.
979	///
980	/// This field is meant to be updated each block with the validation data inherent. Therefore,
981	/// before processing of the inherent, e.g. in `on_initialize` this data may be stale.
982	///
983	/// This data is also absent from the genesis.
984	#[pallet::storage]
985	pub type RelayStateProof<T: Config> = StorageValue<_, sp_trie::StorageProof>;
986
987	/// The snapshot of some state related to messaging relevant to the current parachain as per
988	/// the relay parent.
989	///
990	/// This field is meant to be updated each block with the validation data inherent. Therefore,
991	/// before processing of the inherent, e.g. in `on_initialize` this data may be stale.
992	///
993	/// This data is also absent from the genesis.
994	#[pallet::storage]
995	pub type RelevantMessagingState<T: Config> = StorageValue<_, MessagingStateSnapshot>;
996
997	/// The parachain host configuration that was obtained from the relay parent.
998	///
999	/// This field is meant to be updated each block with the validation data inherent. Therefore,
1000	/// before processing of the inherent, e.g. in `on_initialize` this data may be stale.
1001	///
1002	/// This data is also absent from the genesis.
1003	#[pallet::storage]
1004	#[pallet::disable_try_decode_storage]
1005	pub type HostConfiguration<T: Config> = StorageValue<_, AbridgedHostConfiguration>;
1006
1007	/// The last downward message queue chain head we have observed.
1008	///
1009	/// This value is loaded before and saved after processing inbound downward messages carried
1010	/// by the system inherent.
1011	#[pallet::storage]
1012	pub type LastDmqMqcHead<T: Config> = StorageValue<_, MessageQueueChain, ValueQuery>;
1013
1014	/// The message queue chain heads we have observed per each channel incoming channel.
1015	///
1016	/// This value is loaded before and saved after processing inbound downward messages carried
1017	/// by the system inherent.
1018	#[pallet::storage]
1019	pub type LastHrmpMqcHeads<T: Config> =
1020		StorageValue<_, BTreeMap<ParaId, MessageQueueChain>, ValueQuery>;
1021
1022	/// Number of downward messages processed in a block.
1023	///
1024	/// This will be cleared in `on_initialize` of each new block.
1025	#[pallet::storage]
1026	pub type ProcessedDownwardMessages<T: Config> = StorageValue<_, u32, ValueQuery>;
1027
1028	/// The last processed downward message.
1029	///
1030	/// We need to keep track of this to filter the messages that have been already processed.
1031	#[pallet::storage]
1032	pub type LastProcessedDownwardMessage<T: Config> = StorageValue<_, InboundMessageId>;
1033
1034	/// HRMP watermark that was set in a block.
1035	#[pallet::storage]
1036	pub type HrmpWatermark<T: Config> = StorageValue<_, relay_chain::BlockNumber, ValueQuery>;
1037
1038	/// The last processed HRMP message.
1039	///
1040	/// We need to keep track of this to filter the messages that have been already processed.
1041	#[pallet::storage]
1042	pub type LastProcessedHrmpMessage<T: Config> = StorageValue<_, InboundMessageId>;
1043
1044	/// HRMP messages that were sent in a block.
1045	///
1046	/// This will be cleared in `on_initialize` of each new block.
1047	#[pallet::storage]
1048	pub type HrmpOutboundMessages<T: Config> =
1049		StorageValue<_, Vec<OutboundHrmpMessage>, ValueQuery>;
1050
1051	/// Upward messages that were sent in a block.
1052	///
1053	/// This will be cleared in `on_initialize` for each new block.
1054	#[pallet::storage]
1055	pub type UpwardMessages<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;
1056
1057	/// Upward messages that are still pending and not yet sent to the relay chain.
1058	#[pallet::storage]
1059	pub type PendingUpwardMessages<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;
1060
1061	/// Upward signals that are still pending and not yet sent to the relay chain.
1062	///
1063	/// This will be cleared in `on_finalize` for each block.
1064	#[pallet::storage]
1065	pub type PendingUpwardSignals<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;
1066
1067	/// The approved peer id to be sent as a UMP signal on the last block of the PoV.
1068	#[pallet::storage]
1069	pub type PendingApprovedPeer<T: Config> =
1070		StorageValue<_, relay_chain::ApprovedPeerId, OptionQuery>;
1071
1072	/// The factor to multiply the base delivery fee by for UMP.
1073	#[pallet::storage]
1074	pub type UpwardDeliveryFeeFactor<T: Config> =
1075		StorageValue<_, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
1076
1077	/// The number of HRMP messages we observed in `on_initialize` and thus used that number for
1078	/// announcing the weight of `on_initialize` and `on_finalize`.
1079	#[pallet::storage]
1080	pub type AnnouncedHrmpMessagesPerCandidate<T: Config> = StorageValue<_, u32, ValueQuery>;
1081
1082	/// The weight we reserve at the beginning of the block for processing XCMP messages. This
1083	/// overrides the amount set in the Config trait.
1084	#[pallet::storage]
1085	pub type ReservedXcmpWeightOverride<T: Config> = StorageValue<_, Weight>;
1086
1087	/// The weight we reserve at the beginning of the block for processing DMP messages. This
1088	/// overrides the amount set in the Config trait.
1089	#[pallet::storage]
1090	pub type ReservedDmpWeightOverride<T: Config> = StorageValue<_, Weight>;
1091
1092	/// A custom head data that should be returned as result of `validate_block`.
1093	///
1094	/// See `Pallet::set_custom_validation_head_data` for more information.
1095	#[pallet::storage]
1096	pub type CustomValidationHeadData<T: Config> = StorageValue<_, Vec<u8>, OptionQuery>;
1097
1098	/// Tracks cumulative `UMP` and `HRMP` messages sent across blocks in the current `PoV`.
1099	///
1100	/// Across different candidates/PoVs the budgets are tracked by [`AggregatedUnincludedSegment`].
1101	#[pallet::storage]
1102	pub type PoVMessagesTracker<T: Config> = StorageValue<_, PoVMessages, OptionQuery>;
1103
1104	#[pallet::inherent]
1105	impl<T: Config> ProvideInherent for Pallet<T> {
1106		type Call = Call<T>;
1107		type Error = sp_inherents::MakeFatalError<()>;
1108		const INHERENT_IDENTIFIER: InherentIdentifier =
1109			cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER;
1110
1111		fn create_inherent(data: &InherentData) -> Option<Self::Call> {
1112			let data = match data
1113				.get_data::<ParachainInherentData>(&Self::INHERENT_IDENTIFIER)
1114				.ok()
1115				.flatten()
1116			{
1117				None => {
1118					// Key Self::INHERENT_IDENTIFIER is expected to contain versioned inherent
1119					// data. Older nodes are unaware of the new format and might provide the
1120					// legacy data format. We try to load it and transform it into the current
1121					// version.
1122					let data = data
1123						.get_data::<v0::ParachainInherentData>(
1124							&cumulus_primitives_parachain_inherent::PARACHAIN_INHERENT_IDENTIFIER_V0,
1125						)
1126						.ok()
1127						.flatten()?;
1128					data.into()
1129				},
1130				Some(data) => data,
1131			};
1132
1133			Some(Self::do_create_inherent(data))
1134		}
1135
1136		fn is_inherent(call: &Self::Call) -> bool {
1137			matches!(call, Call::set_validation_data { .. })
1138		}
1139	}
1140
1141	#[pallet::genesis_config]
1142	#[derive(frame_support::DefaultNoBound)]
1143	pub struct GenesisConfig<T: Config> {
1144		#[serde(skip)]
1145		pub _config: core::marker::PhantomData<T>,
1146	}
1147
1148	#[pallet::genesis_build]
1149	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
1150		fn build(&self) {
1151			// TODO: Remove after https://github.com/paritytech/cumulus/issues/479
1152			sp_io::storage::set(b":c", &[]);
1153		}
1154	}
1155}
1156
1157impl<T: Config> Pallet<T> {
1158	/// Get the unincluded segment size after the given hash.
1159	///
1160	/// If the unincluded segment doesn't contain the given hash, this returns the
1161	/// length of the entire unincluded segment.
1162	///
1163	/// This is intended to be used for determining how long the unincluded segment _would be_
1164	/// in runtime APIs related to authoring.
1165	pub fn unincluded_segment_size_after(included_hash: T::Hash) -> u32 {
1166		let segment = UnincludedSegment::<T>::get();
1167		crate::unincluded_segment::size_after_included(included_hash, &segment)
1168	}
1169
1170	/// Returns the configured maximum claim queue offset.
1171	///
1172	/// This is used by the [cumulus_primitives_core::RelayParentOffsetApi::max_claim_queue_offset]
1173	/// runtime API to expose the value to collators.
1174	pub fn max_claim_queue_offset() -> u8 {
1175		if !T::SchedulingSignatureVerifier::V3_SCHEDULING_ENABLED {
1176			return 1;
1177		}
1178
1179		2
1180	}
1181}
1182
1183impl<T: Config> FeeTracker for Pallet<T> {
1184	type Id = ();
1185
1186	fn get_fee_factor(_id: Self::Id) -> FixedU128 {
1187		UpwardDeliveryFeeFactor::<T>::get()
1188	}
1189
1190	fn set_fee_factor(_id: Self::Id, val: FixedU128) {
1191		UpwardDeliveryFeeFactor::<T>::set(val);
1192	}
1193}
1194
1195impl<T: Config> ListChannelInfos for Pallet<T> {
1196	fn outgoing_channels() -> Vec<ParaId> {
1197		let Some(state) = RelevantMessagingState::<T>::get() else { return Vec::new() };
1198		state.egress_channels.into_iter().map(|(id, _)| id).collect()
1199	}
1200}
1201
1202impl<T: Config> GetChannelInfo for Pallet<T> {
1203	fn get_channel_status(id: ParaId) -> ChannelStatus {
1204		// Note, that we are using `relevant_messaging_state` which may be from the previous
1205		// block, in case this is called from `on_initialize`, i.e. before the inherent with
1206		// fresh data is submitted.
1207		//
1208		// That shouldn't be a problem though because this is anticipated and already can
1209		// happen. This is because sending implies that a message is buffered until there is
1210		// space to send a message in the candidate. After a while waiting in a buffer, it may
1211		// be discovered that the channel to which a message were addressed is now closed.
1212		// Another possibility, is that the maximum message size was decreased so that a
1213		// message in the buffer doesn't fit. Should any of that happen the sender should be
1214		// notified about the message was discarded.
1215		//
1216		// Here it a similar case, with the difference that the realization that the channel is
1217		// closed came the same block.
1218		let channels = match RelevantMessagingState::<T>::get() {
1219			None => {
1220				log::warn!("calling `get_channel_status` with no RelevantMessagingState?!");
1221				return ChannelStatus::Closed;
1222			},
1223			Some(d) => d.egress_channels,
1224		};
1225		// ^^^ NOTE: This storage field should carry over from the previous block. So if it's
1226		// None then it must be that this is an edge-case where a message is attempted to be
1227		// sent at the first block. It should be safe to assume that there are no channels
1228		// opened at all so early. At least, relying on this assumption seems to be a better
1229		// trade-off, compared to introducing an error variant that the clients should be
1230		// prepared to handle.
1231		let index = match channels.binary_search_by_key(&id, |item| item.0) {
1232			Err(_) => return ChannelStatus::Closed,
1233			Ok(i) => i,
1234		};
1235		let meta = &channels[index].1;
1236		if meta.msg_count + 1 > meta.max_capacity {
1237			// The channel is at its capacity. Skip it for now.
1238			return ChannelStatus::Full;
1239		}
1240		let max_size_now = meta.max_total_size - meta.total_size;
1241		let max_size_ever = meta.max_message_size;
1242		ChannelStatus::Ready(max_size_now as usize, max_size_ever as usize)
1243	}
1244
1245	fn get_channel_info(id: ParaId) -> Option<ChannelInfo> {
1246		let channels = RelevantMessagingState::<T>::get()?.egress_channels;
1247		let index = channels.binary_search_by_key(&id, |item| item.0).ok()?;
1248		let info = ChannelInfo {
1249			max_capacity: channels[index].1.max_capacity,
1250			max_total_size: channels[index].1.max_total_size,
1251			max_message_size: channels[index].1.max_message_size,
1252			msg_count: channels[index].1.msg_count,
1253			total_size: channels[index].1.total_size,
1254		};
1255		Some(info)
1256	}
1257}
1258
1259impl<T: Config> Pallet<T> {
1260	/// The bandwidth limit per block that applies when receiving messages from the relay chain via
1261	/// DMP or XCMP.
1262	///
1263	/// The limit is per message passing mechanism (e.g. 1 MiB for DMP, 1 MiB for XCMP).
1264	///
1265	/// The purpose of this limit is to make sure that the total size of the messages received by
1266	/// the parachain from the relay chain doesn't exceed the block size. Currently each message
1267	/// passing mechanism can use 1/6 of the total block PoV which means that in total 1/3
1268	/// of the block PoV can be used for message passing.
1269	fn messages_collection_size_limit() -> usize {
1270		let max_block_weight = <T as frame_system::Config>::BlockWeights::get().max_block;
1271		let max_block_pov = max_block_weight.proof_size();
1272
1273		let remaining_proof_size =
1274			frame_system::Pallet::<T>::remaining_block_weight().remaining().proof_size();
1275
1276		(max_block_pov / 6).min(remaining_proof_size).saturated_into()
1277	}
1278
1279	/// Updates inherent data to only include the messages that weren't already processed
1280	/// by the runtime and to compress (hash) the messages that exceed the allocated size.
1281	///
1282	/// This method doesn't check for mqc heads mismatch. If the MQC doesn't match after
1283	/// dropping messages, the runtime will panic when executing the inherent.
1284	fn do_create_inherent(data: ParachainInherentData) -> Call<T> {
1285		let (data, mut downward_messages, mut horizontal_messages) =
1286			deconstruct_parachain_inherent_data(data);
1287		let last_relay_block_number = LastRelayChainBlockNumber::<T>::get();
1288
1289		let messages_collection_size_limit = Self::messages_collection_size_limit();
1290		// DMQ.
1291		let last_processed_msg = LastProcessedDownwardMessage::<T>::get()
1292			.unwrap_or(InboundMessageId { sent_at: last_relay_block_number, reverse_idx: 0 });
1293		downward_messages.drop_processed_messages(&last_processed_msg);
1294		let mut size_limit = messages_collection_size_limit;
1295		let downward_messages = downward_messages.into_abridged(&mut size_limit);
1296
1297		// HRMP.
1298		let last_processed_msg = LastProcessedHrmpMessage::<T>::get()
1299			.unwrap_or(InboundMessageId { sent_at: last_relay_block_number, reverse_idx: 0 });
1300		horizontal_messages.drop_processed_messages(&last_processed_msg);
1301		size_limit = size_limit.saturating_add(messages_collection_size_limit);
1302		let horizontal_messages = horizontal_messages.into_abridged(&mut size_limit);
1303
1304		let inbound_messages_data =
1305			InboundMessagesData::new(downward_messages, horizontal_messages);
1306
1307		Call::set_validation_data { data, inbound_messages_data }
1308	}
1309
1310	/// Enqueue all inbound downward messages relayed by the collator into the MQ pallet.
1311	///
1312	/// Checks if the sequence of the messages is valid, dispatches them and communicates the
1313	/// number of processed messages to the collator via a storage update.
1314	///
1315	/// # Panics
1316	///
1317	/// If it turns out that after processing all messages the Message Queue Chain
1318	/// hash doesn't match the expected.
1319	fn enqueue_inbound_downward_messages(
1320		expected_dmq_mqc_head: relay_chain::Hash,
1321		downward_messages: AbridgedInboundDownwardMessages,
1322	) -> Weight {
1323		downward_messages.check_enough_messages_included_basic("DMQ");
1324
1325		let mut dmq_head = <LastDmqMqcHead<T>>::get();
1326
1327		let (messages, hashed_messages) = downward_messages.messages();
1328		let message_count = messages.len() as u32;
1329		let weight_used = T::WeightInfo::enqueue_inbound_downward_messages(message_count);
1330		if let Some(last_msg) = messages.last() {
1331			Self::deposit_event(Event::DownwardMessagesReceived { count: message_count });
1332
1333			// Eagerly update the MQC head hash:
1334			for msg in messages {
1335				dmq_head.extend_downward(msg);
1336			}
1337			<LastDmqMqcHead<T>>::put(&dmq_head);
1338			Self::deposit_event(Event::DownwardMessagesProcessed {
1339				weight_used,
1340				dmq_head: dmq_head.head(),
1341			});
1342
1343			let mut last_processed_msg =
1344				InboundMessageId { sent_at: last_msg.sent_at, reverse_idx: 0 };
1345			for msg in hashed_messages {
1346				dmq_head.extend_with_hashed_msg(msg);
1347
1348				if msg.sent_at == last_processed_msg.sent_at {
1349					last_processed_msg.reverse_idx += 1;
1350				}
1351			}
1352			LastProcessedDownwardMessage::<T>::put(last_processed_msg);
1353
1354			T::DmpQueue::handle_messages(downward_messages.bounded_msgs_iter());
1355		}
1356
1357		// After hashing each message in the message queue chain submitted by the collator, we
1358		// should arrive to the MQC head provided by the relay chain.
1359		//
1360		// A mismatch means that at least some of the submitted messages were altered, omitted or
1361		// added improperly.
1362		assert_eq!(dmq_head.head(), expected_dmq_mqc_head, "DMQ head mismatch");
1363
1364		ProcessedDownwardMessages::<T>::put(message_count);
1365
1366		weight_used
1367	}
1368
1369	fn get_ingress_channel_or_panic(
1370		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
1371		sender: ParaId,
1372	) -> &cumulus_primitives_core::AbridgedHrmpChannel {
1373		let maybe_channel_idx = ingress_channels
1374			.binary_search_by_key(&sender, |&(channel_sender, _)| channel_sender)
1375			.ok();
1376		let maybe_channel = maybe_channel_idx
1377			.and_then(|channel_idx| ingress_channels.get(channel_idx))
1378			.map(|(_, channel)| channel);
1379		maybe_channel.unwrap_or_else(|| {
1380			panic!(
1381				"One of the messages submitted by the collator was sent from a sender ({}) \
1382				that doesn't have a channel opened to this parachain",
1383				<ParaId as Into<u32>>::into(sender)
1384			)
1385		})
1386	}
1387
1388	fn check_hrmp_mcq_heads(
1389		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
1390		mqc_heads: &mut BTreeMap<ParaId, MessageQueueChain>,
1391	) {
1392		// Check that the MQC heads for each channel provided by the relay chain match the MQC
1393		// heads we have after processing all incoming messages.
1394		//
1395		// Along the way we also carry over the relevant entries from the `last_mqc_heads` to
1396		// `running_mqc_heads`. Otherwise, in a block where no messages were sent in a channel
1397		// it won't get into next block's `last_mqc_heads` and thus will be all zeros, which
1398		// would corrupt the message queue chain.
1399		for (sender, channel) in ingress_channels {
1400			let cur_head = mqc_heads.entry(*sender).or_default().head();
1401			let target_head = channel.mqc_head.unwrap_or_default();
1402			assert_eq!(cur_head, target_head, "HRMP head mismatch");
1403		}
1404	}
1405
1406	/// Performs some checks related to the sender and the `sent_at` field of an HRMP message.
1407	///
1408	/// **Panics** if the message submitted by the collator doesn't respect the expected order or if
1409	///            it was sent from a para which has no open channel to this parachain.
1410	fn check_hrmp_message_metadata(
1411		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
1412		maybe_prev_msg_metadata: &mut Option<(u32, ParaId)>,
1413		msg_metadata: (u32, ParaId),
1414	) {
1415		// Check that the message is properly ordered.
1416		if let Some(prev_msg) = maybe_prev_msg_metadata {
1417			assert!(&msg_metadata >= prev_msg, "[HRMP] Messages order violation");
1418		}
1419		*maybe_prev_msg_metadata = Some(msg_metadata);
1420
1421		// Check that the message is sent from an existing channel.
1422		Self::get_ingress_channel_or_panic(ingress_channels, msg_metadata.1);
1423	}
1424
1425	/// Process all inbound horizontal messages relayed by the collator.
1426	///
1427	/// This is similar to [`enqueue_inbound_downward_messages`], but works with multiple inbound
1428	/// channels. It immediately dispatches signals and queues all other XCMs. Blob messages are
1429	/// ignored.
1430	///
1431	/// **Panics** if either any of horizontal messages submitted by the collator was sent from
1432	///            a para which has no open channel to this parachain or if after processing
1433	///            messages across all inbound channels MQCs were obtained which do not
1434	///            correspond to the ones found on the relay-chain.
1435	fn enqueue_inbound_horizontal_messages(
1436		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
1437		horizontal_messages: AbridgedInboundHrmpMessages,
1438		relay_parent_number: relay_chain::BlockNumber,
1439	) -> Weight {
1440		let mut mqc_heads = <LastHrmpMqcHeads<T>>::get();
1441		let (messages, hashed_messages) = horizontal_messages.messages();
1442
1443		// First, check the HRMP advancement rule.
1444		let maybe_first_hashed_msg_sender = hashed_messages.first().map(|(sender, _msg)| *sender);
1445		if let Some(first_hashed_msg_sender) = maybe_first_hashed_msg_sender {
1446			let channel =
1447				Self::get_ingress_channel_or_panic(ingress_channels, first_hashed_msg_sender);
1448			horizontal_messages.check_enough_messages_included_advanced(
1449				"HRMP",
1450				AbridgedInboundMessagesSizeInfo {
1451					max_full_messages_size: Self::messages_collection_size_limit(),
1452					first_hashed_msg_max_size: channel.max_message_size as usize,
1453				},
1454			);
1455		}
1456
1457		Self::prune_closed_mqc_heads(ingress_channels, &mut mqc_heads);
1458
1459		if messages.is_empty() {
1460			Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads);
1461			let last_processed_msg =
1462				InboundMessageId { sent_at: relay_parent_number, reverse_idx: 0 };
1463
1464			LastProcessedHrmpMessage::<T>::put(last_processed_msg);
1465			HrmpWatermark::<T>::put(relay_parent_number);
1466			LastHrmpMqcHeads::<T>::put(&mqc_heads); // write back in case of modification
1467
1468			return T::DbWeight::get().reads_writes(1, 2);
1469		}
1470
1471		let mut prev_msg_metadata = None;
1472		let mut last_processed_block = HrmpWatermark::<T>::get();
1473		let mut last_processed_msg = InboundMessageId { sent_at: 0, reverse_idx: 0 };
1474		for (sender, msg) in messages {
1475			Self::check_hrmp_message_metadata(
1476				ingress_channels,
1477				&mut prev_msg_metadata,
1478				(msg.sent_at, *sender),
1479			);
1480			mqc_heads.entry(*sender).or_default().extend_hrmp(msg);
1481
1482			if msg.sent_at > last_processed_msg.sent_at && last_processed_msg.sent_at > 0 {
1483				last_processed_block = last_processed_msg.sent_at;
1484			}
1485			last_processed_msg.sent_at = msg.sent_at;
1486		}
1487
1488		LastHrmpMqcHeads::<T>::put(&mqc_heads);
1489
1490		for (sender, msg) in hashed_messages {
1491			Self::check_hrmp_message_metadata(
1492				ingress_channels,
1493				&mut prev_msg_metadata,
1494				(msg.sent_at, *sender),
1495			);
1496			mqc_heads.entry(*sender).or_default().extend_with_hashed_msg(msg);
1497
1498			if msg.sent_at == last_processed_msg.sent_at {
1499				last_processed_msg.reverse_idx += 1;
1500			}
1501		}
1502		if last_processed_msg.sent_at > 0 && last_processed_msg.reverse_idx == 0 {
1503			last_processed_block = last_processed_msg.sent_at;
1504		}
1505		LastProcessedHrmpMessage::<T>::put(&last_processed_msg);
1506		Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads);
1507
1508		let max_weight =
1509			<ReservedXcmpWeightOverride<T>>::get().unwrap_or_else(T::ReservedXcmpWeight::get);
1510		let weight_used = T::XcmpMessageHandler::handle_xcmp_messages(
1511			horizontal_messages.flat_msgs_iter(),
1512			max_weight,
1513		);
1514
1515		// Update watermark
1516		HrmpWatermark::<T>::put(last_processed_block);
1517
1518		weight_used.saturating_add(T::DbWeight::get().reads_writes(2, 3))
1519	}
1520
1521	/// Remove all MQC heads that do not correspond to an open channel.
1522	fn prune_closed_mqc_heads(
1523		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
1524		mqc_heads: &mut BTreeMap<ParaId, MessageQueueChain>,
1525	) {
1526		// Complexity is O(N * lg N) but could be optimized for O(N)
1527		mqc_heads.retain(|para, _| {
1528			ingress_channels
1529				.binary_search_by_key(para, |&(channel_sender, _)| channel_sender)
1530				.is_ok()
1531		});
1532	}
1533
1534	/// Drop blocks from the unincluded segment with respect to the latest parachain head.
1535	fn maybe_drop_included_ancestors(
1536		relay_state_proof: &RelayChainStateProof,
1537		capacity: consensus_hook::UnincludedSegmentCapacity,
1538	) -> Weight {
1539		let mut weight_used = Weight::zero();
1540		// If the unincluded segment length is nonzero, then the parachain head must be present.
1541		let para_head =
1542			relay_state_proof.read_included_para_head().ok().map(|h| T::Hashing::hash(&h.0));
1543
1544		let unincluded_segment_len = <UnincludedSegment<T>>::decode_len().unwrap_or(0);
1545		weight_used += T::DbWeight::get().reads(1);
1546
1547		// Clean up unincluded segment if nonempty.
1548		let included_head = match (para_head, capacity.is_expecting_included_parent()) {
1549			(Some(h), true) => {
1550				assert_eq!(
1551					h,
1552					frame_system::Pallet::<T>::parent_hash(),
1553					"expected parent to be included"
1554				);
1555
1556				h
1557			},
1558			(Some(h), false) => h,
1559			(None, true) => {
1560				// All this logic is essentially a workaround to support collators which
1561				// might still not provide the included block with the state proof.
1562				frame_system::Pallet::<T>::parent_hash()
1563			},
1564			(None, false) => panic!("included head not present in relay storage proof"),
1565		};
1566
1567		let new_len = {
1568			let para_head_hash = included_head;
1569			let dropped: Vec<Ancestor<T::Hash>> = <UnincludedSegment<T>>::mutate(|chain| {
1570				// Drop everything up to (inclusive) the block with an included para head, if
1571				// present.
1572				let idx = chain
1573					.iter()
1574					.position(|block| {
1575						let head_hash = block
1576							.para_head_hash()
1577							.expect("para head hash is updated during block initialization; qed");
1578						head_hash == &para_head_hash
1579					})
1580					.map_or(0, |idx| idx + 1); // inclusive.
1581
1582				chain.drain(..idx).collect()
1583			});
1584			weight_used += T::DbWeight::get().reads_writes(1, 1);
1585
1586			let new_len = unincluded_segment_len - dropped.len();
1587			if !dropped.is_empty() {
1588				<AggregatedUnincludedSegment<T>>::mutate(|agg| {
1589					let agg = agg.as_mut().expect(
1590						"dropped part of the segment wasn't empty, hence value exists; qed",
1591					);
1592					for block in dropped {
1593						agg.subtract(&block);
1594					}
1595				});
1596				weight_used += T::DbWeight::get().reads_writes(1, 1);
1597			}
1598
1599			new_len as u32
1600		};
1601
1602		// Current block validity check: ensure there is space in the unincluded segment.
1603		//
1604		// If this fails, the parachain needs to wait for ancestors to be included before
1605		// a new block is allowed.
1606		assert!(
1607			new_len < capacity.get(),
1608			"No space left for the block in the unincluded segment: new_len({new_len}) < capacity({})",
1609			capacity.get()
1610		);
1611		weight_used
1612	}
1613
1614	/// This adjusts the `RelevantMessagingState` according to the bandwidth limits in the
1615	/// unincluded segment.
1616	// Reads: 2
1617	// Writes: 1
1618	fn adjust_egress_bandwidth_limits() {
1619		let Some(unincluded_segment) = AggregatedUnincludedSegment::<T>::get() else { return };
1620
1621		<RelevantMessagingState<T>>::mutate(|messaging_state| {
1622			let Some(messaging_state) = messaging_state else { return };
1623
1624			let used_bandwidth = unincluded_segment.used_bandwidth();
1625
1626			let channels = &mut messaging_state.egress_channels;
1627			for (para_id, used) in used_bandwidth.hrmp_outgoing.iter() {
1628				let Ok(i) = channels.binary_search_by_key(para_id, |item| item.0) else {
1629					continue; // indicates channel closed.
1630				};
1631
1632				let c = &mut channels[i].1;
1633
1634				c.total_size = (c.total_size + used.total_bytes).min(c.max_total_size);
1635				c.msg_count = (c.msg_count + used.msg_count).min(c.max_capacity);
1636			}
1637
1638			let upward_capacity = &mut messaging_state.relay_dispatch_queue_remaining_capacity;
1639			upward_capacity.remaining_count =
1640				upward_capacity.remaining_count.saturating_sub(used_bandwidth.ump_msg_count);
1641			upward_capacity.remaining_size =
1642				upward_capacity.remaining_size.saturating_sub(used_bandwidth.ump_total_bytes);
1643		});
1644	}
1645
1646	/// Put a new validation function into a particular location where polkadot
1647	/// monitors for updates. Calling this function notifies polkadot that a new
1648	/// upgrade has been scheduled.
1649	fn notify_polkadot_of_pending_upgrade(code: &[u8]) {
1650		NewValidationCode::<T>::put(code);
1651		<DidSetValidationCode<T>>::put(true);
1652	}
1653
1654	/// The maximum code size permitted, in bytes.
1655	///
1656	/// Returns `None` if the relay chain parachain host configuration hasn't been submitted yet.
1657	pub fn max_code_size() -> Option<u32> {
1658		<HostConfiguration<T>>::get().map(|cfg| cfg.max_code_size)
1659	}
1660
1661	/// The implementation of the runtime upgrade functionality for parachains.
1662	pub fn schedule_code_upgrade(validation_function: Vec<u8>) -> DispatchResult {
1663		// Ensure that `ValidationData` exists. We do not care about the validation data per se,
1664		// but we do care about the [`UpgradeRestrictionSignal`] which arrives with the same
1665		// inherent.
1666		ensure!(<ValidationData<T>>::exists(), Error::<T>::ValidationDataNotAvailable);
1667		ensure!(<UpgradeRestrictionSignal<T>>::get().is_none(), Error::<T>::ProhibitedByPolkadot);
1668
1669		ensure!(!<PendingValidationCode<T>>::exists(), Error::<T>::OverlappingUpgrades);
1670		let cfg = HostConfiguration::<T>::get().ok_or(Error::<T>::HostConfigurationNotAvailable)?;
1671		ensure!(validation_function.len() <= cfg.max_code_size as usize, Error::<T>::TooBig);
1672
1673		// When a code upgrade is scheduled, it has to be applied in two
1674		// places, synchronized: both polkadot and the individual parachain
1675		// have to upgrade on the same relay chain block.
1676		//
1677		// `notify_polkadot_of_pending_upgrade` notifies polkadot; the `PendingValidationCode`
1678		// storage keeps track locally for the parachain upgrade, which will
1679		// be applied later: when the relay-chain communicates go-ahead signal to us.
1680		Self::notify_polkadot_of_pending_upgrade(&validation_function);
1681		<PendingValidationCode<T>>::put(validation_function);
1682		Self::deposit_event(Event::ValidationFunctionStored);
1683
1684		Ok(())
1685	}
1686
1687	/// Returns the [`CollationInfo`] of the current active block.
1688	///
1689	/// The given `header` is the header of the built block we are collecting the collation info
1690	/// for.
1691	///
1692	/// This is expected to be used by the
1693	/// [`CollectCollationInfo`](cumulus_primitives_core::CollectCollationInfo) runtime api.
1694	pub fn collect_collation_info(header: &HeaderFor<T>) -> CollationInfo {
1695		CollationInfo {
1696			hrmp_watermark: HrmpWatermark::<T>::get(),
1697			horizontal_messages: HrmpOutboundMessages::<T>::get(),
1698			upward_messages: UpwardMessages::<T>::get(),
1699			processed_downward_messages: ProcessedDownwardMessages::<T>::get(),
1700			new_validation_code: NewValidationCode::<T>::get().map(Into::into),
1701			// Check if there is a custom header that will also be returned by the validation phase.
1702			// If so, we need to also return it here.
1703			head_data: CustomValidationHeadData::<T>::get()
1704				.map_or_else(|| header.encode(), |v| v)
1705				.into(),
1706		}
1707	}
1708
1709	/// Set a custom head data that should be returned as result of `validate_block`.
1710	///
1711	/// This will overwrite the head data that is returned as result of `validate_block` while
1712	/// validating a `PoV` on the relay chain. Normally the head data that is being returned
1713	/// by `validate_block` is the header of the block that is validated, thus it can be
1714	/// enacted as the new best block. However, for features like forking it can be useful
1715	/// to overwrite the head data with a custom header.
1716	///
1717	/// # Attention
1718	///
1719	/// This should only be used when you are sure what you are doing as this can brick
1720	/// your Parachain.
1721	pub fn set_custom_validation_head_data(head_data: Vec<u8>) {
1722		CustomValidationHeadData::<T>::put(head_data);
1723	}
1724
1725	/// Send the pending ump signals
1726	fn send_ump_signals(core_info: Option<CoreInfo>) {
1727		let mut ump_signals = PendingUpwardSignals::<T>::take();
1728
1729		if let Some(core_info) = core_info {
1730			ump_signals.push(
1731				UMPSignal::SelectCore(core_info.selector, core_info.claim_queue_offset).encode(),
1732			);
1733		}
1734
1735		if let Some(approved_peer) = PendingApprovedPeer::<T>::take() {
1736			ump_signals.push(UMPSignal::ApprovedPeer(approved_peer).encode());
1737		}
1738
1739		if !ump_signals.is_empty() {
1740			UpwardMessages::<T>::append(UMP_SEPARATOR);
1741			ump_signals.into_iter().for_each(|s| UpwardMessages::<T>::append(s));
1742		}
1743	}
1744
1745	/// Open HRMP channel for using it in benchmarks or tests.
1746	///
1747	/// The caller assumes that the pallet will accept regular outbound message to the sibling
1748	/// `target_parachain` after this call. No other assumptions are made.
1749	#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
1750	pub fn open_outbound_hrmp_channel_for_benchmarks_or_tests(target_parachain: ParaId) {
1751		RelevantMessagingState::<T>::put(MessagingStateSnapshot {
1752			dmq_mqc_head: Default::default(),
1753			relay_dispatch_queue_remaining_capacity: Default::default(),
1754			ingress_channels: Default::default(),
1755			egress_channels: vec![(
1756				target_parachain,
1757				cumulus_primitives_core::AbridgedHrmpChannel {
1758					max_capacity: 10,
1759					max_total_size: 10_000_000_u32,
1760					max_message_size: 10_000_000_u32,
1761					msg_count: 5,
1762					total_size: 5_000_000_u32,
1763					mqc_head: None,
1764				},
1765			)],
1766		})
1767	}
1768
1769	/// Open HRMP channel for using it in benchmarks or tests.
1770	///
1771	/// The caller assumes that the pallet will accept regular outbound message to the sibling
1772	/// `target_parachain` after this call. No other assumptions are made.
1773	#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
1774	pub fn open_custom_outbound_hrmp_channel_for_benchmarks_or_tests(
1775		target_parachain: ParaId,
1776		channel: cumulus_primitives_core::AbridgedHrmpChannel,
1777	) {
1778		RelevantMessagingState::<T>::put(MessagingStateSnapshot {
1779			dmq_mqc_head: Default::default(),
1780			relay_dispatch_queue_remaining_capacity: Default::default(),
1781			ingress_channels: Default::default(),
1782			egress_channels: vec![(target_parachain, channel)],
1783		})
1784	}
1785
1786	/// Prepare/insert relevant data for `schedule_code_upgrade` for benchmarks.
1787	#[cfg(feature = "runtime-benchmarks")]
1788	pub fn initialize_for_set_code_benchmark(max_code_size: u32) {
1789		// insert dummy ValidationData
1790		let vfp = PersistedValidationData {
1791			parent_head: polkadot_parachain_primitives::primitives::HeadData(Default::default()),
1792			relay_parent_number: 1,
1793			relay_parent_storage_root: Default::default(),
1794			max_pov_size: 1_000,
1795		};
1796		<ValidationData<T>>::put(&vfp);
1797
1798		// insert dummy HostConfiguration with
1799		let host_config = AbridgedHostConfiguration {
1800			max_code_size,
1801			max_head_data_size: 32 * 1024,
1802			max_upward_queue_count: 8,
1803			max_upward_queue_size: 1024 * 1024,
1804			max_upward_message_size: 4 * 1024,
1805			max_upward_message_num_per_candidate: 2,
1806			hrmp_max_message_num_per_candidate: 2,
1807			validation_upgrade_cooldown: 2,
1808			validation_upgrade_delay: 2,
1809			async_backing_params: relay_chain::AsyncBackingParams {
1810				allowed_ancestry_len: 0,
1811				max_candidate_depth: 0,
1812			},
1813		};
1814		<HostConfiguration<T>>::put(host_config);
1815	}
1816}
1817
1818/// Type that implements `SetCode`.
1819pub struct ParachainSetCode<T>(core::marker::PhantomData<T>);
1820impl<T: Config> frame_system::SetCode<T> for ParachainSetCode<T> {
1821	fn set_code(code: Vec<u8>) -> DispatchResult {
1822		Pallet::<T>::schedule_code_upgrade(code)
1823	}
1824}
1825
1826impl<T: Config> Pallet<T> {
1827	/// Puts a message in the `PendingUpwardMessages` storage item.
1828	/// The message will be later sent in `on_finalize`.
1829	/// Checks host configuration to see if message is too big.
1830	/// Increases the delivery fee factor if the queue is sufficiently (see
1831	/// [`ump_constants::THRESHOLD_FACTOR`]) congested.
1832	pub fn send_upward_message(message: UpwardMessage) -> Result<(u32, XcmHash), MessageSendError> {
1833		let message_len = message.len();
1834		// Check if the message fits into the relay-chain constraints.
1835		//
1836		// Note, that we are using `host_configuration` here which may be from the previous
1837		// block, in case this is called from `on_initialize`, i.e. before the inherent with fresh
1838		// data is submitted.
1839		//
1840		// That shouldn't be a problem since this is a preliminary check and the actual check would
1841		// be performed just before submitting the message from the candidate, and it already can
1842		// happen that during the time the message is buffered for sending the relay-chain setting
1843		// may change so that the message is no longer valid.
1844		//
1845		// However, changing this setting is expected to be rare.
1846		if let Some(cfg) = HostConfiguration::<T>::get() {
1847			if message_len > cfg.max_upward_message_size as usize {
1848				return Err(MessageSendError::TooBig);
1849			}
1850			let threshold =
1851				cfg.max_upward_queue_size.saturating_div(ump_constants::THRESHOLD_FACTOR);
1852			// We check the threshold against total size and not number of messages since messages
1853			// could be big or small.
1854			<PendingUpwardMessages<T>>::append(message.clone());
1855			let pending_messages = PendingUpwardMessages::<T>::get();
1856			let total_size: usize = pending_messages.iter().map(UpwardMessage::len).sum();
1857			if total_size > threshold as usize {
1858				// We increase the fee factor by a factor based on the new message's size in KB
1859				Self::increase_fee_factor((), message_len as u128);
1860			}
1861		} else {
1862			// This storage field should carry over from the previous block. So if it's None
1863			// then it must be that this is an edge-case where a message is attempted to be
1864			// sent at the first block.
1865			//
1866			// Let's pass this message through. I think it's not unreasonable to expect that
1867			// the message is not huge and it comes through, but if it doesn't it can be
1868			// returned back to the sender.
1869			//
1870			// Thus fall through here.
1871			<PendingUpwardMessages<T>>::append(message.clone());
1872		};
1873
1874		// The relay ump does not use using_encoded
1875		// We apply the same this to use the same hash
1876		let hash = sp_io::hashing::blake2_256(&message);
1877		Self::deposit_event(Event::UpwardMessageSent { message_hash: Some(hash) });
1878		Ok((0, hash))
1879	}
1880
1881	/// Get the relay chain block number which was used as an anchor for the last block in this
1882	/// chain.
1883	pub fn last_relay_block_number() -> RelayChainBlockNumber {
1884		LastRelayChainBlockNumber::<T>::get()
1885	}
1886}
1887
1888impl<T: Config> UpwardMessageSender for Pallet<T> {
1889	fn send_upward_message(message: UpwardMessage) -> Result<(u32, XcmHash), MessageSendError> {
1890		Self::send_upward_message(message)
1891	}
1892
1893	fn can_send_upward_message(message: &UpwardMessage) -> Result<(), MessageSendError> {
1894		let max_upward_message_size = HostConfiguration::<T>::get()
1895			.map(|cfg| cfg.max_upward_message_size)
1896			.ok_or(MessageSendError::Other)?;
1897		if message.len() > max_upward_message_size as usize {
1898			Err(MessageSendError::TooBig)
1899		} else {
1900			Ok(())
1901		}
1902	}
1903
1904	#[cfg(any(feature = "std", feature = "runtime-benchmarks", test))]
1905	fn ensure_successful_delivery() {
1906		const MAX_UPWARD_MESSAGE_SIZE: u32 = 65_531 * 3;
1907		const MAX_CODE_SIZE: u32 = 3 * 1024 * 1024;
1908		HostConfiguration::<T>::mutate(|cfg| match cfg {
1909			Some(cfg) => cfg.max_upward_message_size = MAX_UPWARD_MESSAGE_SIZE,
1910			None => {
1911				*cfg = Some(AbridgedHostConfiguration {
1912					max_code_size: MAX_CODE_SIZE,
1913					max_head_data_size: 32 * 1024,
1914					max_upward_queue_count: 8,
1915					max_upward_queue_size: 1024 * 1024,
1916					max_upward_message_size: MAX_UPWARD_MESSAGE_SIZE,
1917					max_upward_message_num_per_candidate: 2,
1918					hrmp_max_message_num_per_candidate: 2,
1919					validation_upgrade_cooldown: 2,
1920					validation_upgrade_delay: 2,
1921					async_backing_params: relay_chain::AsyncBackingParams {
1922						allowed_ancestry_len: 0,
1923						max_candidate_depth: 0,
1924					},
1925				})
1926			},
1927		})
1928	}
1929}
1930
1931impl<T: Config> InspectMessageQueues for Pallet<T> {
1932	fn clear_messages() {
1933		PendingUpwardMessages::<T>::kill();
1934	}
1935
1936	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1937		use xcm::prelude::*;
1938
1939		let messages: Vec<VersionedXcm<()>> = PendingUpwardMessages::<T>::get()
1940			.iter()
1941			.map(|encoded_message| {
1942				VersionedXcm::<()>::decode_all_with_depth_limit(
1943					MAX_XCM_DECODE_DEPTH,
1944					&mut &encoded_message[..],
1945				)
1946				.unwrap()
1947			})
1948			.collect();
1949
1950		if messages.is_empty() {
1951			vec![]
1952		} else {
1953			vec![(VersionedLocation::from(Location::parent()), messages)]
1954		}
1955	}
1956}
1957
1958#[cfg(feature = "runtime-benchmarks")]
1959impl<T: Config> polkadot_runtime_parachains::EnsureForParachain for Pallet<T> {
1960	fn ensure(para_id: ParaId) {
1961		if let ChannelStatus::Closed = Self::get_channel_status(para_id) {
1962			Self::open_outbound_hrmp_channel_for_benchmarks_or_tests(para_id)
1963		}
1964	}
1965}
1966
1967/// Something that should be informed about system related events.
1968///
1969/// This includes events like [`on_validation_data`](Self::on_validation_data) that is being
1970/// called when the parachain inherent is executed that contains the validation data.
1971/// Or like [`on_validation_code_applied`](Self::on_validation_code_applied) that is called
1972/// when the new validation is written to the state. This means that
1973/// from the next block the runtime is being using this new code.
1974pub trait OnSystemEvent {
1975	/// Called in each blocks once when the validation data is set by the inherent.
1976	fn on_validation_data(data: &PersistedValidationData);
1977	/// Called when the validation code is being applied, aka from the next block on this is the new
1978	/// runtime.
1979	fn on_validation_code_applied();
1980	/// Called to process keys from the verified relay chain state proof.
1981	fn on_relay_state_proof(
1982		relay_state_proof: &relay_state_snapshot::RelayChainStateProof,
1983	) -> Weight;
1984}
1985
1986#[impl_trait_for_tuples::impl_for_tuples(30)]
1987impl OnSystemEvent for Tuple {
1988	fn on_validation_data(data: &PersistedValidationData) {
1989		for_tuples!( #( Tuple::on_validation_data(data); )* );
1990	}
1991
1992	fn on_validation_code_applied() {
1993		for_tuples!( #( Tuple::on_validation_code_applied(); )* );
1994	}
1995
1996	fn on_relay_state_proof(
1997		relay_state_proof: &relay_state_snapshot::RelayChainStateProof,
1998	) -> Weight {
1999		let mut weight = Weight::zero();
2000		for_tuples!( #( weight = weight.saturating_add(Tuple::on_relay_state_proof(relay_state_proof)); )* );
2001		weight
2002	}
2003}
2004
2005/// Holds the most recent relay-parent state root and block number of the current parachain block.
2006#[derive(PartialEq, Eq, Clone, Encode, Decode, TypeInfo, Default, Debug)]
2007pub struct RelayChainState {
2008	/// Current relay chain height.
2009	pub number: relay_chain::BlockNumber,
2010	/// State root for current relay chain height.
2011	pub state_root: relay_chain::Hash,
2012}
2013
2014/// This exposes the [`RelayChainState`] to other runtime modules.
2015///
2016/// Enables parachains to read relay chain state via state proofs.
2017pub trait RelaychainStateProvider {
2018	/// May be called by any runtime module to obtain the current state of the relay chain.
2019	///
2020	/// **NOTE**: This is not guaranteed to return monotonically increasing relay parents.
2021	fn current_relay_chain_state() -> RelayChainState;
2022
2023	/// Utility function only to be used in benchmarking scenarios, to be implemented optionally,
2024	/// else a noop.
2025	///
2026	/// It allows for setting a custom RelayChainState.
2027	#[cfg(feature = "runtime-benchmarks")]
2028	fn set_current_relay_chain_state(_state: RelayChainState) {}
2029}
2030
2031/// Implements [`BlockNumberProvider`] that returns relay chain block number fetched from validation
2032/// data.
2033///
2034/// When validation data is not available (e.g. within `on_initialize`), it will fallback to use
2035/// [`Pallet::last_relay_block_number()`].
2036///
2037/// **NOTE**: This has been deprecated, please use [`RelaychainDataProvider`]
2038#[deprecated = "Use `RelaychainDataProvider` instead"]
2039pub type RelaychainBlockNumberProvider<T> = RelaychainDataProvider<T>;
2040
2041/// Implements [`BlockNumberProvider`] and [`RelaychainStateProvider`] that returns relevant relay
2042/// data fetched from validation data.
2043///
2044/// NOTE: When validation data is not available (e.g. within `on_initialize`):
2045///
2046/// - [`current_relay_chain_state`](Self::current_relay_chain_state): Will return the default value
2047///   of [`RelayChainState`].
2048/// - [`current_block_number`](Self::current_block_number): Will return
2049///   [`Pallet::last_relay_block_number()`].
2050pub struct RelaychainDataProvider<T>(core::marker::PhantomData<T>);
2051
2052impl<T: Config> BlockNumberProvider for RelaychainDataProvider<T> {
2053	type BlockNumber = relay_chain::BlockNumber;
2054
2055	fn current_block_number() -> relay_chain::BlockNumber {
2056		ValidationData::<T>::get()
2057			.map(|d| d.relay_parent_number)
2058			.unwrap_or_else(|| Pallet::<T>::last_relay_block_number())
2059	}
2060
2061	#[cfg(any(feature = "std", feature = "runtime-benchmarks", test))]
2062	fn set_block_number(block: Self::BlockNumber) {
2063		let mut validation_data = ValidationData::<T>::get().unwrap_or_else(||
2064			// PersistedValidationData does not impl default in non-std
2065			PersistedValidationData {
2066				parent_head: vec![].into(),
2067				relay_parent_number: Default::default(),
2068				max_pov_size: Default::default(),
2069				relay_parent_storage_root: Default::default(),
2070			});
2071		validation_data.relay_parent_number = block;
2072		ValidationData::<T>::put(validation_data)
2073	}
2074}
2075
2076impl<T: Config> RelaychainStateProvider for RelaychainDataProvider<T> {
2077	fn current_relay_chain_state() -> RelayChainState {
2078		ValidationData::<T>::get()
2079			.map(|d| RelayChainState {
2080				number: d.relay_parent_number,
2081				state_root: d.relay_parent_storage_root,
2082			})
2083			.unwrap_or_default()
2084	}
2085
2086	#[cfg(feature = "runtime-benchmarks")]
2087	fn set_current_relay_chain_state(state: RelayChainState) {
2088		let mut validation_data = ValidationData::<T>::get().unwrap_or_else(||
2089			// PersistedValidationData does not impl default in non-std
2090			PersistedValidationData {
2091				parent_head: vec![].into(),
2092				relay_parent_number: Default::default(),
2093				max_pov_size: Default::default(),
2094				relay_parent_storage_root: Default::default(),
2095			});
2096		validation_data.relay_parent_number = state.number;
2097		validation_data.relay_parent_storage_root = state.state_root;
2098		ValidationData::<T>::put(validation_data)
2099	}
2100}