frame_support/traits/messages.rs
1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Traits for managing message queuing and handling.
19
20use super::storage::Footprint;
21use crate::defensive;
22
23use alloc::vec::Vec;
24use codec::{Decode, DecodeWithMemTracking, Encode, FullCodec, MaxEncodedLen};
25use core::{cmp::Ordering, fmt::Debug, marker::PhantomData};
26use scale_info::TypeInfo;
27use sp_core::{ConstU32, Get, TypedGet};
28use sp_runtime::{traits::Convert, BoundedSlice, RuntimeDebug};
29use sp_weights::{Weight, WeightMeter};
30
31/// Errors that can happen when attempting to process a message with
32/// [`ProcessMessage::process_message()`].
33#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, DecodeWithMemTracking, TypeInfo, Debug)]
34pub enum ProcessMessageError {
35 /// The message data format is unknown (e.g. unrecognised header)
36 BadFormat,
37 /// The message data is bad (e.g. decoding returns an error).
38 Corrupt,
39 /// The message format is unsupported (e.g. old XCM version).
40 Unsupported,
41 /// Message processing was not attempted because it was not certain that the weight limit
42 /// would be respected. The parameter gives the maximum weight which the message could take
43 /// to process.
44 Overweight(Weight),
45 /// The queue wants to give up its current processing slot.
46 ///
47 /// Hints the message processor to cease servicing this queue and proceed to the next
48 /// one. This is seen as a *hint*, not an instruction. Implementations must therefore handle
49 /// the case that a queue is re-serviced within the same block after *yielding*. A queue is
50 /// not required to *yield* again when it is being re-serviced withing the same block.
51 Yield,
52 /// The message could not be processed for reaching the stack depth limit.
53 StackLimitReached,
54}
55
56/// Can process messages from a specific origin.
57pub trait ProcessMessage {
58 /// The transport from where a message originates.
59 type Origin: FullCodec + MaxEncodedLen + Clone + Eq + PartialEq + TypeInfo + Debug;
60
61 /// Process the given message, using no more than the remaining `meter` weight to do so.
62 ///
63 /// Returns whether the message was processed.
64 fn process_message(
65 message: &[u8],
66 origin: Self::Origin,
67 meter: &mut WeightMeter,
68 id: &mut [u8; 32],
69 ) -> Result<bool, ProcessMessageError>;
70}
71
72/// Errors that can happen when attempting to execute an overweight message with
73/// [`ServiceQueues::execute_overweight()`].
74#[derive(Eq, PartialEq, RuntimeDebug)]
75pub enum ExecuteOverweightError {
76 /// The referenced message was not found.
77 NotFound,
78 /// The message was already processed.
79 ///
80 /// This can be treated as success condition.
81 AlreadyProcessed,
82 /// The available weight was insufficient to execute the message.
83 InsufficientWeight,
84 /// The queue is paused and no message can be executed from it.
85 ///
86 /// This can change at any time and may resolve in the future by re-trying.
87 QueuePaused,
88 /// An unspecified error.
89 Other,
90 /// Another call is currently ongoing and prevents this call from executing.
91 RecursiveDisallowed,
92}
93
94/// Can service queues and execute overweight messages.
95pub trait ServiceQueues {
96 /// Addresses a specific overweight message.
97 type OverweightMessageAddress;
98
99 /// Service all message queues in some fair manner.
100 ///
101 /// - `weight_limit`: The maximum amount of dynamic weight that this call can use.
102 ///
103 /// Returns the dynamic weight used by this call; is never greater than `weight_limit`.
104 /// Should only be called in top-level runtime entry points like `on_initialize` or `on_idle`.
105 /// Otherwise, stack depth limit errors may be miss-handled.
106 fn service_queues(weight_limit: Weight) -> Weight;
107
108 /// Executes a message that could not be executed by [`Self::service_queues()`] because it was
109 /// temporarily overweight.
110 fn execute_overweight(
111 _weight_limit: Weight,
112 _address: Self::OverweightMessageAddress,
113 ) -> Result<Weight, ExecuteOverweightError> {
114 Err(ExecuteOverweightError::NotFound)
115 }
116}
117
118/// Services queues by doing nothing.
119pub struct NoopServiceQueues<OverweightAddr>(PhantomData<OverweightAddr>);
120impl<OverweightAddr> ServiceQueues for NoopServiceQueues<OverweightAddr> {
121 type OverweightMessageAddress = OverweightAddr;
122
123 fn service_queues(_: Weight) -> Weight {
124 Weight::zero()
125 }
126}
127
128/// Can enqueue messages for multiple origins.
129pub trait EnqueueMessage<Origin: MaxEncodedLen> {
130 /// The maximal length any enqueued message may have.
131 type MaxMessageLen: Get<u32>;
132
133 /// Enqueue a single `message` from a specific `origin`.
134 fn enqueue_message(message: BoundedSlice<u8, Self::MaxMessageLen>, origin: Origin);
135
136 /// Enqueue multiple `messages` from a specific `origin`.
137 fn enqueue_messages<'a>(
138 messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
139 origin: Origin,
140 );
141
142 /// Any remaining unprocessed messages should happen only lazily, not proactively.
143 fn sweep_queue(origin: Origin);
144}
145
146impl<Origin: MaxEncodedLen> EnqueueMessage<Origin> for () {
147 type MaxMessageLen = ConstU32<0>;
148 fn enqueue_message(_: BoundedSlice<u8, Self::MaxMessageLen>, _: Origin) {}
149 fn enqueue_messages<'a>(
150 _: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
151 _: Origin,
152 ) {
153 }
154 fn sweep_queue(_: Origin) {}
155}
156
157/// The resource footprint of a queue.
158#[derive(Default, Copy, Clone, Eq, PartialEq, RuntimeDebug)]
159pub struct QueueFootprint {
160 /// The number of pages in the queue (including overweight pages).
161 pub pages: u32,
162 /// The number of pages that are ready (not yet processed and also not overweight).
163 pub ready_pages: u32,
164 /// The storage footprint of the queue (including overweight messages).
165 pub storage: Footprint,
166}
167
168/// The resource footprint of a batch of messages.
169#[derive(Default, Copy, Clone, PartialEq, RuntimeDebug)]
170pub struct BatchFootprint {
171 /// The number of messages in the batch.
172 pub msgs_count: usize,
173 /// The total size in bytes of all the messages in the batch.
174 pub size_in_bytes: usize,
175 /// The number of resulting new pages in the queue if the current batch was added.
176 pub new_pages_count: u32,
177}
178
179/// The resource footprints of continuous subsets of messages.
180///
181/// For a set of messages `xcms[0..n]`, each `footprints[i]` contains the footprint
182/// of the batch `xcms[0..i]`, so as `i` increases `footprints[i]` contains the footprint
183/// of a bigger batch.
184#[derive(Default, RuntimeDebug)]
185pub struct BatchesFootprints {
186 /// The position in the first available MQ page where the batch will start being appended.
187 ///
188 /// The messages in the batch will be enqueued to the message queue. Since the message queue is
189 /// organized in pages, the messages may be enqueued across multiple contiguous pages.
190 /// The position where we start appending messages to the first available MQ page is of
191 /// particular importance since it impacts the performance of the enqueuing operation.
192 /// That's because the first page has to be decoded first. This is not needed for the following
193 /// pages.
194 pub first_page_pos: usize,
195 pub footprints: Vec<BatchFootprint>,
196}
197
198impl BatchesFootprints {
199 /// Appends a batch footprint to the back of the collection.
200 ///
201 /// The new footprint represents a batch that includes all the messages contained by the
202 /// previous batches plus the provided `msg`. If `new_page` is true, we will consider that
203 /// the provided `msg` is appended to a new message queue page. Otherwise, we consider
204 /// that it is appended to the current page.
205 pub fn push(&mut self, msg: &[u8], new_page: bool) {
206 let previous_footprint =
207 self.footprints.last().map(|footprint| *footprint).unwrap_or_default();
208
209 let mut new_pages_count = previous_footprint.new_pages_count;
210 if new_page {
211 new_pages_count = new_pages_count.saturating_add(1);
212 }
213 self.footprints.push(BatchFootprint {
214 msgs_count: previous_footprint.msgs_count.saturating_add(1),
215 size_in_bytes: previous_footprint.size_in_bytes.saturating_add(msg.len()),
216 new_pages_count,
217 });
218 }
219
220 /// Gets the biggest batch for which the comparator function returns `Ordering::Less`.
221 pub fn search_best_by<F>(&self, f: F) -> &BatchFootprint
222 where
223 F: FnMut(&BatchFootprint) -> Ordering,
224 {
225 // Since the batches are sorted by size, we can use binary search.
226 let maybe_best_idx = match self.footprints.binary_search_by(f) {
227 Ok(last_ok_idx) => Some(last_ok_idx),
228 Err(first_err_idx) => first_err_idx.checked_sub(1),
229 };
230 if let Some(best_idx) = maybe_best_idx {
231 match self.footprints.get(best_idx) {
232 Some(best_footprint) => return best_footprint,
233 None => {
234 defensive!("Invalid best_batch_idx: {}", best_idx);
235 },
236 }
237 }
238 &BatchFootprint { msgs_count: 0, size_in_bytes: 0, new_pages_count: 0 }
239 }
240}
241
242/// Provides information on queue footprint.
243pub trait QueueFootprintQuery<Origin> {
244 /// The maximal length any enqueued message may have.
245 type MaxMessageLen: Get<u32>;
246
247 /// Return the state footprint of the given queue.
248 fn footprint(origin: Origin) -> QueueFootprint;
249
250 /// Get the `BatchFootprint` for each batch of messages `[0..n]`
251 /// as long as the total number of pages would be <= `total_pages_limit`.
252 ///
253 /// # Examples
254 ///
255 /// Let's consider that each message would result in a new page and that there's already 1
256 /// full page in the queue. Then, for the messages `["1", "2", "3"]`
257 /// and `total_pages_limit = 3`, `get_batches_footprints()` would return:
258 /// ```
259 /// use frame_support::traits::BatchFootprint;
260 ///
261 /// vec![
262 /// // The footprint of batch ["1"]
263 /// BatchFootprint {
264 /// msgs_count: 1,
265 /// size_in_bytes: 1,
266 /// new_pages_count: 1, // total pages count = 2
267 /// },
268 /// // The footprint of batch ["1", "2"]
269 /// BatchFootprint {
270 /// msgs_count: 2,
271 /// size_in_bytes: 2,
272 /// new_pages_count: 2, // total pages count = 3
273 /// }
274 /// // For the batch ["1", "2", "3"], the total pages count would be 4, which would exceed
275 /// // the `total_pages_limit`.
276 /// ];
277 /// ```
278 fn get_batches_footprints<'a>(
279 origin: Origin,
280 msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
281 total_pages_limit: u32,
282 ) -> BatchesFootprints;
283}
284
285impl<Origin: MaxEncodedLen> QueueFootprintQuery<Origin> for () {
286 type MaxMessageLen = ConstU32<0>;
287
288 fn footprint(_: Origin) -> QueueFootprint {
289 QueueFootprint::default()
290 }
291
292 fn get_batches_footprints<'a>(
293 _origin: Origin,
294 _msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
295 _total_pages_limit: u32,
296 ) -> BatchesFootprints {
297 BatchesFootprints::default()
298 }
299}
300
301/// Transform the origin of an [`EnqueueMessage`] via `C::convert`.
302pub struct TransformOrigin<E, O, N, C>(PhantomData<(E, O, N, C)>);
303impl<E: EnqueueMessage<O>, O: MaxEncodedLen, N: MaxEncodedLen, C: Convert<N, O>> EnqueueMessage<N>
304 for TransformOrigin<E, O, N, C>
305{
306 type MaxMessageLen = E::MaxMessageLen;
307
308 fn enqueue_message(message: BoundedSlice<u8, Self::MaxMessageLen>, origin: N) {
309 E::enqueue_message(message, C::convert(origin));
310 }
311
312 fn enqueue_messages<'a>(
313 messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
314 origin: N,
315 ) {
316 E::enqueue_messages(messages, C::convert(origin));
317 }
318
319 fn sweep_queue(origin: N) {
320 E::sweep_queue(C::convert(origin));
321 }
322}
323
324impl<E: QueueFootprintQuery<O>, O: MaxEncodedLen, N: MaxEncodedLen, C: Convert<N, O>>
325 QueueFootprintQuery<N> for TransformOrigin<E, O, N, C>
326{
327 type MaxMessageLen = E::MaxMessageLen;
328
329 fn footprint(origin: N) -> QueueFootprint {
330 E::footprint(C::convert(origin))
331 }
332
333 fn get_batches_footprints<'a>(
334 origin: N,
335 msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
336 total_pages_limit: u32,
337 ) -> BatchesFootprints {
338 E::get_batches_footprints(C::convert(origin), msgs, total_pages_limit)
339 }
340}
341
342/// Handles incoming messages for a single origin.
343pub trait HandleMessage {
344 /// The maximal length any enqueued message may have.
345 type MaxMessageLen: Get<u32>;
346
347 /// Enqueue a single `message` with an implied origin.
348 fn handle_message(message: BoundedSlice<u8, Self::MaxMessageLen>);
349
350 /// Enqueue multiple `messages` from an implied origin.
351 fn handle_messages<'a>(
352 messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
353 );
354
355 /// Any remaining unprocessed messages should happen only lazily, not proactively.
356 fn sweep_queue();
357}
358
359/// Adapter type to transform an [`EnqueueMessage`] with an origin into a [`HandleMessage`] impl.
360pub struct EnqueueWithOrigin<E, O>(PhantomData<(E, O)>);
361impl<E: EnqueueMessage<O::Type>, O: TypedGet> HandleMessage for EnqueueWithOrigin<E, O>
362where
363 O::Type: MaxEncodedLen,
364{
365 type MaxMessageLen = E::MaxMessageLen;
366
367 fn handle_message(message: BoundedSlice<u8, Self::MaxMessageLen>) {
368 E::enqueue_message(message, O::get());
369 }
370
371 fn handle_messages<'a>(
372 messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
373 ) {
374 E::enqueue_messages(messages, O::get());
375 }
376
377 fn sweep_queue() {
378 E::sweep_queue(O::get());
379 }
380}
381
382/// Provides information on paused queues.
383pub trait QueuePausedQuery<Origin> {
384 /// Whether this queue is paused.
385 fn is_paused(origin: &Origin) -> bool;
386}
387
388#[impl_trait_for_tuples::impl_for_tuples(8)]
389impl<Origin> QueuePausedQuery<Origin> for Tuple {
390 fn is_paused(origin: &Origin) -> bool {
391 for_tuples!( #(
392 if Tuple::is_paused(origin) {
393 return true;
394 }
395 )* );
396 false
397 }
398}