referrerpolicy=no-referrer-when-downgrade

frame_support/traits/
messages.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Traits for managing message queuing and handling.
19
20use super::storage::Footprint;
21use crate::defensive;
22
23use alloc::vec::Vec;
24use codec::{Decode, DecodeWithMemTracking, Encode, FullCodec, MaxEncodedLen};
25use core::{cmp::Ordering, fmt::Debug, marker::PhantomData};
26use scale_info::TypeInfo;
27use sp_core::{ConstU32, Get, TypedGet};
28use sp_runtime::{traits::Convert, BoundedSlice, RuntimeDebug};
29use sp_weights::{Weight, WeightMeter};
30
31/// Errors that can happen when attempting to process a message with
32/// [`ProcessMessage::process_message()`].
33#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, DecodeWithMemTracking, TypeInfo, Debug)]
34pub enum ProcessMessageError {
35	/// The message data format is unknown (e.g. unrecognised header)
36	BadFormat,
37	/// The message data is bad (e.g. decoding returns an error).
38	Corrupt,
39	/// The message format is unsupported (e.g. old XCM version).
40	Unsupported,
41	/// Message processing was not attempted because it was not certain that the weight limit
42	/// would be respected. The parameter gives the maximum weight which the message could take
43	/// to process.
44	Overweight(Weight),
45	/// The queue wants to give up its current processing slot.
46	///
47	/// Hints the message processor to cease servicing this queue and proceed to the next
48	/// one. This is seen as a *hint*, not an instruction. Implementations must therefore handle
49	/// the case that a queue is re-serviced within the same block after *yielding*. A queue is
50	/// not required to *yield* again when it is being re-serviced withing the same block.
51	Yield,
52	/// The message could not be processed for reaching the stack depth limit.
53	StackLimitReached,
54}
55
56/// Can process messages from a specific origin.
57pub trait ProcessMessage {
58	/// The transport from where a message originates.
59	type Origin: FullCodec + MaxEncodedLen + Clone + Eq + PartialEq + TypeInfo + Debug;
60
61	/// Process the given message, using no more than the remaining `meter` weight to do so.
62	///
63	/// Returns whether the message was processed.
64	fn process_message(
65		message: &[u8],
66		origin: Self::Origin,
67		meter: &mut WeightMeter,
68		id: &mut [u8; 32],
69	) -> Result<bool, ProcessMessageError>;
70}
71
72/// Errors that can happen when attempting to execute an overweight message with
73/// [`ServiceQueues::execute_overweight()`].
74#[derive(Eq, PartialEq, RuntimeDebug)]
75pub enum ExecuteOverweightError {
76	/// The referenced message was not found.
77	NotFound,
78	/// The message was already processed.
79	///
80	/// This can be treated as success condition.
81	AlreadyProcessed,
82	/// The available weight was insufficient to execute the message.
83	InsufficientWeight,
84	/// The queue is paused and no message can be executed from it.
85	///
86	/// This can change at any time and may resolve in the future by re-trying.
87	QueuePaused,
88	/// An unspecified error.
89	Other,
90	/// Another call is currently ongoing and prevents this call from executing.
91	RecursiveDisallowed,
92}
93
94/// Can service queues and execute overweight messages.
95pub trait ServiceQueues {
96	/// Addresses a specific overweight message.
97	type OverweightMessageAddress;
98
99	/// Service all message queues in some fair manner.
100	///
101	/// - `weight_limit`: The maximum amount of dynamic weight that this call can use.
102	///
103	/// Returns the dynamic weight used by this call; is never greater than `weight_limit`.
104	/// Should only be called in top-level runtime entry points like `on_initialize` or `on_idle`.
105	/// Otherwise, stack depth limit errors may be miss-handled.
106	fn service_queues(weight_limit: Weight) -> Weight;
107
108	/// Executes a message that could not be executed by [`Self::service_queues()`] because it was
109	/// temporarily overweight.
110	fn execute_overweight(
111		_weight_limit: Weight,
112		_address: Self::OverweightMessageAddress,
113	) -> Result<Weight, ExecuteOverweightError> {
114		Err(ExecuteOverweightError::NotFound)
115	}
116}
117
118/// Services queues by doing nothing.
119pub struct NoopServiceQueues<OverweightAddr>(PhantomData<OverweightAddr>);
120impl<OverweightAddr> ServiceQueues for NoopServiceQueues<OverweightAddr> {
121	type OverweightMessageAddress = OverweightAddr;
122
123	fn service_queues(_: Weight) -> Weight {
124		Weight::zero()
125	}
126}
127
128/// Can enqueue messages for multiple origins.
129pub trait EnqueueMessage<Origin: MaxEncodedLen> {
130	/// The maximal length any enqueued message may have.
131	type MaxMessageLen: Get<u32>;
132
133	/// Enqueue a single `message` from a specific `origin`.
134	fn enqueue_message(message: BoundedSlice<u8, Self::MaxMessageLen>, origin: Origin);
135
136	/// Enqueue multiple `messages` from a specific `origin`.
137	fn enqueue_messages<'a>(
138		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
139		origin: Origin,
140	);
141
142	/// Any remaining unprocessed messages should happen only lazily, not proactively.
143	fn sweep_queue(origin: Origin);
144}
145
146impl<Origin: MaxEncodedLen> EnqueueMessage<Origin> for () {
147	type MaxMessageLen = ConstU32<0>;
148	fn enqueue_message(_: BoundedSlice<u8, Self::MaxMessageLen>, _: Origin) {}
149	fn enqueue_messages<'a>(
150		_: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
151		_: Origin,
152	) {
153	}
154	fn sweep_queue(_: Origin) {}
155}
156
157/// The resource footprint of a queue.
158#[derive(Default, Copy, Clone, Eq, PartialEq, RuntimeDebug)]
159pub struct QueueFootprint {
160	/// The number of pages in the queue (including overweight pages).
161	pub pages: u32,
162	/// The number of pages that are ready (not yet processed and also not overweight).
163	pub ready_pages: u32,
164	/// The storage footprint of the queue (including overweight messages).
165	pub storage: Footprint,
166}
167
168/// The resource footprint of a batch of messages.
169#[derive(Default, Copy, Clone, PartialEq, RuntimeDebug)]
170pub struct BatchFootprint {
171	/// The number of messages in the batch.
172	pub msgs_count: usize,
173	/// The total size in bytes of all the messages in the batch.
174	pub size_in_bytes: usize,
175	/// The number of resulting new pages in the queue if the current batch was added.
176	pub new_pages_count: u32,
177}
178
179/// The resource footprints of continuous subsets of messages.
180///
181/// For a set of messages `xcms[0..n]`, each `footprints[i]` contains the footprint
182/// of the batch `xcms[0..i]`, so as `i` increases `footprints[i]` contains the footprint
183/// of a bigger batch.
184#[derive(Default, RuntimeDebug)]
185pub struct BatchesFootprints {
186	/// The position in the first available MQ page where the batch will start being appended.
187	///
188	/// The messages in the batch will be enqueued to the message queue. Since the message queue is
189	/// organized in pages, the messages may be enqueued across multiple contiguous pages.
190	/// The position where we start appending messages to the first available MQ page is of
191	/// particular importance since it impacts the performance of the enqueuing operation.
192	/// That's because the first page has to be decoded first. This is not needed for the following
193	/// pages.
194	pub first_page_pos: usize,
195	pub footprints: Vec<BatchFootprint>,
196}
197
198impl BatchesFootprints {
199	/// Appends a batch footprint to the back of the collection.
200	///
201	/// The new footprint represents a batch that includes all the messages contained by the
202	/// previous batches plus the provided `msg`. If `new_page` is true, we will consider that
203	/// the provided `msg` is appended to a new message queue page. Otherwise, we consider
204	/// that it is appended to the current page.
205	pub fn push(&mut self, msg: &[u8], new_page: bool) {
206		let previous_footprint =
207			self.footprints.last().map(|footprint| *footprint).unwrap_or_default();
208
209		let mut new_pages_count = previous_footprint.new_pages_count;
210		if new_page {
211			new_pages_count = new_pages_count.saturating_add(1);
212		}
213		self.footprints.push(BatchFootprint {
214			msgs_count: previous_footprint.msgs_count.saturating_add(1),
215			size_in_bytes: previous_footprint.size_in_bytes.saturating_add(msg.len()),
216			new_pages_count,
217		});
218	}
219
220	/// Gets the biggest batch for which the comparator function returns `Ordering::Less`.
221	pub fn search_best_by<F>(&self, f: F) -> &BatchFootprint
222	where
223		F: FnMut(&BatchFootprint) -> Ordering,
224	{
225		// Since the batches are sorted by size, we can use binary search.
226		let maybe_best_idx = match self.footprints.binary_search_by(f) {
227			Ok(last_ok_idx) => Some(last_ok_idx),
228			Err(first_err_idx) => first_err_idx.checked_sub(1),
229		};
230		if let Some(best_idx) = maybe_best_idx {
231			match self.footprints.get(best_idx) {
232				Some(best_footprint) => return best_footprint,
233				None => {
234					defensive!("Invalid best_batch_idx: {}", best_idx);
235				},
236			}
237		}
238		&BatchFootprint { msgs_count: 0, size_in_bytes: 0, new_pages_count: 0 }
239	}
240}
241
242/// Provides information on queue footprint.
243pub trait QueueFootprintQuery<Origin> {
244	/// The maximal length any enqueued message may have.
245	type MaxMessageLen: Get<u32>;
246
247	/// Return the state footprint of the given queue.
248	fn footprint(origin: Origin) -> QueueFootprint;
249
250	/// Get the `BatchFootprint` for each batch of messages `[0..n]`
251	/// as long as the total number of pages would be <= `total_pages_limit`.
252	///
253	/// # Examples
254	///
255	/// Let's consider that each message would result in a new page and that there's already 1
256	/// full page in the queue. Then, for the messages `["1", "2", "3"]`
257	/// and `total_pages_limit = 3`, `get_batches_footprints()` would return:
258	/// ```
259	/// use frame_support::traits::BatchFootprint;
260	///
261	/// vec![
262	/// 	// The footprint of batch ["1"]
263	/// 	BatchFootprint {
264	/// 		msgs_count: 1,
265	/// 		size_in_bytes: 1,
266	/// 		new_pages_count: 1, // total pages count = 2
267	/// 	},
268	/// 	// The footprint of batch ["1", "2"]
269	/// 	BatchFootprint {
270	/// 		msgs_count: 2,
271	/// 		size_in_bytes: 2,
272	/// 		new_pages_count: 2, // total pages count = 3
273	/// 	}
274	/// 	// For the batch ["1", "2", "3"], the total pages count would be 4, which would exceed
275	/// 	// the `total_pages_limit`.
276	/// ];
277	/// ```
278	fn get_batches_footprints<'a>(
279		origin: Origin,
280		msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
281		total_pages_limit: u32,
282	) -> BatchesFootprints;
283}
284
285impl<Origin: MaxEncodedLen> QueueFootprintQuery<Origin> for () {
286	type MaxMessageLen = ConstU32<0>;
287
288	fn footprint(_: Origin) -> QueueFootprint {
289		QueueFootprint::default()
290	}
291
292	fn get_batches_footprints<'a>(
293		_origin: Origin,
294		_msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
295		_total_pages_limit: u32,
296	) -> BatchesFootprints {
297		BatchesFootprints::default()
298	}
299}
300
301/// Transform the origin of an [`EnqueueMessage`] via `C::convert`.
302pub struct TransformOrigin<E, O, N, C>(PhantomData<(E, O, N, C)>);
303impl<E: EnqueueMessage<O>, O: MaxEncodedLen, N: MaxEncodedLen, C: Convert<N, O>> EnqueueMessage<N>
304	for TransformOrigin<E, O, N, C>
305{
306	type MaxMessageLen = E::MaxMessageLen;
307
308	fn enqueue_message(message: BoundedSlice<u8, Self::MaxMessageLen>, origin: N) {
309		E::enqueue_message(message, C::convert(origin));
310	}
311
312	fn enqueue_messages<'a>(
313		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
314		origin: N,
315	) {
316		E::enqueue_messages(messages, C::convert(origin));
317	}
318
319	fn sweep_queue(origin: N) {
320		E::sweep_queue(C::convert(origin));
321	}
322}
323
324impl<E: QueueFootprintQuery<O>, O: MaxEncodedLen, N: MaxEncodedLen, C: Convert<N, O>>
325	QueueFootprintQuery<N> for TransformOrigin<E, O, N, C>
326{
327	type MaxMessageLen = E::MaxMessageLen;
328
329	fn footprint(origin: N) -> QueueFootprint {
330		E::footprint(C::convert(origin))
331	}
332
333	fn get_batches_footprints<'a>(
334		origin: N,
335		msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
336		total_pages_limit: u32,
337	) -> BatchesFootprints {
338		E::get_batches_footprints(C::convert(origin), msgs, total_pages_limit)
339	}
340}
341
342/// Handles incoming messages for a single origin.
343pub trait HandleMessage {
344	/// The maximal length any enqueued message may have.
345	type MaxMessageLen: Get<u32>;
346
347	/// Enqueue a single `message` with an implied origin.
348	fn handle_message(message: BoundedSlice<u8, Self::MaxMessageLen>);
349
350	/// Enqueue multiple `messages` from an implied origin.
351	fn handle_messages<'a>(
352		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
353	);
354
355	/// Any remaining unprocessed messages should happen only lazily, not proactively.
356	fn sweep_queue();
357}
358
359/// Adapter type to transform an [`EnqueueMessage`] with an origin into a [`HandleMessage`] impl.
360pub struct EnqueueWithOrigin<E, O>(PhantomData<(E, O)>);
361impl<E: EnqueueMessage<O::Type>, O: TypedGet> HandleMessage for EnqueueWithOrigin<E, O>
362where
363	O::Type: MaxEncodedLen,
364{
365	type MaxMessageLen = E::MaxMessageLen;
366
367	fn handle_message(message: BoundedSlice<u8, Self::MaxMessageLen>) {
368		E::enqueue_message(message, O::get());
369	}
370
371	fn handle_messages<'a>(
372		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
373	) {
374		E::enqueue_messages(messages, O::get());
375	}
376
377	fn sweep_queue() {
378		E::sweep_queue(O::get());
379	}
380}
381
/// Something that can tell which queues are paused.
pub trait QueuePausedQuery<Origin> {
	/// Returns `true` if the queue that belongs to `origin` is paused.
	fn is_paused(origin: &Origin) -> bool;
}
387
// Implements `QueuePausedQuery` for tuples whose members all implement it (generated by
// `impl_for_tuples` for tuples of up to 8 members).
//
// A queue counts as paused as soon as any member of the tuple reports it as paused.
#[impl_trait_for_tuples::impl_for_tuples(8)]
impl<Origin> QueuePausedQuery<Origin> for Tuple {
	fn is_paused(origin: &Origin) -> bool {
		// The check below is repeated once per tuple member, in order; the first member that
		// reports the queue as paused ends the search.
		for_tuples!( #(
			if Tuple::is_paused(origin) {
				return true;
			}
		)* );
		// No member considers the queue paused.
		false
	}
}