referrerpolicy=no-referrer-when-downgrade

cumulus_pallet_parachain_system/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#![cfg_attr(not(feature = "std"), no_std)]
18
19//! `cumulus-pallet-parachain-system` is a base pallet for Cumulus-based parachains.
20//!
21//! This pallet handles low-level details of being a parachain. Its responsibilities include:
22//!
23//! - ingestion of the parachain validation data;
24//! - ingestion and dispatch of incoming downward and lateral messages;
25//! - coordinating upgrades with the Relay Chain; and
26//! - communication of parachain outputs, such as sent messages, signaling an upgrade, etc.
27//!
28//! Users must ensure that they register this pallet as an inherent provider.
29
30extern crate alloc;
31
32use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
33use codec::{Decode, DecodeLimit, Encode};
34use core::cmp;
35use cumulus_primitives_core::{
36	relay_chain, AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo,
37	CumulusDigestItem, GetChannelInfo, ListChannelInfos, MessageSendError, OutboundHrmpMessage,
38	ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender, XcmpMessageHandler,
39	XcmpMessageSource,
40};
41use cumulus_primitives_parachain_inherent::{v0, MessageQueueChain, ParachainInherentData};
42use frame_support::{
43	dispatch::{DispatchClass, DispatchResult},
44	ensure,
45	inherent::{InherentData, InherentIdentifier, ProvideInherent},
46	traits::{Get, HandleMessage},
47	weights::Weight,
48};
49use frame_system::{ensure_none, ensure_root, pallet_prelude::HeaderFor};
50use parachain_inherent::{
51	deconstruct_parachain_inherent_data, AbridgedInboundDownwardMessages,
52	AbridgedInboundHrmpMessages, BasicParachainInherentData, InboundMessageId, InboundMessagesData,
53};
54use polkadot_parachain_primitives::primitives::RelayChainBlockNumber;
55use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
56use scale_info::TypeInfo;
57use sp_runtime::{
58	traits::{BlockNumberProvider, Hash},
59	FixedU128, RuntimeDebug, SaturatedConversion,
60};
61use xcm::{latest::XcmHash, VersionedLocation, VersionedXcm, MAX_XCM_DECODE_DEPTH};
62use xcm_builder::InspectMessageQueues;
63
64mod benchmarking;
65pub mod migration;
66mod mock;
67#[cfg(test)]
68mod tests;
69pub mod weights;
70
71pub use weights::WeightInfo;
72
73mod unincluded_segment;
74
75pub mod consensus_hook;
76pub mod relay_state_snapshot;
77#[macro_use]
78pub mod validate_block;
79mod descendant_validation;
80pub mod parachain_inherent;
81
82use unincluded_segment::{
83	HrmpChannelUpdate, HrmpWatermarkUpdate, OutboundBandwidthLimits, SegmentTracker,
84};
85
86pub use consensus_hook::{ConsensusHook, ExpectParentIncluded};
87/// Register the `validate_block` function that is used by parachains to validate blocks on a
88/// validator.
89///
90/// Does *nothing* when `std` feature is enabled.
91///
92/// Expects as parameters the runtime, a block executor and an inherent checker.
93///
94/// # Example
95///
96/// ```
97///     struct BlockExecutor;
98///     struct Runtime;
99///
100///     cumulus_pallet_parachain_system::register_validate_block! {
101///         Runtime = Runtime,
102///         BlockExecutor = Executive,
103///     }
104///
105/// # fn main() {}
106/// ```
107pub use cumulus_pallet_parachain_system_proc_macro::register_validate_block;
108pub use relay_state_snapshot::{MessagingStateSnapshot, RelayChainStateProof};
109pub use unincluded_segment::{Ancestor, UsedBandwidth};
110
111pub use pallet::*;
112
113const LOG_TARGET: &str = "parachain-system";
114
/// Something that can check the associated relay block number.
///
/// Each Parachain block is built in the context of a relay chain block; this trait allows us
/// to validate the given relay chain block number. With async backing it is legal to build
/// multiple Parachain blocks per relay chain parent. With this trait it is possible for the
/// Parachain to ensure that still only one Parachain block is built per relay chain parent.
///
/// By default [`RelayNumberStrictlyIncreases`], [`RelayNumberMonotonicallyIncreases`] and
/// [`AnyRelayNumber`] are provided.
pub trait CheckAssociatedRelayNumber {
	/// Check the current relay number versus the previous relay number.
	///
	/// The pallet's `set_validation_data` inherent calls this with the relay parent number of
	/// the block being built as `current` and the stored `LastRelayChainBlockNumber` as
	/// `previous`.
	///
	/// The implementation should panic when there is something wrong.
	fn check_associated_relay_number(
		current: RelayChainBlockNumber,
		previous: RelayChainBlockNumber,
	);
}
132
133/// Provides an implementation of [`CheckAssociatedRelayNumber`].
134///
135/// It will ensure that the associated relay block number strictly increases between Parachain
136/// blocks. This should be used by production Parachains when in doubt.
137pub struct RelayNumberStrictlyIncreases;
138
139impl CheckAssociatedRelayNumber for RelayNumberStrictlyIncreases {
140	fn check_associated_relay_number(
141		current: RelayChainBlockNumber,
142		previous: RelayChainBlockNumber,
143	) {
144		if current <= previous {
145			panic!("Relay chain block number needs to strictly increase between Parachain blocks!")
146		}
147	}
148}
149
150/// Provides an implementation of [`CheckAssociatedRelayNumber`].
151///
152/// This will accept any relay chain block number combination. This is mainly useful for
153/// test parachains.
154pub struct AnyRelayNumber;
155
156impl CheckAssociatedRelayNumber for AnyRelayNumber {
157	fn check_associated_relay_number(_: RelayChainBlockNumber, _: RelayChainBlockNumber) {}
158}
159
160/// Provides an implementation of [`CheckAssociatedRelayNumber`].
161///
162/// It will ensure that the associated relay block number monotonically increases between Parachain
163/// blocks. This should be used when asynchronous backing is enabled.
164pub struct RelayNumberMonotonicallyIncreases;
165
166impl CheckAssociatedRelayNumber for RelayNumberMonotonicallyIncreases {
167	fn check_associated_relay_number(
168		current: RelayChainBlockNumber,
169		previous: RelayChainBlockNumber,
170	) {
171		if current < previous {
172			panic!("Relay chain block number needs to monotonically increase between Parachain blocks!")
173		}
174	}
175}
176
/// The max length of a DMP message.
///
/// This is the `MaxMessageLen` associated type of the [`HandleMessage`] implementation that is
/// configured as [`Config::DmpQueue`].
pub type MaxDmpMessageLenOf<T> = <<T as Config>::DmpQueue as HandleMessage>::MaxMessageLen;
179
/// Constants that steer the pricing of upward message (UMP) delivery.
pub mod ump_constants {
	/// `host_config.max_upward_queue_size / THRESHOLD_FACTOR` is the threshold after which delivery
	/// starts getting exponentially more expensive.
	/// `2` means the price starts to increase when the queue is half full.
	///
	/// The pallet's `on_finalize` hook uses the same threshold to decide whether the delivery
	/// fee factor is decreased again.
	pub const THRESHOLD_FACTOR: u32 = 2;
}
186
187#[frame_support::pallet]
188pub mod pallet {
189	use super::*;
190	use cumulus_primitives_core::CoreInfoExistsAtMaxOnce;
191	use frame_support::pallet_prelude::*;
192	use frame_system::pallet_prelude::*;
193
	/// The pallet type. The block hooks and the dispatchable calls of this module are
	/// implemented on it.
	#[pallet::pallet]
	// The in-code storage version is maintained in the `migration` module.
	#[pallet::storage_version(migration::STORAGE_VERSION)]
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);
198
	/// Configuration of the pallet.
	///
	/// The runtime's `frame_system::Config::OnSetCode` is required to be
	/// `ParachainSetCode<Self>`.
	#[pallet::config]
	pub trait Config: frame_system::Config<OnSetCode = ParachainSetCode<Self>> {
		/// The overarching event type.
		// NOTE(review): presumably silences the deprecation warning for declaring
		// `RuntimeEvent` explicitly in a pallet config -- confirm.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// Something which can be notified when the validation data is set.
		type OnSystemEvent: OnSystemEvent;

		/// Returns the parachain ID we are running with.
		#[pallet::constant]
		type SelfParaId: Get<ParaId>;

		/// The place where outbound XCMP messages come from. This is queried by the
		/// `on_finalize` hook of this pallet, i.e. in `finalize_block`.
		type OutboundXcmpMessageSource: XcmpMessageSource;

		/// Queues inbound downward messages for delayed processing.
		///
		/// All inbound DMP messages from the relay are pushed into this. The handler is expected to
		/// eventually process all the messages that are pushed to it.
		type DmpQueue: HandleMessage;

		/// The weight we reserve at the beginning of the block for processing DMP messages.
		type ReservedDmpWeight: Get<Weight>;

		/// The message handler that will be invoked when messages are received via XCMP.
		///
		/// This should normally link to the XCMP Queue pallet.
		type XcmpMessageHandler: XcmpMessageHandler;

		/// The weight we reserve at the beginning of the block for processing XCMP messages.
		type ReservedXcmpWeight: Get<Weight>;

		/// Something that can check the associated relay parent block number.
		type CheckAssociatedRelayNumber: CheckAssociatedRelayNumber;

		/// Weight info for functions and calls.
		type WeightInfo: WeightInfo;

		/// An entry-point for higher-level logic to manage the backlog of unincluded parachain
		/// blocks and authorship rights for those blocks.
		///
		/// Typically, this should be a hook tailored to the collator-selection/consensus mechanism
		/// that is used for this chain.
		///
		/// However, to maintain the same behavior as prior to asynchronous backing, provide the
		/// [`consensus_hook::ExpectParentIncluded`] here. This is only necessary in the case
		/// that collators aren't expected to have node versions that supply the included block
		/// in the relay-chain state proof.
		type ConsensusHook: ConsensusHook;

		/// The offset between the tip of the relay chain and the parent relay block used as parent
		/// when authoring a parachain block.
		///
		/// This setting directly impacts the number of descendant headers that are expected in the
		/// `set_validation_data` inherent.
		///
		/// For any setting `N` larger than zero, the inherent is expected to include the relay
		/// parent plus `N` descendants. These headers are required to validate that new
		/// parachain blocks are authored at the correct offset.
		///
		/// While this helps to reduce forks on the parachain side, it increases the delay for
		/// processing XCM messages. So, the value should be chosen wisely.
		///
		/// If set to 0, this config has no impact.
		type RelayParentOffset: Get<u32>;
	}
266
267	#[pallet::hooks]
268	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
		/// Handles actually sending upward messages by moving them from `PendingUpwardMessages` to
		/// `UpwardMessages`. Decreases the delivery fee factor if, after sending messages, the
		/// total size of the still pending messages does not exceed the threshold (see
		/// [`ump_constants::THRESHOLD_FACTOR`]).
		/// Also does the sending for HRMP messages it takes from `OutboundXcmpMessageSource`.
		///
		/// Finally, the bandwidth used by this block is recorded as a new ancestor in the
		/// unincluded segment.
		///
		/// If `HostConfiguration` or `RelevantMessagingState` is unexpectedly missing, the hook
		/// returns early (after a `debug_assert!`) without sending any message.
		///
		/// # Panics
		///
		/// - If `ValidationData` is not set, i.e. the `set_validation_data` inherent did not
		///   run successfully in this block.
		/// - If appending this block to the aggregated unincluded segment fails.
		fn on_finalize(_: BlockNumberFor<T>) {
			// Per-block signals are reset; the go-ahead signal is consumed here and recorded
			// in the unincluded segment further below.
			<DidSetValidationCode<T>>::kill();
			<UpgradeRestrictionSignal<T>>::kill();
			let relay_upgrade_go_ahead = <UpgradeGoAhead<T>>::take();

			let vfp = <ValidationData<T>>::get().expect(
				r"Missing required set_validation_data inherent. This inherent must be
				present in every block. This error typically occurs when the set_validation_data
				execution failed and was rejected by the block builder. Check earlier log entries
				for the specific cause of the failure.",
			);

			// Remembered for the `CheckAssociatedRelayNumber` check of the next block.
			LastRelayChainBlockNumber::<T>::put(vfp.relay_parent_number);

			let host_config = match HostConfiguration::<T>::get() {
				Some(ok) => ok,
				None => {
					debug_assert!(
						false,
						"host configuration is promised to set until `on_finalize`; qed",
					);
					return
				},
			};

			// Before updating the relevant messaging state, we need to extract
			// the total bandwidth limits for the purpose of updating the unincluded
			// segment.
			let total_bandwidth_out = match RelevantMessagingState::<T>::get() {
				Some(s) => OutboundBandwidthLimits::from_relay_chain_state(&s),
				None => {
					debug_assert!(
						false,
						"relevant messaging state is promised to be set until `on_finalize`; \
							qed",
					);
					return
				},
			};

			// After this point, the `RelevantMessagingState` in storage reflects the
			// unincluded segment.
			Self::adjust_egress_bandwidth_limits();

			let (ump_msg_count, ump_total_bytes) = <PendingUpwardMessages<T>>::mutate(|up| {
				let (available_capacity, available_size) = match RelevantMessagingState::<T>::get()
				{
					Some(limits) => (
						limits.relay_dispatch_queue_remaining_capacity.remaining_count,
						limits.relay_dispatch_queue_remaining_capacity.remaining_size,
					),
					None => {
						debug_assert!(
							false,
							"relevant messaging state is promised to be set until `on_finalize`; \
								qed",
						);
						return (0, 0)
					},
				};

				// The per-candidate message limit of the host configuration caps the count
				// as well.
				let available_capacity =
					cmp::min(available_capacity, host_config.max_upward_message_num_per_candidate);

				// Count the number of messages we can possibly fit in the given constraints, i.e.
				// available_capacity and available_size.
				//
				// The `scan` state is the running `(message count, total bytes)`. The closure
				// yields `None` -- which ends the iteration -- at the first message that would
				// exceed either limit, so `last()` is the largest prefix that still fits, or
				// `(0, 0)` if not even the first message fits.
				let (num, total_size) = up
					.iter()
					.scan((0u32, 0u32), |state, msg| {
						let (cap_used, size_used) = *state;
						let new_cap = cap_used.saturating_add(1);
						let new_size = size_used.saturating_add(msg.len() as u32);
						match available_capacity
							.checked_sub(new_cap)
							.and(available_size.checked_sub(new_size))
						{
							Some(_) => {
								*state = (new_cap, new_size);
								Some(*state)
							},
							_ => None,
						}
					})
					.last()
					.unwrap_or_default();

				// TODO: #274 Return back messages that no longer fit into the queue.

				// The first `num` messages are sent; the remainder stays pending.
				UpwardMessages::<T>::put(&up[..num as usize]);
				*up = up.split_off(num as usize);

				// Send the core selector UMP signal. This is experimental until relay chain
				// validators are upgraded to handle ump signals.
				#[cfg(feature = "experimental-ump-signals")]
				Self::send_ump_signal();

				// If the total size of the pending messages does not exceed the threshold,
				// we decrease the fee factor, since the queue is less congested.
				// This makes delivery of new messages cheaper.
				let threshold = host_config
					.max_upward_queue_size
					.saturating_div(ump_constants::THRESHOLD_FACTOR);
				let remaining_total_size: usize = up.iter().map(UpwardMessage::len).sum();
				if remaining_total_size <= threshold as usize {
					Self::decrease_fee_factor(());
				}

				(num, total_size)
			});

			// Sending HRMP messages is a little bit more involved. There are the following
			// constraints:
			//
			// - a channel should exist (and it can be closed while a message is buffered),
			// - at most one message can be sent in a channel,
			// - the sent out messages should be ordered by ascension of recipient para id.
			// - the capacity and total size of the channel is limited,
			// - the maximum size of a message is limited (and can potentially be changed),

			// The number announced in `on_initialize` (and thus accounted for in the reserved
			// weight) caps the value of the fresh host configuration.
			let maximum_channels = host_config
				.hrmp_max_message_num_per_candidate
				.min(<AnnouncedHrmpMessagesPerCandidate<T>>::take())
				as usize;

			// Note: this internally calls the `GetChannelInfo` implementation for this
			// pallet, which draws on the `RelevantMessagingState`. That in turn has
			// been adjusted above to reflect the correct limits in all channels.
			let outbound_messages =
				T::OutboundXcmpMessageSource::take_outbound_messages(maximum_channels)
					.into_iter()
					.map(|(recipient, data)| OutboundHrmpMessage { recipient, data })
					.collect::<Vec<_>>();

			// Update the unincluded segment length; capacity checks were done previously in
			// `set_validation_data`, so this can be done unconditionally.
			{
				// One message per recipient, see the constraints listed above.
				let hrmp_outgoing = outbound_messages
					.iter()
					.map(|msg| {
						(
							msg.recipient,
							HrmpChannelUpdate { msg_count: 1, total_bytes: msg.data.len() as u32 },
						)
					})
					.collect();
				let used_bandwidth =
					UsedBandwidth { ump_msg_count, ump_total_bytes, hrmp_outgoing };

				let mut aggregated_segment =
					AggregatedUnincludedSegment::<T>::get().unwrap_or_default();
				let consumed_go_ahead_signal =
					if aggregated_segment.consumed_go_ahead_signal().is_some() {
						// Some ancestor within the segment already processed this signal --
						// validated during inherent creation.
						None
					} else {
						relay_upgrade_go_ahead
					};
				// The bandwidth constructed was ensured to satisfy relay chain constraints.
				let ancestor = Ancestor::new_unchecked(used_bandwidth, consumed_go_ahead_signal);

				let watermark = HrmpWatermark::<T>::get();
				let watermark_update = HrmpWatermarkUpdate::new(watermark, vfp.relay_parent_number);

				aggregated_segment
					.append(&ancestor, watermark_update, &total_bandwidth_out)
					.expect("unincluded segment limits exceeded");
				AggregatedUnincludedSegment::<T>::put(aggregated_segment);
				// Check in `on_initialize` guarantees there's space for this block.
				// NOTE(review): the `on_initialize` of this pallet performs no such check;
				// presumably the space is made in `set_validation_data` via
				// `maybe_drop_included_ancestors` -- confirm.
				UnincludedSegment::<T>::append(ancestor);
			}
			HrmpOutboundMessages::<T>::put(outbound_messages);
		}
446
447		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
448			let mut weight = Weight::zero();
449
450			// To prevent removing `NewValidationCode` that was set by another `on_initialize`
451			// like for example from scheduler, we only kill the storage entry if it was not yet
452			// updated in the current block.
453			if !<DidSetValidationCode<T>>::get() {
454				// NOTE: Killing here is required to at least include the trie nodes down to the key
455				// in the proof. Because this value will be read in `validate_block` and thus,
456				// needs to be reachable by the proof.
457				NewValidationCode::<T>::kill();
458				weight += T::DbWeight::get().writes(1);
459			}
460
461			// The parent hash was unknown during block finalization. Update it here.
462			{
463				<UnincludedSegment<T>>::mutate(|chain| {
464					if let Some(ancestor) = chain.last_mut() {
465						let parent = frame_system::Pallet::<T>::parent_hash();
466						// Ancestor is the latest finalized block, thus current parent is
467						// its output head.
468						ancestor.replace_para_head_hash(parent);
469					}
470				});
471				weight += T::DbWeight::get().reads_writes(1, 1);
472
473				// Weight used during finalization.
474				weight += T::DbWeight::get().reads_writes(3, 2);
475			}
476
477			// Remove the validation from the old block.
478			ValidationData::<T>::kill();
479			// NOTE: Killing here is required to at least include the trie nodes down to the keys
480			// in the proof. Because these values will be read in `validate_block` and thus,
481			// need to be reachable by the proof.
482			ProcessedDownwardMessages::<T>::kill();
483			UpwardMessages::<T>::kill();
484			HrmpOutboundMessages::<T>::kill();
485			CustomValidationHeadData::<T>::kill();
486			// The same as above. Reading here to make sure that the key is included in the proof.
487			HrmpWatermark::<T>::get();
488			weight += T::DbWeight::get().reads_writes(1, 5);
489
490			// Here, in `on_initialize` we must report the weight for both `on_initialize` and
491			// `on_finalize`.
492			//
493			// One complication here, is that the `host_configuration` is updated by an inherent
494			// and those are processed after the block initialization phase. Therefore, we have to
495			// be content only with the configuration as per the previous block. That means that
496			// the configuration can be either stale (or be absent altogether in case of the
497			// beginning of the chain).
498			//
499			// In order to mitigate this, we do the following. At the time, we are only concerned
500			// about `hrmp_max_message_num_per_candidate`. We reserve the amount of weight to
501			// process the number of HRMP messages according to the potentially stale
502			// configuration. In `on_finalize` we will process only the maximum between the
503			// announced number of messages and the actual received in the fresh configuration.
504			//
505			// In the common case, they will be the same. In the case the actual value is smaller
506			// than the announced, we would waste some of weight. In the case the actual value is
507			// greater than the announced, we will miss opportunity to send a couple of messages.
508			weight += T::DbWeight::get().reads_writes(1, 1);
509			let hrmp_max_message_num_per_candidate = HostConfiguration::<T>::get()
510				.map(|cfg| cfg.hrmp_max_message_num_per_candidate)
511				.unwrap_or(0);
512			<AnnouncedHrmpMessagesPerCandidate<T>>::put(hrmp_max_message_num_per_candidate);
513
514			// NOTE that the actual weight consumed by `on_finalize` may turn out lower.
515			weight += T::DbWeight::get().reads_writes(
516				3 + hrmp_max_message_num_per_candidate as u64,
517				4 + hrmp_max_message_num_per_candidate as u64,
518			);
519
520			// Weight for updating the last relay chain block number in `on_finalize`.
521			weight += T::DbWeight::get().reads_writes(1, 1);
522
523			// Weight for adjusting the unincluded segment in `on_finalize`.
524			weight += T::DbWeight::get().reads_writes(6, 3);
525
526			// Always try to read `UpgradeGoAhead` in `on_finalize`.
527			weight += T::DbWeight::get().reads(1);
528
529			// We need to ensure that `CoreInfo` digest exists only once.
530			match CumulusDigestItem::core_info_exists_at_max_once(
531				&frame_system::Pallet::<T>::digest(),
532			) {
533				CoreInfoExistsAtMaxOnce::Once(core_info) => {
534					assert_eq!(
535						core_info.claim_queue_offset.0,
536						T::RelayParentOffset::get() as u8,
537						"Only {} is supported as valid claim queue offset",
538						T::RelayParentOffset::get()
539					);
540				},
541				CoreInfoExistsAtMaxOnce::NotFound => {},
542				CoreInfoExistsAtMaxOnce::MoreThanOnce => {
543					panic!("`CumulusDigestItem::CoreInfo` must exist at max once.");
544				},
545			}
546
547			weight
548		}
549	}
550
551	#[pallet::call]
552	impl<T: Config> Pallet<T> {
		/// Set the current validation data.
		///
		/// This should be invoked exactly once per block. It will panic at the finalization
		/// phase if the call was not invoked.
		///
		/// The dispatch origin for this call must be `Inherent` (it is checked with
		/// `ensure_none`).
		///
		/// As a side effect, this function upgrades the current validation function
		/// if the appropriate time has come.
		///
		/// # Panics
		///
		/// Being a mandatory inherent, this call panics instead of returning an error when
		///
		/// - `ValidationData` was already set in this block,
		/// - the configured [`Config::CheckAssociatedRelayNumber`] rejects the relay parent
		///   number,
		/// - the relay chain state proof is invalid, or the upgrade signals, the host
		///   configuration or the messaging state cannot be read from it,
		/// - [`Config::RelayParentOffset`] is larger than zero and the provided relay parent
		///   descendants cannot be verified,
		/// - the upgrade go-ahead signal of the relay chain does not match the one already
		///   consumed within the unincluded segment, or
		/// - a `GoAhead` signal arrives while no `PendingValidationCode` is stored.
		#[pallet::call_index(0)]
		#[pallet::weight((0, DispatchClass::Mandatory))]
		// TODO: This weight should be corrected. Currently the weight is registered manually in the
		// call with `register_extra_weight_unchecked`.
		pub fn set_validation_data(
			origin: OriginFor<T>,
			data: BasicParachainInherentData,
			inbound_messages_data: InboundMessagesData,
		) -> DispatchResult {
			ensure_none(origin)?;
			assert!(
				!<ValidationData<T>>::exists(),
				"ValidationData must be updated only once in a block",
			);

			// TODO: This is more than zero, but will need benchmarking to figure out what.
			let mut total_weight = Weight::zero();

			// NOTE: the inherent data is expected to be unique, even if this block is built
			// in the context of the same relay parent as the previous one. In particular,
			// the inherent shouldn't contain messages that were already processed by any of the
			// ancestors.
			//
			// This invariant should be upheld by the `ProvideInherent` implementation.
			let BasicParachainInherentData {
				validation_data: vfp,
				relay_chain_state,
				relay_parent_descendants,
				collator_peer_id: _,
			} = data;

			// Check that the associated relay chain block number is as expected.
			T::CheckAssociatedRelayNumber::check_associated_relay_number(
				vfp.relay_parent_number,
				LastRelayChainBlockNumber::<T>::get(),
			);

			// The proof is cloned because the original is persisted in `RelayStateProof`
			// further below.
			let relay_state_proof = RelayChainStateProof::new(
				T::SelfParaId::get(),
				vfp.relay_parent_storage_root,
				relay_chain_state.clone(),
			)
			.expect("Invalid relay chain state proof");

			let expected_rp_descendants_num = T::RelayParentOffset::get();

			// An offset of zero disables the descendant verification.
			if expected_rp_descendants_num > 0 {
				if let Err(err) = descendant_validation::verify_relay_parent_descendants(
					&relay_state_proof,
					relay_parent_descendants,
					vfp.relay_parent_storage_root,
					expected_rp_descendants_num,
				) {
					panic!(
						"Unable to verify provided relay parent descendants. \
						expected_rp_descendants_num: {expected_rp_descendants_num} \
						error: {err:?}"
					);
				};
			}

			// Update the desired maximum capacity according to the consensus hook.
			let (consensus_hook_weight, capacity) =
				T::ConsensusHook::on_state_proof(&relay_state_proof);
			total_weight += consensus_hook_weight;
			total_weight += Self::maybe_drop_included_ancestors(&relay_state_proof, capacity);
			// Deposit a log indicating the relay-parent storage root.
			// TODO: remove this in favor of the relay-parent's hash after
			// https://github.com/paritytech/cumulus/issues/303
			frame_system::Pallet::<T>::deposit_log(
				cumulus_primitives_core::rpsr_digest::relay_parent_storage_root_item(
					vfp.relay_parent_storage_root,
					vfp.relay_parent_number,
				),
			);

			// initialization logic: we know that this runs exactly once every block,
			// which means we can put the initialization logic here to remove the
			// sequencing problem.
			let upgrade_go_ahead_signal = relay_state_proof
				.read_upgrade_go_ahead_signal()
				.expect("Invalid upgrade go ahead signal");

			let upgrade_signal_in_segment = AggregatedUnincludedSegment::<T>::get()
				.as_ref()
				.and_then(SegmentTracker::consumed_go_ahead_signal);
			if let Some(signal_in_segment) = upgrade_signal_in_segment.as_ref() {
				// Unincluded ancestor consuming upgrade signal is still within the segment,
				// sanity check that it matches with the signal from relay chain.
				assert_eq!(upgrade_go_ahead_signal, Some(*signal_in_segment));
			}
			match upgrade_go_ahead_signal {
				Some(_signal) if upgrade_signal_in_segment.is_some() => {
					// Do nothing, processing logic was executed by unincluded ancestor.
				},
				Some(relay_chain::UpgradeGoAhead::GoAhead) => {
					assert!(
						<PendingValidationCode<T>>::exists(),
						"No new validation function found in storage, GoAhead signal is not expected",
					);
					let validation_code = <PendingValidationCode<T>>::take();

					// Apply the pending code and notify about it.
					frame_system::Pallet::<T>::update_code_in_storage(&validation_code);
					<T::OnSystemEvent as OnSystemEvent>::on_validation_code_applied();
					Self::deposit_event(Event::ValidationFunctionApplied {
						relay_chain_block_num: vfp.relay_parent_number,
					});
				},
				Some(relay_chain::UpgradeGoAhead::Abort) => {
					// The relay chain rejected the upgrade: drop the pending code.
					<PendingValidationCode<T>>::kill();
					Self::deposit_event(Event::ValidationFunctionDiscarded);
				},
				None => {},
			}
			// Both signals are cleared again in `on_finalize`.
			<UpgradeRestrictionSignal<T>>::put(
				relay_state_proof
					.read_upgrade_restriction_signal()
					.expect("Invalid upgrade restriction signal"),
			);
			<UpgradeGoAhead<T>>::put(upgrade_go_ahead_signal);

			let host_config = relay_state_proof
				.read_abridged_host_configuration()
				.expect("Invalid host configuration in relay chain state proof");

			let relevant_messaging_state = relay_state_proof
				.read_messaging_state_snapshot(&host_config)
				.expect("Invalid messaging state in relay chain state proof");

			// Persist everything `on_finalize` relies on.
			<ValidationData<T>>::put(&vfp);
			<RelayStateProof<T>>::put(relay_chain_state);
			<RelevantMessagingState<T>>::put(relevant_messaging_state.clone());
			<HostConfiguration<T>>::put(host_config);

			<T::OnSystemEvent as OnSystemEvent>::on_validation_data(&vfp);

			// Hand the inbound DMP and HRMP messages over and account for the weight this
			// consumed.
			total_weight.saturating_accrue(Self::enqueue_inbound_downward_messages(
				relevant_messaging_state.dmq_mqc_head,
				inbound_messages_data.downward_messages,
			));
			total_weight.saturating_accrue(Self::enqueue_inbound_horizontal_messages(
				&relevant_messaging_state.ingress_channels,
				inbound_messages_data.horizontal_messages,
				vfp.relay_parent_number,
			));

			// The declared call weight is zero, so the real weight is registered manually.
			frame_system::Pallet::<T>::register_extra_weight_unchecked(
				total_weight,
				DispatchClass::Mandatory,
			);

			Ok(())
		}
715
716		#[pallet::call_index(1)]
717		#[pallet::weight((1_000, DispatchClass::Operational))]
718		pub fn sudo_send_upward_message(
719			origin: OriginFor<T>,
720			message: UpwardMessage,
721		) -> DispatchResult {
722			ensure_root(origin)?;
723			let _ = Self::send_upward_message(message);
724			Ok(())
725		}
726
727		// WARNING: call indices 2 and 3 were used in a former version of this pallet. Using them
728		// again will require to bump the transaction version of runtimes using this pallet.
729	}
730
	/// The events that are deposited by this pallet.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// The validation function has been scheduled to apply.
		ValidationFunctionStored,
		/// The validation function was applied as of the contained relay chain block number.
		///
		/// Deposited by `set_validation_data` when the relay chain signals `GoAhead`.
		ValidationFunctionApplied { relay_chain_block_num: RelayChainBlockNumber },
		/// The relay-chain aborted the upgrade process.
		///
		/// Deposited by `set_validation_data` when the relay chain signals `Abort`.
		ValidationFunctionDiscarded,
		/// Some downward messages have been received and will be processed.
		DownwardMessagesReceived { count: u32 },
		/// Downward messages were processed using the given weight.
		DownwardMessagesProcessed { weight_used: Weight, dmq_head: relay_chain::Hash },
		/// An upward message was sent to the relay chain.
		UpwardMessageSent { message_hash: Option<XcmHash> },
	}
747
	/// The errors of this pallet.
	#[pallet::error]
	pub enum Error<T> {
		/// Attempt to upgrade validation function while an existing upgrade is pending.
		OverlappingUpgrades,
		/// Polkadot currently prohibits this parachain from upgrading its validation function.
		ProhibitedByPolkadot,
		/// The supplied validation function has compiled into a blob larger than Polkadot is
		/// willing to run.
		TooBig,
		/// The inherent which supplies the validation data did not run this block.
		ValidationDataNotAvailable,
		/// The inherent which supplies the host configuration did not run this block.
		HostConfigurationNotAvailable,
		/// No validation function upgrade is currently scheduled.
		NotScheduled,
	}
764
	/// Latest included block descendants the runtime accepted. In other words, these are
	/// ancestors of the currently executing block which have not been included in the observed
	/// relay-chain state.
	///
	/// The segment length is limited by the capacity returned from the [`ConsensusHook`] configured
	/// in the pallet.
	///
	/// A new ancestor is appended in `on_finalize`; its para head hash is only filled in by the
	/// `on_initialize` of the following block, once the parent hash is known.
	#[pallet::storage]
	pub type UnincludedSegment<T: Config> = StorageValue<_, Vec<Ancestor<T::Hash>>, ValueQuery>;
773
	/// Storage field that keeps track of bandwidth used by the unincluded segment along with the
	/// latest HRMP watermark. Used for limiting the acceptance of new blocks with
	/// respect to relay chain constraints.
	///
	/// It also exposes the upgrade go-ahead signal that was already consumed by an ancestor
	/// within the segment. The tracker is extended with the current block in `on_finalize`.
	#[pallet::storage]
	pub type AggregatedUnincludedSegment<T: Config> =
		StorageValue<_, SegmentTracker<T::Hash>, OptionQuery>;
780
	/// In case of a scheduled upgrade, this storage field contains the validation code to be
	/// applied.
	///
	/// As soon as the relay chain gives us the go-ahead signal, we will overwrite the
	/// [`:code`][sp_core::storage::well_known_keys::CODE], which will result in the next block
	/// being processed with the new validation code. This concludes the upgrade process.
	#[pallet::storage]
	pub type PendingValidationCode<T: Config> = StorageValue<_, Vec<u8>, ValueQuery>;

	/// Validation code that is set by the parachain and is to be communicated to the collator and
	/// consequently the relay-chain.
	///
	/// This will be cleared in `on_initialize` of each new block if no other pallet already set
	/// the value.
	#[pallet::storage]
	pub type NewValidationCode<T: Config> = StorageValue<_, Vec<u8>, OptionQuery>;

	/// The [`PersistedValidationData`] set for this block.
	///
	/// This value is expected to be set only once by the [`Pallet::set_validation_data`] inherent.
	#[pallet::storage]
	pub type ValidationData<T: Config> = StorageValue<_, PersistedValidationData>;

	/// Whether new validation code was set in order to notify the relay chain.
	///
	/// This flag is set to `true` together with [`NewValidationCode`].
	#[pallet::storage]
	pub type DidSetValidationCode<T: Config> = StorageValue<_, bool, ValueQuery>;
807
	/// The relay chain block number associated with the last parachain block.
	///
	/// This is updated in `on_finalize`.
	#[pallet::storage]
	pub type LastRelayChainBlockNumber<T: Config> =
		StorageValue<_, RelayChainBlockNumber, ValueQuery>;

	/// An option which indicates whether the relay-chain restricts signalling a validation code
	/// upgrade. In other words, if this is `Some` and [`NewValidationCode`] is `Some`, then the
	/// produced candidate will be invalid.
	///
	/// This storage item is a mirror of the corresponding value for the current parachain from the
	/// relay-chain. This value is ephemeral, which means it doesn't hit the storage. This value is
	/// set after the inherent.
	#[pallet::storage]
	pub type UpgradeRestrictionSignal<T: Config> =
		StorageValue<_, Option<relay_chain::UpgradeRestriction>, ValueQuery>;

	/// Optional upgrade go-ahead signal from the relay-chain.
	///
	/// This storage item is a mirror of the corresponding value for the current parachain from the
	/// relay-chain. This value is ephemeral, which means it doesn't hit the storage. This value is
	/// set after the inherent.
	#[pallet::storage]
	pub type UpgradeGoAhead<T: Config> =
		StorageValue<_, Option<relay_chain::UpgradeGoAhead>, ValueQuery>;
834
	/// The state proof for the last relay parent block.
	///
	/// This field is meant to be updated each block with the validation data inherent. Therefore,
	/// before the inherent is processed (e.g. in `on_initialize`), this data may be stale.
	///
	/// This data is also absent from the genesis.
	#[pallet::storage]
	pub type RelayStateProof<T: Config> = StorageValue<_, sp_trie::StorageProof>;

	/// The snapshot of some state related to messaging relevant to the current parachain as per
	/// the relay parent.
	///
	/// This field is meant to be updated each block with the validation data inherent. Therefore,
	/// before the inherent is processed (e.g. in `on_initialize`), this data may be stale.
	///
	/// This data is also absent from the genesis.
	#[pallet::storage]
	pub type RelevantMessagingState<T: Config> = StorageValue<_, MessagingStateSnapshot>;

	/// The parachain host configuration that was obtained from the relay parent.
	///
	/// This field is meant to be updated each block with the validation data inherent. Therefore,
	/// before the inherent is processed (e.g. in `on_initialize`), this data may be stale.
	///
	/// This data is also absent from the genesis.
	#[pallet::storage]
	#[pallet::disable_try_decode_storage]
	pub type HostConfiguration<T: Config> = StorageValue<_, AbridgedHostConfiguration>;
863
	/// The last downward message queue chain head we have observed.
	///
	/// This value is loaded before and saved after processing inbound downward messages carried
	/// by the system inherent.
	#[pallet::storage]
	pub type LastDmqMqcHead<T: Config> = StorageValue<_, MessageQueueChain, ValueQuery>;

	/// The message queue chain heads we have observed for each incoming channel.
	///
	/// This value is loaded before and saved after processing inbound horizontal messages carried
	/// by the system inherent.
	#[pallet::storage]
	pub type LastHrmpMqcHeads<T: Config> =
		StorageValue<_, BTreeMap<ParaId, MessageQueueChain>, ValueQuery>;

	/// Number of downward messages processed in a block.
	///
	/// This will be cleared in `on_initialize` of each new block.
	#[pallet::storage]
	pub type ProcessedDownwardMessages<T: Config> = StorageValue<_, u32, ValueQuery>;

	/// The last processed downward message.
	///
	/// We need to keep track of this to filter out the messages that have already been processed.
	#[pallet::storage]
	pub type LastProcessedDownwardMessage<T: Config> = StorageValue<_, InboundMessageId>;

	/// HRMP watermark that was set in a block.
	#[pallet::storage]
	pub type HrmpWatermark<T: Config> = StorageValue<_, relay_chain::BlockNumber, ValueQuery>;

	/// The last processed HRMP message.
	///
	/// We need to keep track of this to filter out the messages that have already been processed.
	#[pallet::storage]
	pub type LastProcessedHrmpMessage<T: Config> = StorageValue<_, InboundMessageId>;

	/// HRMP messages that were sent in a block.
	///
	/// This will be cleared in `on_initialize` of each new block.
	#[pallet::storage]
	pub type HrmpOutboundMessages<T: Config> =
		StorageValue<_, Vec<OutboundHrmpMessage>, ValueQuery>;
907
	/// Upward messages that were sent in a block.
	///
	/// This will be cleared in `on_initialize` of each new block.
	#[pallet::storage]
	pub type UpwardMessages<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;

	/// Upward messages that are still pending and have not yet been sent to the relay chain.
	#[pallet::storage]
	pub type PendingUpwardMessages<T: Config> = StorageValue<_, Vec<UpwardMessage>, ValueQuery>;

	/// The factor to multiply the base delivery fee by for UMP.
	#[pallet::storage]
	pub type UpwardDeliveryFeeFactor<T: Config> =
		StorageValue<_, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;

	/// The number of HRMP messages we observed in `on_initialize` and thus used for announcing
	/// the weight of `on_initialize` and `on_finalize`.
	#[pallet::storage]
	pub type AnnouncedHrmpMessagesPerCandidate<T: Config> = StorageValue<_, u32, ValueQuery>;

	/// The weight we reserve at the beginning of the block for processing XCMP messages. This
	/// overrides the amount set in the Config trait.
	#[pallet::storage]
	pub type ReservedXcmpWeightOverride<T: Config> = StorageValue<_, Weight>;

	/// The weight we reserve at the beginning of the block for processing DMP messages. This
	/// overrides the amount set in the Config trait.
	#[pallet::storage]
	pub type ReservedDmpWeightOverride<T: Config> = StorageValue<_, Weight>;

	/// Custom head data that should be returned as the result of `validate_block`.
	///
	/// See `Pallet::set_custom_validation_head_data` for more information.
	#[pallet::storage]
	pub type CustomValidationHeadData<T: Config> = StorageValue<_, Vec<u8>, OptionQuery>;
943
944	#[pallet::inherent]
945	impl<T: Config> ProvideInherent for Pallet<T> {
946		type Call = Call<T>;
947		type Error = sp_inherents::MakeFatalError<()>;
948		const INHERENT_IDENTIFIER: InherentIdentifier =
949			cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER;
950
951		fn create_inherent(data: &InherentData) -> Option<Self::Call> {
952			let data = match data
953				.get_data::<ParachainInherentData>(&Self::INHERENT_IDENTIFIER)
954				.ok()
955				.flatten()
956			{
957				None => {
958					// Key Self::INHERENT_IDENTIFIER is expected to contain versioned inherent
959					// data. Older nodes are unaware of the new format and might provide the
960					// legacy data format. We try to load it and transform it into the current
961					// version.
962					let data = data
963						.get_data::<v0::ParachainInherentData>(
964							&cumulus_primitives_parachain_inherent::PARACHAIN_INHERENT_IDENTIFIER_V0,
965						)
966						.ok()
967						.flatten()?;
968					data.into()
969				},
970				Some(data) => data,
971			};
972
973			Some(Self::do_create_inherent(data))
974		}
975
976		fn is_inherent(call: &Self::Call) -> bool {
977			matches!(call, Call::set_validation_data { .. })
978		}
979	}
980
	/// Genesis configuration of the pallet.
	///
	/// It carries no configurable values; the only field is a marker for the `T` type parameter.
	#[pallet::genesis_config]
	#[derive(frame_support::DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		/// Marker tying the genesis config to the runtime configuration `T`; skipped by serde.
		#[serde(skip)]
		pub _config: core::marker::PhantomData<T>,
	}
987
	#[pallet::genesis_build]
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
		/// Writes an empty value under the raw storage key `:c` at genesis.
		fn build(&self) {
			// TODO: Remove after https://github.com/paritytech/cumulus/issues/479
			sp_io::storage::set(b":c", &[]);
		}
	}
995}
996
997impl<T: Config> Pallet<T> {
998	/// Get the unincluded segment size after the given hash.
999	///
1000	/// If the unincluded segment doesn't contain the given hash, this returns the
1001	/// length of the entire unincluded segment.
1002	///
1003	/// This is intended to be used for determining how long the unincluded segment _would be_
1004	/// in runtime APIs related to authoring.
1005	pub fn unincluded_segment_size_after(included_hash: T::Hash) -> u32 {
1006		let segment = UnincludedSegment::<T>::get();
1007		crate::unincluded_segment::size_after_included(included_hash, &segment)
1008	}
1009}
1010
1011impl<T: Config> FeeTracker for Pallet<T> {
1012	type Id = ();
1013
1014	fn get_fee_factor(_id: Self::Id) -> FixedU128 {
1015		UpwardDeliveryFeeFactor::<T>::get()
1016	}
1017
1018	fn set_fee_factor(_id: Self::Id, val: FixedU128) {
1019		UpwardDeliveryFeeFactor::<T>::set(val);
1020	}
1021}
1022
1023impl<T: Config> ListChannelInfos for Pallet<T> {
1024	fn outgoing_channels() -> Vec<ParaId> {
1025		let Some(state) = RelevantMessagingState::<T>::get() else { return Vec::new() };
1026		state.egress_channels.into_iter().map(|(id, _)| id).collect()
1027	}
1028}
1029
1030impl<T: Config> GetChannelInfo for Pallet<T> {
1031	fn get_channel_status(id: ParaId) -> ChannelStatus {
1032		// Note, that we are using `relevant_messaging_state` which may be from the previous
1033		// block, in case this is called from `on_initialize`, i.e. before the inherent with
1034		// fresh data is submitted.
1035		//
1036		// That shouldn't be a problem though because this is anticipated and already can
1037		// happen. This is because sending implies that a message is buffered until there is
1038		// space to send a message in the candidate. After a while waiting in a buffer, it may
1039		// be discovered that the channel to which a message were addressed is now closed.
1040		// Another possibility, is that the maximum message size was decreased so that a
1041		// message in the buffer doesn't fit. Should any of that happen the sender should be
1042		// notified about the message was discarded.
1043		//
1044		// Here it a similar case, with the difference that the realization that the channel is
1045		// closed came the same block.
1046		let channels = match RelevantMessagingState::<T>::get() {
1047			None => {
1048				log::warn!("calling `get_channel_status` with no RelevantMessagingState?!");
1049				return ChannelStatus::Closed
1050			},
1051			Some(d) => d.egress_channels,
1052		};
1053		// ^^^ NOTE: This storage field should carry over from the previous block. So if it's
1054		// None then it must be that this is an edge-case where a message is attempted to be
1055		// sent at the first block. It should be safe to assume that there are no channels
1056		// opened at all so early. At least, relying on this assumption seems to be a better
1057		// trade-off, compared to introducing an error variant that the clients should be
1058		// prepared to handle.
1059		let index = match channels.binary_search_by_key(&id, |item| item.0) {
1060			Err(_) => return ChannelStatus::Closed,
1061			Ok(i) => i,
1062		};
1063		let meta = &channels[index].1;
1064		if meta.msg_count + 1 > meta.max_capacity {
1065			// The channel is at its capacity. Skip it for now.
1066			return ChannelStatus::Full
1067		}
1068		let max_size_now = meta.max_total_size - meta.total_size;
1069		let max_size_ever = meta.max_message_size;
1070		ChannelStatus::Ready(max_size_now as usize, max_size_ever as usize)
1071	}
1072
1073	fn get_channel_info(id: ParaId) -> Option<ChannelInfo> {
1074		let channels = RelevantMessagingState::<T>::get()?.egress_channels;
1075		let index = channels.binary_search_by_key(&id, |item| item.0).ok()?;
1076		let info = ChannelInfo {
1077			max_capacity: channels[index].1.max_capacity,
1078			max_total_size: channels[index].1.max_total_size,
1079			max_message_size: channels[index].1.max_message_size,
1080			msg_count: channels[index].1.msg_count,
1081			total_size: channels[index].1.total_size,
1082		};
1083		Some(info)
1084	}
1085}
1086
1087impl<T: Config> Pallet<T> {
1088	/// The bandwidth limit per block that applies when receiving messages from the relay chain via
1089	/// DMP or XCMP.
1090	///
1091	/// The limit is per message passing mechanism (e.g. 1 MiB for DMP, 1 MiB for XCMP).
1092	///
1093	/// The purpose of this limit is to make sure that the total size of the messages received by
1094	/// the parachain from the relay chain doesn't exceed the block size. Currently each message
1095	/// passing mechanism can use 1/6 of the total block PoV which means that in total 1/3
1096	/// of the block PoV can be used for message passing.
1097	fn messages_collection_size_limit() -> usize {
1098		let max_block_weight = <T as frame_system::Config>::BlockWeights::get().max_block;
1099		let max_block_pov = max_block_weight.proof_size();
1100		(max_block_pov / 6).saturated_into()
1101	}
1102
1103	/// Updates inherent data to only include the messages that weren't already processed
1104	/// by the runtime and to compress (hash) the messages that exceed the allocated size.
1105	///
1106	/// This method doesn't check for mqc heads mismatch. If the MQC doesn't match after
1107	/// dropping messages, the runtime will panic when executing the inherent.
1108	fn do_create_inherent(data: ParachainInherentData) -> Call<T> {
1109		let (data, mut downward_messages, mut horizontal_messages) =
1110			deconstruct_parachain_inherent_data(data);
1111		let last_relay_block_number = LastRelayChainBlockNumber::<T>::get();
1112
1113		let messages_collection_size_limit = Self::messages_collection_size_limit();
1114		// DMQ.
1115		let last_processed_msg = LastProcessedDownwardMessage::<T>::get()
1116			.unwrap_or(InboundMessageId { sent_at: last_relay_block_number, reverse_idx: 0 });
1117		downward_messages.drop_processed_messages(&last_processed_msg);
1118		let mut size_limit = messages_collection_size_limit;
1119		let downward_messages = downward_messages.into_abridged(&mut size_limit);
1120
1121		// HRMP.
1122		let last_processed_msg = LastProcessedHrmpMessage::<T>::get()
1123			.unwrap_or(InboundMessageId { sent_at: last_relay_block_number, reverse_idx: 0 });
1124		horizontal_messages.drop_processed_messages(&last_processed_msg);
1125		size_limit = size_limit.saturating_add(messages_collection_size_limit);
1126		let horizontal_messages = horizontal_messages.into_abridged(&mut size_limit);
1127
1128		let inbound_messages_data =
1129			InboundMessagesData::new(downward_messages, horizontal_messages);
1130
1131		Call::set_validation_data { data, inbound_messages_data }
1132	}
1133
	/// Enqueue all inbound downward messages relayed by the collator into the MQ pallet.
	///
	/// Checks that the sequence of the messages is valid, hands the full messages over to
	/// `T::DmpQueue` and communicates the number of processed messages to the collator via a
	/// storage update ([`ProcessedDownwardMessages`]).
	///
	/// Returns the weight reported by `T::WeightInfo` for the number of full messages.
	///
	/// # Panics
	///
	/// If it turns out that after processing all messages the Message Queue Chain
	/// hash doesn't match the expected one.
	fn enqueue_inbound_downward_messages(
		expected_dmq_mqc_head: relay_chain::Hash,
		downward_messages: AbridgedInboundDownwardMessages,
	) -> Weight {
		downward_messages.check_enough_messages_included("DMQ");

		let mut dmq_head = <LastDmqMqcHead<T>>::get();

		// `messages` are carried in full, `hashed_messages` only as hashes.
		let (messages, hashed_messages) = downward_messages.messages();
		let message_count = messages.len() as u32;
		let weight_used = T::WeightInfo::enqueue_inbound_downward_messages(message_count);
		if let Some(last_msg) = messages.last() {
			Self::deposit_event(Event::DownwardMessagesReceived { count: message_count });

			// Eagerly update the MQC head hash:
			for msg in messages {
				dmq_head.extend_downward(msg);
			}
			// The stored head only covers the full messages; the hashed ones below extend the
			// local copy solely for the final head check.
			<LastDmqMqcHead<T>>::put(&dmq_head);
			Self::deposit_event(Event::DownwardMessagesProcessed {
				weight_used,
				dmq_head: dmq_head.head(),
			});

			// `reverse_idx` ends up counting the hashed messages that share the `sent_at` of
			// the last full message.
			let mut last_processed_msg =
				InboundMessageId { sent_at: last_msg.sent_at, reverse_idx: 0 };
			for msg in hashed_messages {
				dmq_head.extend_with_hashed_msg(msg);

				if msg.sent_at == last_processed_msg.sent_at {
					last_processed_msg.reverse_idx += 1;
				}
			}
			LastProcessedDownwardMessage::<T>::put(last_processed_msg);

			T::DmpQueue::handle_messages(downward_messages.bounded_msgs_iter());
		}

		// After hashing each message in the message queue chain submitted by the collator, we
		// should arrive at the MQC head provided by the relay chain.
		//
		// A mismatch means that at least some of the submitted messages were altered, omitted or
		// added improperly.
		assert_eq!(dmq_head.head(), expected_dmq_mqc_head, "DMQ head mismatch");

		ProcessedDownwardMessages::<T>::put(message_count);

		weight_used
	}
1192
1193	fn check_hrmp_mcq_heads(
1194		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
1195		mqc_heads: &mut BTreeMap<ParaId, MessageQueueChain>,
1196	) {
1197		// Check that the MQC heads for each channel provided by the relay chain match the MQC
1198		// heads we have after processing all incoming messages.
1199		//
1200		// Along the way we also carry over the relevant entries from the `last_mqc_heads` to
1201		// `running_mqc_heads`. Otherwise, in a block where no messages were sent in a channel
1202		// it won't get into next block's `last_mqc_heads` and thus will be all zeros, which
1203		// would corrupt the message queue chain.
1204		for (sender, channel) in ingress_channels {
1205			let cur_head = mqc_heads.entry(*sender).or_default().head();
1206			let target_head = channel.mqc_head.unwrap_or_default();
1207			assert_eq!(cur_head, target_head, "HRMP head mismatch");
1208		}
1209	}
1210
1211	/// Performs some checks related to the sender and the `sent_at` field of an HRMP message.
1212	///
1213	/// **Panics** if the message submitted by the collator doesn't respect the expected order or if
1214	///            it was sent from a para which has no open channel to this parachain.
1215	fn check_hrmp_message_metadata(
1216		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
1217		maybe_prev_msg_metadata: &mut Option<(u32, ParaId)>,
1218		msg_metadata: (u32, ParaId),
1219	) {
1220		// Check that the message is properly ordered.
1221		if let Some(prev_msg) = maybe_prev_msg_metadata {
1222			assert!(&msg_metadata >= prev_msg, "[HRMP] Messages order violation");
1223		}
1224		*maybe_prev_msg_metadata = Some(msg_metadata);
1225
1226		// Check that the message is sent from an existing channel. The channel exists
1227		// if its MQC head is present in `vfp.hrmp_mqc_heads`.
1228		let sender = msg_metadata.1;
1229		let maybe_channel_idx =
1230			ingress_channels.binary_search_by_key(&sender, |&(channel_sender, _)| channel_sender);
1231		assert!(
1232			maybe_channel_idx.is_ok(),
1233			"One of the messages submitted by the collator was sent from a sender ({}) \
1234					that doesn't have a channel opened to this parachain",
1235			<ParaId as Into<u32>>::into(sender)
1236		);
1237	}
1238
	/// Process all inbound horizontal messages relayed by the collator.
	///
	/// This is similar to `enqueue_inbound_downward_messages`, but works with multiple inbound
	/// channels. The messages are handed over to
	/// `T::XcmpMessageHandler::handle_xcmp_messages`, and the HRMP watermark as well as the
	/// last processed HRMP message are updated.
	///
	/// Returns the weight used by the message handler plus the weight of the storage accesses.
	///
	/// **Panics** if either any of horizontal messages submitted by the collator was sent from
	///            a para which has no open channel to this parachain or if after processing
	///            messages across all inbound channels MQCs were obtained which do not
	///            correspond to the ones found on the relay-chain.
	fn enqueue_inbound_horizontal_messages(
		ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
		horizontal_messages: AbridgedInboundHrmpMessages,
		relay_parent_number: relay_chain::BlockNumber,
	) -> Weight {
		// First, check the HRMP advancement rule.
		horizontal_messages.check_enough_messages_included("HRMP");

		// `messages` are carried in full, `hashed_messages` only as hashes.
		let (messages, hashed_messages) = horizontal_messages.messages();
		let mut mqc_heads = <LastHrmpMqcHeads<T>>::get();

		// Nothing to process: verify the heads and advance both the last processed message
		// and the watermark to the relay parent.
		if messages.is_empty() {
			Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads);
			let last_processed_msg =
				InboundMessageId { sent_at: relay_parent_number, reverse_idx: 0 };
			LastProcessedHrmpMessage::<T>::put(last_processed_msg);
			HrmpWatermark::<T>::put(relay_parent_number);
			return T::DbWeight::get().reads_writes(1, 2);
		}

		let mut prev_msg_metadata = None;
		let mut last_processed_block = HrmpWatermark::<T>::get();
		let mut last_processed_msg = InboundMessageId { sent_at: 0, reverse_idx: 0 };
		for (sender, msg) in messages {
			Self::check_hrmp_message_metadata(
				ingress_channels,
				&mut prev_msg_metadata,
				(msg.sent_at, *sender),
			);
			mqc_heads.entry(*sender).or_default().extend_hrmp(msg);

			// Moving on to a later `sent_at` means all full messages of the previous one have
			// been seen, so that previous block number becomes the watermark candidate.
			if msg.sent_at > last_processed_msg.sent_at && last_processed_msg.sent_at > 0 {
				last_processed_block = last_processed_msg.sent_at;
			}
			last_processed_msg.sent_at = msg.sent_at;
		}
		// The stored heads only cover the full messages; the hashed ones below extend the
		// local copy solely for the head check.
		<LastHrmpMqcHeads<T>>::put(&mqc_heads);
		for (sender, msg) in hashed_messages {
			Self::check_hrmp_message_metadata(
				ingress_channels,
				&mut prev_msg_metadata,
				(msg.sent_at, *sender),
			);
			mqc_heads.entry(*sender).or_default().extend_with_hashed_msg(msg);

			// Count the hashed messages that share the `sent_at` of the last full message.
			if msg.sent_at == last_processed_msg.sent_at {
				last_processed_msg.reverse_idx += 1;
			}
		}
		// No hashed message shares the `sent_at` of the last full message, so the watermark
		// can advance to it.
		if last_processed_msg.sent_at > 0 && last_processed_msg.reverse_idx == 0 {
			last_processed_block = last_processed_msg.sent_at;
		}
		LastProcessedHrmpMessage::<T>::put(&last_processed_msg);
		Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads);

		// A storage override, when present, takes precedence over the configured weight.
		let max_weight =
			<ReservedXcmpWeightOverride<T>>::get().unwrap_or_else(T::ReservedXcmpWeight::get);
		let weight_used = T::XcmpMessageHandler::handle_xcmp_messages(
			horizontal_messages.flat_msgs_iter(),
			max_weight,
		);

		// Update watermark
		HrmpWatermark::<T>::put(last_processed_block);

		weight_used.saturating_add(T::DbWeight::get().reads_writes(2, 3))
	}
1316
	/// Drop blocks from the unincluded segment with respect to the latest parachain head.
	///
	/// Every ancestor up to and including the one whose para head hash equals the included head
	/// is removed from [`UnincludedSegment`] and subtracted from
	/// [`AggregatedUnincludedSegment`]. Returns the weight of the storage accesses performed.
	///
	/// # Panics
	///
	/// - If the parent is expected to be included but the included head from the relay state
	///   proof differs from the parent hash.
	/// - If the included head is absent from the relay state proof while the parent is not
	///   expected to be included.
	/// - If, after dropping, the unincluded segment has no space left for the current block.
	fn maybe_drop_included_ancestors(
		relay_state_proof: &RelayChainStateProof,
		capacity: consensus_hook::UnincludedSegmentCapacity,
	) -> Weight {
		let mut weight_used = Weight::zero();
		// If the unincluded segment length is nonzero, then the parachain head must be present.
		let para_head =
			relay_state_proof.read_included_para_head().ok().map(|h| T::Hashing::hash(&h.0));

		let unincluded_segment_len = <UnincludedSegment<T>>::decode_len().unwrap_or(0);
		weight_used += T::DbWeight::get().reads(1);

		// Clean up unincluded segment if nonempty.
		let included_head = match (para_head, capacity.is_expecting_included_parent()) {
			(Some(h), true) => {
				assert_eq!(
					h,
					frame_system::Pallet::<T>::parent_hash(),
					"expected parent to be included"
				);

				h
			},
			(Some(h), false) => h,
			(None, true) => {
				// All this logic is essentially a workaround to support collators which
				// might still not provide the included block with the state proof.
				frame_system::Pallet::<T>::parent_hash()
			},
			(None, false) => panic!("included head not present in relay storage proof"),
		};

		let new_len = {
			let para_head_hash = included_head;
			let dropped: Vec<Ancestor<T::Hash>> = <UnincludedSegment<T>>::mutate(|chain| {
				// Drop everything up to (inclusive) the block with an included para head, if
				// present.
				let idx = chain
					.iter()
					.position(|block| {
						let head_hash = block
							.para_head_hash()
							.expect("para head hash is updated during block initialization; qed");
						head_hash == &para_head_hash
					})
					.map_or(0, |idx| idx + 1); // inclusive.

				chain.drain(..idx).collect()
			});
			weight_used += T::DbWeight::get().reads_writes(1, 1);

			// `dropped` was drained from the very segment whose length was read above.
			let new_len = unincluded_segment_len - dropped.len();
			if !dropped.is_empty() {
				<AggregatedUnincludedSegment<T>>::mutate(|agg| {
					let agg = agg.as_mut().expect(
						"dropped part of the segment wasn't empty, hence value exists; qed",
					);
					for block in dropped {
						agg.subtract(&block);
					}
				});
				weight_used += T::DbWeight::get().reads_writes(1, 1);
			}

			new_len as u32
		};

		// Current block validity check: ensure there is space in the unincluded segment.
		//
		// If this fails, the parachain needs to wait for ancestors to be included before
		// a new block is allowed.
		assert!(new_len < capacity.get(), "no space left for the block in the unincluded segment");
		weight_used
	}
1392
1393	/// This adjusts the `RelevantMessagingState` according to the bandwidth limits in the
1394	/// unincluded segment.
1395	//
1396	// Reads: 2
1397	// Writes: 1
1398	fn adjust_egress_bandwidth_limits() {
1399		let unincluded_segment = match AggregatedUnincludedSegment::<T>::get() {
1400			None => return,
1401			Some(s) => s,
1402		};
1403
1404		<RelevantMessagingState<T>>::mutate(|messaging_state| {
1405			let messaging_state = match messaging_state {
1406				None => return,
1407				Some(s) => s,
1408			};
1409
1410			let used_bandwidth = unincluded_segment.used_bandwidth();
1411
1412			let channels = &mut messaging_state.egress_channels;
1413			for (para_id, used) in used_bandwidth.hrmp_outgoing.iter() {
1414				let i = match channels.binary_search_by_key(para_id, |item| item.0) {
1415					Ok(i) => i,
1416					Err(_) => continue, // indicates channel closed.
1417				};
1418
1419				let c = &mut channels[i].1;
1420
1421				c.total_size = (c.total_size + used.total_bytes).min(c.max_total_size);
1422				c.msg_count = (c.msg_count + used.msg_count).min(c.max_capacity);
1423			}
1424
1425			let upward_capacity = &mut messaging_state.relay_dispatch_queue_remaining_capacity;
1426			upward_capacity.remaining_count =
1427				upward_capacity.remaining_count.saturating_sub(used_bandwidth.ump_msg_count);
1428			upward_capacity.remaining_size =
1429				upward_capacity.remaining_size.saturating_sub(used_bandwidth.ump_total_bytes);
1430		});
1431	}
1432
1433	/// Put a new validation function into a particular location where polkadot
1434	/// monitors for updates. Calling this function notifies polkadot that a new
1435	/// upgrade has been scheduled.
1436	fn notify_polkadot_of_pending_upgrade(code: &[u8]) {
1437		NewValidationCode::<T>::put(code);
1438		<DidSetValidationCode<T>>::put(true);
1439	}
1440
1441	/// The maximum code size permitted, in bytes.
1442	///
1443	/// Returns `None` if the relay chain parachain host configuration hasn't been submitted yet.
1444	pub fn max_code_size() -> Option<u32> {
1445		<HostConfiguration<T>>::get().map(|cfg| cfg.max_code_size)
1446	}
1447
1448	/// The implementation of the runtime upgrade functionality for parachains.
1449	pub fn schedule_code_upgrade(validation_function: Vec<u8>) -> DispatchResult {
1450		// Ensure that `ValidationData` exists. We do not care about the validation data per se,
1451		// but we do care about the [`UpgradeRestrictionSignal`] which arrives with the same
1452		// inherent.
1453		ensure!(<ValidationData<T>>::exists(), Error::<T>::ValidationDataNotAvailable,);
1454		ensure!(<UpgradeRestrictionSignal<T>>::get().is_none(), Error::<T>::ProhibitedByPolkadot);
1455
1456		ensure!(!<PendingValidationCode<T>>::exists(), Error::<T>::OverlappingUpgrades);
1457		let cfg = HostConfiguration::<T>::get().ok_or(Error::<T>::HostConfigurationNotAvailable)?;
1458		ensure!(validation_function.len() <= cfg.max_code_size as usize, Error::<T>::TooBig);
1459
1460		// When a code upgrade is scheduled, it has to be applied in two
1461		// places, synchronized: both polkadot and the individual parachain
1462		// have to upgrade on the same relay chain block.
1463		//
1464		// `notify_polkadot_of_pending_upgrade` notifies polkadot; the `PendingValidationCode`
1465		// storage keeps track locally for the parachain upgrade, which will
1466		// be applied later: when the relay-chain communicates go-ahead signal to us.
1467		Self::notify_polkadot_of_pending_upgrade(&validation_function);
1468		<PendingValidationCode<T>>::put(validation_function);
1469		Self::deposit_event(Event::ValidationFunctionStored);
1470
1471		Ok(())
1472	}
1473
1474	/// Returns the [`CollationInfo`] of the current active block.
1475	///
1476	/// The given `header` is the header of the built block we are collecting the collation info
1477	/// for.
1478	///
1479	/// This is expected to be used by the
1480	/// [`CollectCollationInfo`](cumulus_primitives_core::CollectCollationInfo) runtime api.
1481	pub fn collect_collation_info(header: &HeaderFor<T>) -> CollationInfo {
1482		CollationInfo {
1483			hrmp_watermark: HrmpWatermark::<T>::get(),
1484			horizontal_messages: HrmpOutboundMessages::<T>::get(),
1485			upward_messages: UpwardMessages::<T>::get(),
1486			processed_downward_messages: ProcessedDownwardMessages::<T>::get(),
1487			new_validation_code: NewValidationCode::<T>::get().map(Into::into),
1488			// Check if there is a custom header that will also be returned by the validation phase.
1489			// If so, we need to also return it here.
1490			head_data: CustomValidationHeadData::<T>::get()
1491				.map_or_else(|| header.encode(), |v| v)
1492				.into(),
1493		}
1494	}
1495
1496	/// Set a custom head data that should be returned as result of `validate_block`.
1497	///
1498	/// This will overwrite the head data that is returned as result of `validate_block` while
1499	/// validating a `PoV` on the relay chain. Normally the head data that is being returned
1500	/// by `validate_block` is the header of the block that is validated, thus it can be
1501	/// enacted as the new best block. However, for features like forking it can be useful
1502	/// to overwrite the head data with a custom header.
1503	///
1504	/// # Attention
1505	///
1506	/// This should only be used when you are sure what you are doing as this can brick
1507	/// your Parachain.
1508	pub fn set_custom_validation_head_data(head_data: Vec<u8>) {
1509		CustomValidationHeadData::<T>::put(head_data);
1510	}
1511
/// Appends the UMP signals to the upward messages of the current block.
///
/// Nothing is appended when the digest of the current block carries no core info.
#[cfg(feature = "experimental-ump-signals")]
fn send_ump_signal() {
	use cumulus_primitives_core::relay_chain::{UMPSignal, UMP_SEPARATOR};

	UpwardMessages::<T>::mutate(|upward_messages| {
		let digest = frame_system::Pallet::<T>::digest();

		if let Some(core_info) = CumulusDigestItem::find_core_info(&digest) {
			// The separator goes first, followed by the core selector signal.
			let select_core =
				UMPSignal::SelectCore(core_info.selector, core_info.claim_queue_offset);

			upward_messages.push(UMP_SEPARATOR);
			upward_messages.push(select_core.encode());
		}
	});
}
1531
/// Opens an outbound HRMP channel to `target_parachain` with preset limits, for use in
/// benchmarks or tests.
///
/// The channel is stored with a `msg_count` of 5 out of a `max_capacity` of 10 and a
/// `total_size` of 5_000_000 out of a `max_total_size` of 10_000_000.
///
/// The caller may assume that the pallet accepts a regular outbound message to the sibling
/// `target_parachain` after this call. No other assumptions are made.
#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
pub fn open_outbound_hrmp_channel_for_benchmarks_or_tests(target_parachain: ParaId) {
	let preset_channel = cumulus_primitives_core::AbridgedHrmpChannel {
		max_capacity: 10,
		max_total_size: 10_000_000_u32,
		max_message_size: 10_000_000_u32,
		msg_count: 5,
		total_size: 5_000_000_u32,
		mqc_head: None,
	};

	// The custom variant stores exactly the snapshot this function needs, so reuse it.
	Self::open_custom_outbound_hrmp_channel_for_benchmarks_or_tests(
		target_parachain,
		preset_channel,
	)
}
1555
/// Opens an outbound HRMP channel to `target_parachain` with the caller-provided `channel`
/// state, for use in benchmarks or tests.
///
/// The stored messaging state snapshot is replaced as a whole: afterwards `target_parachain`
/// is its only egress channel and every other field holds its default value.
///
/// The caller may assume that the pallet accepts a regular outbound message to the sibling
/// `target_parachain` after this call, as far as the given `channel` allows it. No other
/// assumptions are made.
#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
pub fn open_custom_outbound_hrmp_channel_for_benchmarks_or_tests(
	target_parachain: ParaId,
	channel: cumulus_primitives_core::AbridgedHrmpChannel,
) {
	let snapshot = MessagingStateSnapshot {
		dmq_mqc_head: Default::default(),
		relay_dispatch_queue_remaining_capacity: Default::default(),
		ingress_channels: Default::default(),
		egress_channels: vec![(target_parachain, channel)],
	};

	RelevantMessagingState::<T>::put(snapshot);
}
1572
/// Prepares the storage for benchmarking `schedule_code_upgrade`.
///
/// Inserts a dummy [`PersistedValidationData`] and a dummy [`AbridgedHostConfiguration`] whose
/// `max_code_size` is set to the given `max_code_size`.
#[cfg(feature = "runtime-benchmarks")]
pub fn initialize_for_set_code_benchmark(max_code_size: u32) {
	// Insert the dummy validation data.
	ValidationData::<T>::put(PersistedValidationData {
		parent_head: polkadot_parachain_primitives::primitives::HeadData(Default::default()),
		relay_parent_number: 1,
		relay_parent_storage_root: Default::default(),
		max_pov_size: 1_000,
	});

	// Insert the dummy host configuration with the requested `max_code_size`.
	HostConfiguration::<T>::put(AbridgedHostConfiguration {
		max_code_size,
		max_head_data_size: 32 * 1024,
		max_upward_queue_count: 8,
		max_upward_queue_size: 1024 * 1024,
		max_upward_message_size: 4 * 1024,
		max_upward_message_num_per_candidate: 2,
		hrmp_max_message_num_per_candidate: 2,
		validation_upgrade_cooldown: 2,
		validation_upgrade_delay: 2,
		async_backing_params: relay_chain::AsyncBackingParams {
			allowed_ancestry_len: 0,
			max_candidate_depth: 0,
		},
	});
}
1603}
1604
1605/// Type that implements `SetCode`.
1606pub struct ParachainSetCode<T>(core::marker::PhantomData<T>);
1607impl<T: Config> frame_system::SetCode<T> for ParachainSetCode<T> {
1608	fn set_code(code: Vec<u8>) -> DispatchResult {
1609		Pallet::<T>::schedule_code_upgrade(code)
1610	}
1611}
1612
1613impl<T: Config> Pallet<T> {
1614	/// Puts a message in the `PendingUpwardMessages` storage item.
1615	/// The message will be later sent in `on_finalize`.
1616	/// Checks host configuration to see if message is too big.
1617	/// Increases the delivery fee factor if the queue is sufficiently (see
1618	/// [`ump_constants::THRESHOLD_FACTOR`]) congested.
1619	pub fn send_upward_message(message: UpwardMessage) -> Result<(u32, XcmHash), MessageSendError> {
1620		let message_len = message.len();
1621		// Check if the message fits into the relay-chain constraints.
1622		//
1623		// Note, that we are using `host_configuration` here which may be from the previous
1624		// block, in case this is called from `on_initialize`, i.e. before the inherent with fresh
1625		// data is submitted.
1626		//
1627		// That shouldn't be a problem since this is a preliminary check and the actual check would
1628		// be performed just before submitting the message from the candidate, and it already can
1629		// happen that during the time the message is buffered for sending the relay-chain setting
1630		// may change so that the message is no longer valid.
1631		//
1632		// However, changing this setting is expected to be rare.
1633		if let Some(cfg) = HostConfiguration::<T>::get() {
1634			if message_len > cfg.max_upward_message_size as usize {
1635				return Err(MessageSendError::TooBig);
1636			}
1637			let threshold =
1638				cfg.max_upward_queue_size.saturating_div(ump_constants::THRESHOLD_FACTOR);
1639			// We check the threshold against total size and not number of messages since messages
1640			// could be big or small.
1641			<PendingUpwardMessages<T>>::append(message.clone());
1642			let pending_messages = PendingUpwardMessages::<T>::get();
1643			let total_size: usize = pending_messages.iter().map(UpwardMessage::len).sum();
1644			if total_size > threshold as usize {
1645				// We increase the fee factor by a factor based on the new message's size in KB
1646				Self::increase_fee_factor((), message_len as u128);
1647			}
1648		} else {
1649			// This storage field should carry over from the previous block. So if it's None
1650			// then it must be that this is an edge-case where a message is attempted to be
1651			// sent at the first block.
1652			//
1653			// Let's pass this message through. I think it's not unreasonable to expect that
1654			// the message is not huge and it comes through, but if it doesn't it can be
1655			// returned back to the sender.
1656			//
1657			// Thus fall through here.
1658			<PendingUpwardMessages<T>>::append(message.clone());
1659		};
1660
1661		// The relay ump does not use using_encoded
1662		// We apply the same this to use the same hash
1663		let hash = sp_io::hashing::blake2_256(&message);
1664		Self::deposit_event(Event::UpwardMessageSent { message_hash: Some(hash) });
1665		Ok((0, hash))
1666	}
1667
1668	/// Get the relay chain block number which was used as an anchor for the last block in this
1669	/// chain.
1670	pub fn last_relay_block_number() -> RelayChainBlockNumber {
1671		LastRelayChainBlockNumber::<T>::get()
1672	}
1673}
1674
1675impl<T: Config> UpwardMessageSender for Pallet<T> {
1676	fn send_upward_message(message: UpwardMessage) -> Result<(u32, XcmHash), MessageSendError> {
1677		Self::send_upward_message(message)
1678	}
1679
1680	fn can_send_upward_message(message: &UpwardMessage) -> Result<(), MessageSendError> {
1681		let max_upward_message_size = HostConfiguration::<T>::get()
1682			.map(|cfg| cfg.max_upward_message_size)
1683			.ok_or(MessageSendError::Other)?;
1684		if message.len() > max_upward_message_size as usize {
1685			Err(MessageSendError::TooBig)
1686		} else {
1687			Ok(())
1688		}
1689	}
1690
1691	#[cfg(any(feature = "std", feature = "runtime-benchmarks", test))]
1692	fn ensure_successful_delivery() {
1693		const MAX_UPWARD_MESSAGE_SIZE: u32 = 65_531 * 3;
1694		const MAX_CODE_SIZE: u32 = 3 * 1024 * 1024;
1695		HostConfiguration::<T>::mutate(|cfg| match cfg {
1696			Some(cfg) => cfg.max_upward_message_size = MAX_UPWARD_MESSAGE_SIZE,
1697			None =>
1698				*cfg = Some(AbridgedHostConfiguration {
1699					max_code_size: MAX_CODE_SIZE,
1700					max_head_data_size: 32 * 1024,
1701					max_upward_queue_count: 8,
1702					max_upward_queue_size: 1024 * 1024,
1703					max_upward_message_size: MAX_UPWARD_MESSAGE_SIZE,
1704					max_upward_message_num_per_candidate: 2,
1705					hrmp_max_message_num_per_candidate: 2,
1706					validation_upgrade_cooldown: 2,
1707					validation_upgrade_delay: 2,
1708					async_backing_params: relay_chain::AsyncBackingParams {
1709						allowed_ancestry_len: 0,
1710						max_candidate_depth: 0,
1711					},
1712				}),
1713		})
1714	}
1715}
1716
1717impl<T: Config> InspectMessageQueues for Pallet<T> {
1718	fn clear_messages() {
1719		PendingUpwardMessages::<T>::kill();
1720	}
1721
1722	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1723		use xcm::prelude::*;
1724
1725		let messages: Vec<VersionedXcm<()>> = PendingUpwardMessages::<T>::get()
1726			.iter()
1727			.map(|encoded_message| {
1728				VersionedXcm::<()>::decode_all_with_depth_limit(
1729					MAX_XCM_DECODE_DEPTH,
1730					&mut &encoded_message[..],
1731				)
1732				.unwrap()
1733			})
1734			.collect();
1735
1736		if messages.is_empty() {
1737			vec![]
1738		} else {
1739			vec![(VersionedLocation::from(Location::parent()), messages)]
1740		}
1741	}
1742}
1743
#[cfg(feature = "runtime-benchmarks")]
impl<T: Config> polkadot_runtime_parachains::EnsureForParachain for Pallet<T> {
	/// Opens the preset benchmark HRMP channel to `para_id` if its channel status is
	/// [`ChannelStatus::Closed`]; does nothing otherwise.
	fn ensure(para_id: ParaId) {
		if matches!(Self::get_channel_status(para_id), ChannelStatus::Closed) {
			Self::open_outbound_hrmp_channel_for_benchmarks_or_tests(para_id);
		}
	}
}
1752
1753/// Something that should be informed about system related events.
1754///
1755/// This includes events like [`on_validation_data`](Self::on_validation_data) that is being
1756/// called when the parachain inherent is executed that contains the validation data.
1757/// Or like [`on_validation_code_applied`](Self::on_validation_code_applied) that is called
1758/// when the new validation is written to the state. This means that
1759/// from the next block the runtime is being using this new code.
1760#[impl_trait_for_tuples::impl_for_tuples(30)]
1761pub trait OnSystemEvent {
1762	/// Called in each blocks once when the validation data is set by the inherent.
1763	fn on_validation_data(data: &PersistedValidationData);
1764	/// Called when the validation code is being applied, aka from the next block on this is the new
1765	/// runtime.
1766	fn on_validation_code_applied();
1767}
1768
1769/// Holds the most recent relay-parent state root and block number of the current parachain block.
1770#[derive(PartialEq, Eq, Clone, Encode, Decode, TypeInfo, Default, RuntimeDebug)]
1771pub struct RelayChainState {
1772	/// Current relay chain height.
1773	pub number: relay_chain::BlockNumber,
1774	/// State root for current relay chain height.
1775	pub state_root: relay_chain::Hash,
1776}
1777
1778/// This exposes the [`RelayChainState`] to other runtime modules.
1779///
1780/// Enables parachains to read relay chain state via state proofs.
1781pub trait RelaychainStateProvider {
1782	/// May be called by any runtime module to obtain the current state of the relay chain.
1783	///
1784	/// **NOTE**: This is not guaranteed to return monotonically increasing relay parents.
1785	fn current_relay_chain_state() -> RelayChainState;
1786
1787	/// Utility function only to be used in benchmarking scenarios, to be implemented optionally,
1788	/// else a noop.
1789	///
1790	/// It allows for setting a custom RelayChainState.
1791	#[cfg(feature = "runtime-benchmarks")]
1792	fn set_current_relay_chain_state(_state: RelayChainState) {}
1793}
1794
1795/// Implements [`BlockNumberProvider`] that returns relay chain block number fetched from validation
1796/// data.
1797///
1798/// When validation data is not available (e.g. within `on_initialize`), it will fallback to use
1799/// [`Pallet::last_relay_block_number()`].
1800///
1801/// **NOTE**: This has been deprecated, please use [`RelaychainDataProvider`]
1802#[deprecated = "Use `RelaychainDataProvider` instead"]
1803pub type RelaychainBlockNumberProvider<T> = RelaychainDataProvider<T>;
1804
1805/// Implements [`BlockNumberProvider`] and [`RelaychainStateProvider`] that returns relevant relay
1806/// data fetched from validation data.
1807///
1808/// NOTE: When validation data is not available (e.g. within `on_initialize`):
1809///
1810/// - [`current_relay_chain_state`](Self::current_relay_chain_state): Will return the default value
1811///   of [`RelayChainState`].
1812/// - [`current_block_number`](Self::current_block_number): Will return
1813///   [`Pallet::last_relay_block_number()`].
1814pub struct RelaychainDataProvider<T>(core::marker::PhantomData<T>);
1815
1816impl<T: Config> BlockNumberProvider for RelaychainDataProvider<T> {
1817	type BlockNumber = relay_chain::BlockNumber;
1818
1819	fn current_block_number() -> relay_chain::BlockNumber {
1820		ValidationData::<T>::get()
1821			.map(|d| d.relay_parent_number)
1822			.unwrap_or_else(|| Pallet::<T>::last_relay_block_number())
1823	}
1824
1825	#[cfg(any(feature = "std", feature = "runtime-benchmarks", test))]
1826	fn set_block_number(block: Self::BlockNumber) {
1827		let mut validation_data = ValidationData::<T>::get().unwrap_or_else(||
1828			// PersistedValidationData does not impl default in non-std
1829			PersistedValidationData {
1830				parent_head: vec![].into(),
1831				relay_parent_number: Default::default(),
1832				max_pov_size: Default::default(),
1833				relay_parent_storage_root: Default::default(),
1834			});
1835		validation_data.relay_parent_number = block;
1836		ValidationData::<T>::put(validation_data)
1837	}
1838}
1839
1840impl<T: Config> RelaychainStateProvider for RelaychainDataProvider<T> {
1841	fn current_relay_chain_state() -> RelayChainState {
1842		ValidationData::<T>::get()
1843			.map(|d| RelayChainState {
1844				number: d.relay_parent_number,
1845				state_root: d.relay_parent_storage_root,
1846			})
1847			.unwrap_or_default()
1848	}
1849
1850	#[cfg(feature = "runtime-benchmarks")]
1851	fn set_current_relay_chain_state(state: RelayChainState) {
1852		let mut validation_data = ValidationData::<T>::get().unwrap_or_else(||
1853			// PersistedValidationData does not impl default in non-std
1854			PersistedValidationData {
1855				parent_head: vec![].into(),
1856				relay_parent_number: Default::default(),
1857				max_pov_size: Default::default(),
1858				relay_parent_storage_root: Default::default(),
1859			});
1860		validation_data.relay_parent_number = state.number;
1861		validation_data.relay_parent_storage_root = state.state_root;
1862		ValidationData::<T>::put(validation_data)
1863	}
1864}