#![deny(missing_docs)]
#![cfg_attr(not(feature = "std"), no_std)]
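//! # Generalized Message Queue Pallet
//!
//! Provides paged, per-origin message queues: messages are enqueued via the [`EnqueueMessage`]
//! trait, stored in fixed-size heap pages and serviced (weight-metered) from `on_initialize` and
//! `on_idle`. Messages that do not fit into the configured weight limits are reported as
//! overweight and can later be executed manually via the `execute_overweight` extrinsic.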

mod benchmarking;
mod integration_test;
mod mock;
pub mod mock_helpers;
mod tests;
pub mod weights;

extern crate alloc;

use alloc::{vec, vec::Vec};
use codec::{Codec, ConstEncodedLen, Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
use core::{fmt::Debug, ops::Deref};
use frame_support::{
	defensive,
	pallet_prelude::*,
	traits::{
		BatchesFootprints, Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
		ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
		QueueFootprintQuery, QueuePausedQuery, ServiceQueues,
	},
	BoundedSlice, CloneNoBound, DefaultNoBound,
};
use frame_system::pallet_prelude::*;
pub use pallet::*;
use scale_info::TypeInfo;
use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
use sp_core::{defer, H256};
use sp_runtime::{
	traits::{One, Zero},
	SaturatedConversion, Saturating, TransactionOutcome,
};
use sp_weights::WeightMeter;
pub use weights::WeightInfo;

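/// Type for identifying a page.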
type PageIndex = u32;

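/// Header prefixed to every message payload in a page's `heap`.
///
/// Stores the payload length and whether the message was already processed.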
#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
pub struct ItemHeader<Size> {
	payload_len: Size,
	is_processed: bool,
}

impl<Size: ConstEncodedLen> ConstEncodedLen for ItemHeader<Size> {}

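/// A page of messages. Pages always contain at least one item.
///
/// The `heap` is a flat buffer of `(ItemHeader, payload)` pairs. `first` and `last` are the heap
/// offsets of the next item to service and of the last appended item, while `remaining` and
/// `remaining_size` track the number and total payload size of unprocessed messages.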
#[derive(CloneNoBound, Encode, Decode, DebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen)]
#[scale_info(skip_type_params(HeapSize))]
#[codec(mel_bound(Size: MaxEncodedLen))]
pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
	remaining: Size,
	remaining_size: Size,
	first_index: Size,
	first: Size,
	last: Size,
	heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
}

impl<
		Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
		HeapSize: Get<Size>,
	> Page<Size, HeapSize>
where
	ItemHeader<Size>: ConstEncodedLen,
{
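	/// Create a [`Page`] from a single message.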
	fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
		let payload_len = message.len();
		let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
		let payload_len = payload_len.saturated_into();
		let header = ItemHeader::<Size> { payload_len, is_processed: false };

		let mut heap = Vec::with_capacity(data_len);
		header.using_encoded(|h| heap.extend_from_slice(h));
		heap.extend_from_slice(message.deref());

		Page {
			remaining: One::one(),
			remaining_size: payload_len,
			first_index: Zero::zero(),
			first: Zero::zero(),
			last: Zero::zero(),
			heap: BoundedVec::defensive_truncate_from(heap),
		}
	}

	fn heap_pos(&self) -> usize {
		self.heap.len()
	}

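	/// Check whether a message of `message_len` bytes can be appended at heap position `pos`.
	///
	/// On success, returns the heap position that a subsequent message would be appended at.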
	fn can_append_message_at(pos: usize, message_len: usize) -> Result<usize, ()> {
		let header_size = ItemHeader::<Size>::max_encoded_len();
		let data_len = header_size.saturating_add(message_len);
		let heap_size = HeapSize::get().into() as usize;
		let new_pos = pos.saturating_add(data_len);
		if new_pos <= heap_size {
			Ok(new_pos)
		} else {
			Err(())
		}
	}

	fn try_append_message<T: Config>(
		&mut self,
		message: BoundedSlice<u8, MaxMessageLenOf<T>>,
	) -> Result<(), ()> {
		let pos = self.heap_pos();
		Self::can_append_message_at(pos, message.len())?;
		let payload_len = message.len().saturated_into();
		let header = ItemHeader::<Size> { payload_len, is_processed: false };

		let mut heap = core::mem::take(&mut self.heap).into_inner();
		header.using_encoded(|h| heap.extend_from_slice(h));
		heap.extend_from_slice(message.deref());
		self.heap = BoundedVec::defensive_truncate_from(heap);
		self.last = pos.saturated_into();
		self.remaining.saturating_inc();
		self.remaining_size.saturating_accrue(payload_len);
		Ok(())
	}

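	/// Peek the first unprocessed message of the page, or `None` if the page is exhausted.
	///
	/// Emits a defensive error and returns `None` on heap corruption.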
	fn peek_first(&self) -> Option<BoundedSlice<'_, u8, IntoU32<HeapSize, Size>>> {
		if self.first > self.last {
			return None
		}
		let f = (self.first.into() as usize).min(self.heap.len());
		let mut item_slice = &self.heap[f..];
		if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
			let payload_len = h.payload_len.into() as usize;
			if payload_len <= item_slice.len() {
				return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]))
			}
		}
		defensive!("message-queue: heap corruption");
		None
	}

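	/// Point `first` at the next message, marking the skipped one as processed if `is_processed`.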
	fn skip_first(&mut self, is_processed: bool) {
		let f = (self.first.into() as usize).min(self.heap.len());
		if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
			if is_processed && !h.is_processed {
				h.is_processed = true;
				h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
				self.remaining.saturating_dec();
				self.remaining_size.saturating_reduce(h.payload_len);
			}
			self.first
				.saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
			self.first.saturating_accrue(h.payload_len);
			self.first_index.saturating_inc();
		}
	}

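	/// Return the heap position, processed flag and payload of the message at `index`.
	///
	/// This is `O(index)` since it walks the heap from the start.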
	fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
		let mut pos = 0;
		let mut item_slice = &self.heap[..];
		let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
		for _ in 0..index {
			let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
			let item_len = h.payload_len.into() as usize;
			if item_slice.len() < item_len {
				return None
			}
			item_slice = &item_slice[item_len..];
			pos.saturating_accrue(header_len.saturating_add(item_len));
		}
		let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
		if item_slice.len() < h.payload_len.into() as usize {
			return None
		}
		item_slice = &item_slice[..h.payload_len.into() as usize];
		Some((pos, h.is_processed, item_slice))
	}

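	/// Mark the message at heap position `pos` as processed and update the page accounting.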
	fn note_processed_at_pos(&mut self, pos: usize) {
		if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
			if !h.is_processed {
				h.is_processed = true;
				h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
				self.remaining.saturating_dec();
				self.remaining_size.saturating_reduce(h.payload_len);
			}
		}
	}

	fn is_complete(&self) -> bool {
		self.remaining.is_zero()
	}
}

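/// A single link in the fully-connected ring of ready queues (the "ready ring").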
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, Debug, PartialEq)]
pub struct Neighbours<MessageOrigin> {
	prev: MessageOrigin,
	next: MessageOrigin,
}

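/// The accounting state of a single queue.
///
/// Tracks the range of ready pages (`begin..end`), the total number of pages, the queue's
/// neighbours in the ready ring and the aggregate message count and size.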
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, Debug)]
pub struct BookState<MessageOrigin> {
	begin: PageIndex,
	end: PageIndex,
	count: PageIndex,
	ready_neighbours: Option<Neighbours<MessageOrigin>>,
	message_count: u64,
	size: u64,
}

impl<MessageOrigin> Default for BookState<MessageOrigin> {
	fn default() -> Self {
		Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
	}
}

impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
	fn from(book: BookState<MessageOrigin>) -> Self {
		QueueFootprint {
			pages: book.count,
			ready_pages: book.end.defensive_saturating_sub(book.begin),
			storage: Footprint { count: book.message_count, size: book.size },
		}
	}
}

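/// Handler code for when the items in a queue change.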
pub trait OnQueueChanged<Id> {
	fn on_queue_changed(id: Id, fp: QueueFootprint);
}

impl<Id> OnQueueChanged<Id> for () {
	fn on_queue_changed(_: Id, _: QueueFootprint) {}
}

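/// Allows forcing the `ServiceHead` (the queue that is serviced first) to a specific queue.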
pub trait ForceSetHead<O> {
	fn force_set_head(weight: &mut WeightMeter, origin: &O) -> Result<bool, ()>;
}

#[frame_support::pallet]
pub mod pallet {
	use super::*;

	#[pallet::pallet]
	pub struct Pallet<T>(_);

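	/// The module configuration trait.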
	#[pallet::config]
	pub trait Config: frame_system::Config {
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		type WeightInfo: WeightInfo;

		type MessageProcessor: ProcessMessage;

		type Size: BaseArithmetic
			+ Unsigned
			+ Copy
			+ Into<u32>
			+ Member
			+ Encode
			+ Decode
			+ MaxEncodedLen
			+ ConstEncodedLen
			+ TypeInfo
			+ Default;

		type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;

		type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;

		#[pallet::constant]
		type HeapSize: Get<Self::Size>;

		#[pallet::constant]
		type MaxStale: Get<u32>;

		#[pallet::constant]
		type ServiceWeight: Get<Option<Weight>>;

		#[pallet::constant]
		type IdleMaxServiceWeight: Get<Option<Weight>>;
	}

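	/// Events emitted by this pallet: processing outcomes, overweight messages being enqueued
	/// for manual execution, and reaped pages.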
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		ProcessingFailed {
			id: H256,
			origin: MessageOriginOf<T>,
			error: ProcessMessageError,
		},
		Processed {
			id: H256,
			origin: MessageOriginOf<T>,
			weight_used: Weight,
			success: bool,
		},
		OverweightEnqueued {
			id: [u8; 32],
			origin: MessageOriginOf<T>,
			page_index: PageIndex,
			message_index: T::Size,
		},
		PageReaped {
			origin: MessageOriginOf<T>,
			index: PageIndex,
		},
	}

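	/// Errors returned by the dispatchables and queue-servicing logic of this pallet.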
	#[pallet::error]
	pub enum Error<T> {
		NotReapable,
		NoPage,
		NoMessage,
		AlreadyProcessed,
		Queued,
		InsufficientWeight,
		TemporarilyUnprocessable,
		QueuePaused,
		RecursiveDisallowed,
	}

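	/// The book state of every queue, keyed by its message origin.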
	#[pallet::storage]
	pub type BookStateFor<T: Config> =
		StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;

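	/// The origin at which we should begin servicing.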
	#[pallet::storage]
	pub type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;

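	/// The map of page indices to pages, per queue.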
	#[pallet::storage]
	pub type Pages<T: Config> = StorageDoubleMap<
		_,
		Twox64Concat,
		MessageOriginOf<T>,
		Twox64Concat,
		PageIndex,
		Page<T::Size, T::HeapSize>,
		OptionQuery,
	>;

	#[pallet::hooks]
	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
			if let Some(weight_limit) = T::ServiceWeight::get() {
				Self::service_queues_impl(weight_limit, ServiceQueuesContext::OnInitialize)
			} else {
				Weight::zero()
			}
		}

		fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
			if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
				Self::service_queues_impl(
					weight_limit.min(remaining_weight),
					ServiceQueuesContext::OnIdle,
				)
			} else {
				Weight::zero()
			}
		}

		#[cfg(feature = "try-runtime")]
		fn try_state(_: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
			Self::do_try_state()
		}

		#[cfg(test)]
		fn integrity_test() {
			Self::do_integrity_test().expect("Pallet config is valid; qed")
		}
	}

	#[pallet::call]
	impl<T: Config> Pallet<T> {
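		/// Remove a page which has no more messages remaining to be processed or is stale.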
		#[pallet::call_index(0)]
		#[pallet::weight(T::WeightInfo::reap_page())]
		pub fn reap_page(
			origin: OriginFor<T>,
			message_origin: MessageOriginOf<T>,
			page_index: PageIndex,
		) -> DispatchResult {
			ensure_signed(origin)?;
			Self::do_reap_page(&message_origin, page_index)
		}

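		/// Execute an overweight message.
		///
		/// Temporary processing errors are propagated whereas permanent errors are treated as a
		/// success condition.
		///
		/// - `origin`: Must be `Signed`.
		/// - `message_origin`: The origin from which the message to be executed arrived.
		/// - `page`: The page in the queue in which the message is sitting.
		/// - `index`: The index into the queue of the message to be executed.
		/// - `weight_limit`: The maximum amount of weight allowed to be consumed in the execution
		///   of the message.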
		#[pallet::call_index(1)]
		#[pallet::weight(
			T::WeightInfo::execute_overweight_page_updated().max(
				T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit)
		)]
		pub fn execute_overweight(
			origin: OriginFor<T>,
			message_origin: MessageOriginOf<T>,
			page: PageIndex,
			index: T::Size,
			weight_limit: Weight,
		) -> DispatchResultWithPostInfo {
			ensure_signed(origin)?;
			let actual_weight =
				Self::do_execute_overweight(message_origin, page, index, weight_limit)?;
			Ok(Some(actual_weight).into())
		}
	}
}

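/// The status of servicing a single page.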
#[derive(PartialEq, Debug)]
enum PageExecutionStatus {
	Bailed,
	NoProgress,
	NoMore,
}

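/// The status of trying to service the next item of a [`Page`].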
#[derive(PartialEq, Debug)]
enum ItemExecutionStatus {
	Bailed,
	NoProgress,
	NoItem,
	Executed(bool),
}

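/// The outcome of processing a single message payload.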
#[derive(PartialEq)]
enum MessageExecutionStatus {
	InsufficientWeight,
	Overweight,
	Processed,
	Unprocessable { permanent: bool },
	StackLimitReached,
}

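/// The context in which queue servicing was invoked.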
#[derive(PartialEq)]
enum ServiceQueuesContext {
	OnIdle,
	OnInitialize,
	ServiceQueues,
}

impl<T: Config> Pallet<T> {
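	/// Knit `origin` into the ready ring right at the end.
	///
	/// Return the two ready ring neighbours of `origin`.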
	fn ready_ring_knit(origin: &MessageOriginOf<T>) -> Result<Neighbours<MessageOriginOf<T>>, ()> {
		if let Some(head) = ServiceHead::<T>::get() {
			let mut head_book_state = BookStateFor::<T>::get(&head);
			let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?;
			let tail = head_neighbours.prev;
			head_neighbours.prev = origin.clone();
			head_book_state.ready_neighbours = Some(head_neighbours);
			BookStateFor::<T>::insert(&head, head_book_state);

			let mut tail_book_state = BookStateFor::<T>::get(&tail);
			let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?;
			tail_neighbours.next = origin.clone();
			tail_book_state.ready_neighbours = Some(tail_neighbours);
			BookStateFor::<T>::insert(&tail, tail_book_state);

			Ok(Neighbours { next: head, prev: tail })
		} else {
			ServiceHead::<T>::put(origin);
			Ok(Neighbours { next: origin.clone(), prev: origin.clone() })
		}
	}

	fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
		if origin == &neighbours.next {
			debug_assert!(
				origin == &neighbours.prev,
				"unknitting from single item ring; outgoing must be only item"
			);
			ServiceHead::<T>::kill();
		} else {
			BookStateFor::<T>::mutate(&neighbours.next, |book_state| {
				if let Some(ref mut n) = book_state.ready_neighbours {
					n.prev = neighbours.prev.clone()
				}
			});
			BookStateFor::<T>::mutate(&neighbours.prev, |book_state| {
				if let Some(ref mut n) = book_state.ready_neighbours {
					n.next = neighbours.next.clone()
				}
			});
			if let Some(head) = ServiceHead::<T>::get() {
				if &head == origin {
					ServiceHead::<T>::put(neighbours.next);
				}
			} else {
				defensive!("`ServiceHead` must be some if there was a ready queue");
			}
		}
	}

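	/// Advance the `ServiceHead` to the next queue in the ready ring and return the former head.
	///
	/// Consumes the benchmarked weight from `weight` and returns `None` if it does not fit or if
	/// there is no ready queue.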
	fn bump_service_head(weight: &mut WeightMeter) -> Option<MessageOriginOf<T>> {
		if weight.try_consume(T::WeightInfo::bump_service_head()).is_err() {
			return None
		}

		if let Some(head) = ServiceHead::<T>::get() {
			let mut head_book_state = BookStateFor::<T>::get(&head);
			if let Some(head_neighbours) = head_book_state.ready_neighbours.take() {
				ServiceHead::<T>::put(&head_neighbours.next);
				Some(head)
			} else {
				defensive!("The head must point to a queue in the ready ring");
				None
			}
		} else {
			None
		}
	}

	fn set_service_head(weight: &mut WeightMeter, queue: &MessageOriginOf<T>) -> Result<bool, ()> {
		if weight.try_consume(T::WeightInfo::set_service_head()).is_err() {
			return Err(())
		}

		if BookStateFor::<T>::get(queue).ready_neighbours.is_some() {
			ServiceHead::<T>::put(queue);
			Ok(true)
		} else {
			Ok(false)
		}
	}

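	/// The maximal weight that a single message can ever consume.
	///
	/// Messages needing more than this are deemed overweight and are not executed automatically.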
	fn max_message_weight(limit: Weight) -> Option<Weight> {
		let service_weight = T::ServiceWeight::get().unwrap_or_default();
		let on_idle_weight = T::IdleMaxServiceWeight::get().unwrap_or_default();

		let max_message_weight =
			if service_weight.any_gt(on_idle_weight) { service_weight } else { on_idle_weight };

		if max_message_weight.is_zero() {
			limit.checked_sub(&Self::single_msg_overhead())
		} else {
			max_message_weight.checked_sub(&Self::single_msg_overhead())
		}
	}

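	/// The overhead of servicing a single message, not including the message's own weight.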
	fn single_msg_overhead() -> Weight {
		T::WeightInfo::bump_service_head()
			.saturating_add(T::WeightInfo::service_queue_base())
			.saturating_add(
				T::WeightInfo::service_page_base_completion()
					.max(T::WeightInfo::service_page_base_no_completion()),
			)
			.saturating_add(T::WeightInfo::service_page_item())
			.saturating_add(T::WeightInfo::ready_ring_unknit())
	}

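	/// Check the pallet configuration for internal consistency.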
	#[cfg(test)]
	fn do_integrity_test() -> Result<(), String> {
		ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");

		let max_block = T::BlockWeights::get().max_block;

		if let Some(service) = T::ServiceWeight::get() {
			if Self::max_message_weight(service).is_none() {
				return Err(format!(
					"ServiceWeight too low: {}. Must be at least {}",
					service,
					Self::single_msg_overhead(),
				))
			}

			if service.any_gt(max_block) {
				return Err(format!(
					"ServiceWeight {service} is bigger than max block weight {max_block}"
				))
			}
		}

		if let Some(on_idle) = T::IdleMaxServiceWeight::get() {
			if on_idle.any_gt(max_block) {
				return Err(format!(
					"IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}"
				))
			}
		}

		if let (Some(service_weight), Some(on_idle)) =
			(T::ServiceWeight::get(), T::IdleMaxServiceWeight::get())
		{
			if !(service_weight.all_gt(on_idle) ||
				on_idle.all_gt(service_weight) ||
				service_weight == on_idle)
			{
				return Err("One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal.".into())
			}
		}

		Ok(())
	}

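	/// Enqueue the given messages onto the queue of `origin`.
	///
	/// Appends to the last page while it has room and opens new pages as needed; also knits the
	/// queue into the ready ring if it was not there yet. Does not call the `QueueChangeHandler`.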
	fn do_enqueue_messages<'a>(
		origin: &MessageOriginOf<T>,
		messages: impl Iterator<Item = BoundedSlice<'a, u8, MaxMessageLenOf<T>>>,
	) {
		let mut book_state = BookStateFor::<T>::get(origin);

		let mut maybe_page = None;
		if book_state.end > book_state.begin {
			debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready");
			maybe_page = Pages::<T>::get(origin, book_state.end - 1).or_else(|| {
				defensive!("Corruption: referenced page doesn't exist.");
				None
			});
		}

		for message in messages {
			if let Some(mut page) = maybe_page {
				maybe_page = match page.try_append_message::<T>(message) {
					Ok(_) => Some(page),
					Err(_) => {
						Pages::<T>::insert(origin, book_state.end - 1, page);
						None
					},
				}
			}
			if maybe_page.is_none() {
				book_state.end.saturating_inc();
				book_state.count.saturating_inc();
				maybe_page = Some(Page::from_message::<T>(message));
			}

			book_state.message_count.saturating_inc();
			book_state.size.saturating_accrue(message.len() as u64);
		}

		if let Some(page) = maybe_page {
			Pages::<T>::insert(origin, book_state.end - 1, page);
		}

		if book_state.ready_neighbours.is_none() {
			match Self::ready_ring_knit(origin) {
				Ok(neighbours) => book_state.ready_neighbours = Some(neighbours),
				Err(()) => {
					defensive!("Ring state invalid when knitting");
				},
			}
		}

		BookStateFor::<T>::insert(origin, book_state);
	}

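	/// Try to execute a single message that was marked as overweight.
	///
	/// `weight_limit` is the weight that may be consumed to execute the message; the call's own
	/// dispatch overhead must be accounted for by the caller.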
	pub fn do_execute_overweight(
		origin: MessageOriginOf<T>,
		page_index: PageIndex,
		index: T::Size,
		weight_limit: Weight,
	) -> Result<Weight, Error<T>> {
		match with_service_mutex(|| {
			Self::do_execute_overweight_inner(origin, page_index, index, weight_limit)
		}) {
			Err(()) => Err(Error::<T>::RecursiveDisallowed),
			Ok(x) => x,
		}
	}

	fn do_execute_overweight_inner(
		origin: MessageOriginOf<T>,
		page_index: PageIndex,
		index: T::Size,
		weight_limit: Weight,
	) -> Result<Weight, Error<T>> {
		let mut book_state = BookStateFor::<T>::get(&origin);
		ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);

		let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
		let (pos, is_processed, payload) =
			page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
		let payload_len = payload.len() as u64;
		ensure!(
			page_index < book_state.begin ||
				(page_index == book_state.begin && pos < page.first.into() as usize),
			Error::<T>::Queued
		);
		ensure!(!is_processed, Error::<T>::AlreadyProcessed);
		use MessageExecutionStatus::*;
		let mut weight_counter = WeightMeter::with_limit(weight_limit);
		match Self::process_message_payload(
			origin.clone(),
			page_index,
			index,
			payload,
			&mut weight_counter,
			Weight::MAX,
		) {
			Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
			StackLimitReached | Unprocessable { permanent: false } =>
				Err(Error::<T>::TemporarilyUnprocessable),
			Unprocessable { permanent: true } | Processed => {
				page.note_processed_at_pos(pos);
				book_state.message_count.saturating_dec();
				book_state.size.saturating_reduce(payload_len);
				let page_weight = if page.remaining.is_zero() {
					debug_assert!(
						page.remaining_size.is_zero(),
						"no messages remaining; no space taken; qed"
					);
					Pages::<T>::remove(&origin, page_index);
					debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
					book_state.count.saturating_dec();
					T::WeightInfo::execute_overweight_page_removed()
				} else {
					Pages::<T>::insert(&origin, page_index, page);
					T::WeightInfo::execute_overweight_page_updated()
				};
				BookStateFor::<T>::insert(&origin, &book_state);
				T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
				Ok(weight_counter.consumed().saturating_add(page_weight))
			},
		}
	}

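	/// Remove a stale page or one which has no more messages remaining to be processed.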
	fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
		match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
			Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
			Ok(x) => x,
		}
	}

	fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
		let mut book_state = BookStateFor::<T>::get(origin);
		ensure!(page_index < book_state.begin, Error::<T>::NotReapable);

		let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;

		let reapable = page.remaining.is_zero();

		let cullable = || {
			let total_pages = book_state.count;
			let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);

			let stale_pages = total_pages - ready_pages;

			let max_stale = T::MaxStale::get();

			let overflow = match stale_pages.checked_sub(max_stale + 1) {
				Some(x) => x + 1,
				None => return false,
			};

			let backlog = (max_stale * max_stale / overflow).max(max_stale);

			let watermark = book_state.begin.saturating_sub(backlog);
			page_index < watermark
		};
		ensure!(reapable || cullable(), Error::<T>::NotReapable);

		Pages::<T>::remove(origin, page_index);
		debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
		book_state.count.saturating_dec();
		book_state.message_count.saturating_reduce(page.remaining.into() as u64);
		book_state.size.saturating_reduce(page.remaining_size.into() as u64);
		BookStateFor::<T>::insert(origin, &book_state);
		T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
		Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });

		Ok(())
	}

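	/// Service the queue of `origin` for as long as `weight` allows.
	///
	/// Returns whether any message was processed and, if known, the next ready queue.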
	fn service_queue(
		origin: MessageOriginOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> (bool, Option<MessageOriginOf<T>>) {
		use PageExecutionStatus::*;
		if weight
			.try_consume(
				T::WeightInfo::service_queue_base()
					.saturating_add(T::WeightInfo::ready_ring_unknit()),
			)
			.is_err()
		{
			return (false, None)
		}

		let mut book_state = BookStateFor::<T>::get(&origin);
		let mut total_processed = 0;
		if T::QueuePausedQuery::is_paused(&origin) {
			let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
			return (false, next_ready)
		}

		while book_state.end > book_state.begin {
			let (processed, status) =
				Self::service_page(&origin, &mut book_state, weight, overweight_limit);
			total_processed.saturating_accrue(processed);
			match status {
				Bailed | NoProgress => break,
				NoMore => (),
			};
			book_state.begin.saturating_inc();
		}
		let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
		if book_state.begin >= book_state.end {
			if let Some(neighbours) = book_state.ready_neighbours.take() {
				Self::ready_ring_unknit(&origin, neighbours);
			} else if total_processed > 0 {
				defensive!("Freshly processed queue must have been ready");
			}
		}
		BookStateFor::<T>::insert(&origin, &book_state);
		if total_processed > 0 {
			T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
		}
		(total_processed > 0, next_ready)
	}

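	/// Service the page at `book_state.begin` as much as possible within the weight limit.
	///
	/// Returns the number of processed messages and the resulting page status.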
	fn service_page(
		origin: &MessageOriginOf<T>,
		book_state: &mut BookStateOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> (u32, PageExecutionStatus) {
		use PageExecutionStatus::*;
		if weight
			.try_consume(
				T::WeightInfo::service_page_base_completion()
					.max(T::WeightInfo::service_page_base_no_completion()),
			)
			.is_err()
		{
			return (0, Bailed)
		}

		let page_index = book_state.begin;
		let mut page = match Pages::<T>::get(origin, page_index) {
			Some(p) => p,
			None => {
				defensive!("message-queue: referenced page not found");
				return (0, NoMore)
			},
		};

		let mut total_processed = 0;

		let status = loop {
			use ItemExecutionStatus::*;
			match Self::service_page_item(
				origin,
				page_index,
				book_state,
				&mut page,
				weight,
				overweight_limit,
			) {
				Bailed => break PageExecutionStatus::Bailed,
				NoItem => break PageExecutionStatus::NoMore,
				NoProgress => break PageExecutionStatus::NoProgress,
				Executed(true) => total_processed.saturating_inc(),
				Executed(false) => (),
			}
		};

		if page.is_complete() {
			debug_assert!(status != Bailed, "we never bail if a page became complete");
			Pages::<T>::remove(origin, page_index);
			debug_assert!(book_state.count > 0, "completing a page implies there are pages");
			book_state.count.saturating_dec();
		} else {
			Pages::<T>::insert(origin, page_index, page);
		}
		(total_processed, status)
	}

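	/// Execute the next message of the given page.
	///
	/// The page and book state are written to storage before processing and re-read afterwards,
	/// since the message processor may itself enqueue messages onto this queue.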
	pub(crate) fn service_page_item(
		origin: &MessageOriginOf<T>,
		page_index: PageIndex,
		book_state: &mut BookStateOf<T>,
		page: &mut PageOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> ItemExecutionStatus {
		use MessageExecutionStatus::*;
		if page.is_complete() {
			return ItemExecutionStatus::NoItem
		}
		if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
			return ItemExecutionStatus::Bailed
		}

		let payload = &match page.peek_first() {
			Some(m) => m,
			None => return ItemExecutionStatus::NoItem,
		}[..];
		let payload_len = payload.len() as u64;

		Pages::<T>::insert(origin, page_index, &*page);
		BookStateFor::<T>::insert(origin, &*book_state);

		let res = Self::process_message_payload(
			origin.clone(),
			page_index,
			page.first_index,
			payload,
			weight,
			overweight_limit,
		);

		*book_state = BookStateFor::<T>::get(origin);
		if let Some(new_page) = Pages::<T>::get(origin, page_index) {
			*page = new_page;
		} else {
			defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
			return ItemExecutionStatus::NoItem
		};

		let is_processed = match res {
			InsufficientWeight => return ItemExecutionStatus::Bailed,
			Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
			Processed | Unprocessable { permanent: true } | StackLimitReached => true,
			Overweight => false,
		};

		if is_processed {
			book_state.message_count.saturating_dec();
			book_state.size.saturating_reduce(payload_len as u64);
		}
		page.skip_first(is_processed);
		ItemExecutionStatus::Executed(is_processed)
	}

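	/// Ensure the correctness of the pallet state.
	///
	/// Checks that the ready ring is consistent and that every ready queue has unprocessed
	/// messages whose count matches the per-page accounting.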
	#[cfg(any(test, feature = "try-runtime", feature = "std"))]
	pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
		ensure!(
			BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
			"Memory Corruption in BookStateFor"
		);
		ensure!(
			Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
			"Memory Corruption in Pages"
		);

		for book in BookStateFor::<T>::iter_values() {
			ensure!(book.end >= book.begin, "Invariant");
			ensure!(book.end < 1 << 30, "Likely overflow or corruption");
			ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
			ensure!(book.size < 1 << 30, "Likely overflow or corruption");
			ensure!(book.count < 1 << 30, "Likely overflow or corruption");

			let fp: QueueFootprint = book.into();
			ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
		}

		let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };

		while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
			ensure!(
				BookStateFor::<T>::contains_key(&head),
				"Service head must point to an existing book"
			);

			let head_book_state = BookStateFor::<T>::get(&head);
			ensure!(
				head_book_state.message_count > 0,
				"There must be some messages if in ReadyRing"
			);
			ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
			ensure!(
				head_book_state.end > head_book_state.begin,
				"End > Begin if unprocessed messages exists"
			);
			ensure!(
				head_book_state.ready_neighbours.is_some(),
				"There must be neighbours if in ReadyRing"
			);

			if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
				ensure!(
					head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
					"Can only happen if only queue in ReadyRing"
				);
			}

			for page_index in head_book_state.begin..head_book_state.end {
				let page = Pages::<T>::get(&head, page_index).unwrap();
				let remaining_messages = page.remaining;
				let mut counted_remaining_messages: u32 = 0;
				ensure!(
					remaining_messages > 0.into(),
					"There must be some messages that have not been processed yet!"
				);

				for i in 0..u32::MAX {
					if let Some((_, processed, _)) = page.peek_index(i as usize) {
						if !processed {
							counted_remaining_messages += 1;
						}
					} else {
						break
					}
				}

				ensure!(
					remaining_messages.into() == counted_remaining_messages,
					"Memory Corruption"
				);
			}

			if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
				break
			}
		}
		Ok(())
	}

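	/// Render the state of every queue and its pages into a human readable `String`.
	///
	/// Intended for debugging and tests only.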
	#[cfg(feature = "std")]
	pub fn debug_info() -> String {
		let mut info = String::new();
		for (origin, book_state) in BookStateFor::<T>::iter() {
			let mut queue = format!("queue {:?}:\n", &origin);
			let mut pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
			pages.sort_by(|(a, _), (b, _)| a.cmp(b));
			for (page_index, mut page) in pages.into_iter() {
				let page_info = if book_state.begin == page_index { ">" } else { " " };
				let mut page_info = format!(
					"{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
					page_info, page_index, page.first, page.last, page.remaining
				);
				for i in 0..u32::MAX {
					if let Some((_, processed, message)) =
						page.peek_index(i.try_into().expect("std-only code"))
					{
						let msg = String::from_utf8_lossy(message);
						if processed {
							page_info.push('*');
						}
						page_info.push_str(&format!("{:?}, ", msg));
						page.skip_first(true);
					} else {
						break
					}
				}
				page_info.push_str("]\n");
				queue.push_str(&page_info);
			}
			info.push_str(&queue);
		}
		info
	}

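	/// Process a single message payload with the configured `MessageProcessor`.
	///
	/// Storage changes made by the processor are committed on success and rolled back on error.
	/// Messages that would need more than `overweight_limit` weight are reported as overweight
	/// instead of being executed.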
	fn process_message_payload(
		origin: MessageOriginOf<T>,
		page_index: PageIndex,
		message_index: T::Size,
		message: &[u8],
		meter: &mut WeightMeter,
		overweight_limit: Weight,
	) -> MessageExecutionStatus {
		let mut id = sp_io::hashing::blake2_256(message);
		use ProcessMessageError::*;
		let prev_consumed = meter.consumed();

		let transaction =
			storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
				let res =
					T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
				match &res {
					Ok(_) => TransactionOutcome::Commit(Ok(res)),
					Err(_) => TransactionOutcome::Rollback(Ok(res)),
				}
			});

		let transaction = match transaction {
			Ok(result) => result,
			_ => {
				defensive!(
					"Error occurred processing message, storage changes will be rolled back"
				);
				return MessageExecutionStatus::Unprocessable { permanent: true }
			},
		};

		match transaction {
			Err(Overweight(w)) if w.any_gt(overweight_limit) => {
				Self::deposit_event(Event::<T>::OverweightEnqueued {
					id,
					origin,
					page_index,
					message_index,
				});
				MessageExecutionStatus::Overweight
			},
			Err(Overweight(_)) => {
				MessageExecutionStatus::InsufficientWeight
			},
			Err(Yield) => {
				MessageExecutionStatus::Unprocessable { permanent: false }
			},
			Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
				MessageExecutionStatus::Unprocessable { permanent: true }
			},
			Err(error @ StackLimitReached) => {
				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
				MessageExecutionStatus::StackLimitReached
			},
			Ok(success) => {
				let weight_used = meter.consumed().saturating_sub(prev_consumed);
				Self::deposit_event(Event::<T>::Processed {
					id: id.into(),
					origin,
					weight_used,
					success,
				});
				MessageExecutionStatus::Processed
			},
		}
	}

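	/// Common implementation of `on_initialize`, `on_idle` and `ServiceQueues::service_queues`.
	///
	/// Walks the ready ring starting at the service head and services queues until the weight is
	/// exhausted or no further progress can be made.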
	fn service_queues_impl(weight_limit: Weight, context: ServiceQueuesContext) -> Weight {
		let mut weight = WeightMeter::with_limit(weight_limit);

		let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
			if matches!(context, ServiceQueuesContext::OnInitialize) {
				defensive!("Not enough weight to service a single message.");
			}
			Weight::zero()
		});

		match with_service_mutex(|| {
			let mut next = match Self::bump_service_head(&mut weight) {
				Some(h) => h,
				None => return weight.consumed(),
			};
			let mut last_no_progress = None;

			loop {
				let (progressed, n) =
					Self::service_queue(next.clone(), &mut weight, overweight_limit);
				next = match n {
					Some(n) =>
						if !progressed {
							if last_no_progress == Some(n.clone()) {
								break
							}
							if last_no_progress.is_none() {
								last_no_progress = Some(next.clone())
							}
							n
						} else {
							last_no_progress = None;
							n
						},
					None => break,
				}
			}
			weight.consumed()
		}) {
			Err(()) => weight.consumed(),
			Ok(w) => w,
		}
	}
}

impl<T: Config> ForceSetHead<MessageOriginOf<T>> for Pallet<T> {
	fn force_set_head(weight: &mut WeightMeter, origin: &MessageOriginOf<T>) -> Result<bool, ()> {
		Pallet::<T>::set_service_head(weight, origin)
	}
}

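/// Run `f` while holding a re-entrancy token.
///
/// Returns `Err(())` if a queue is already being serviced further up the call stack, which makes
/// recursive servicing impossible.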
pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
	environmental::environmental!(token: Option<()>);

	token::using_once(&mut Some(()), || {
		let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;

		defer! {
			token::with(|t| {
				*t = Some(hold);
			});
		}

		Ok(f())
	})
}

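/// Provides a [`Get`] of the maximal encoded length of `T`.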
pub struct MaxEncodedLenOf<T>(core::marker::PhantomData<T>);
impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
	fn get() -> u32 {
		T::max_encoded_len() as u32
	}
}

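/// The maximal message length that fits into a page: the heap size minus the item header size.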
pub struct MaxMessageLen<Origin, Size, HeapSize>(
	core::marker::PhantomData<(Origin, Size, HeapSize)>,
);
impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
	for MaxMessageLen<Origin, Size, HeapSize>
{
	fn get() -> u32 {
		(HeapSize::get().into()).saturating_sub(ItemHeader::<Size>::max_encoded_len() as u32)
	}
}

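/// The maximal message length of a given [`Config`].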
pub type MaxMessageLenOf<T> =
	MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
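/// The maximal encoded origin length of a given [`Config`].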
pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
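/// The message origin type of a given [`Config`].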
pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
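/// The heap size of a given [`Config`], as a `Get<u32>`.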
pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
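/// The page type of a given [`Config`].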
pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
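/// The book state type of a given [`Config`].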
pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;

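/// Converts a [`Get`] of some type `O: Into<u32>` into a `Get<u32>`.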
pub struct IntoU32<T, O>(core::marker::PhantomData<(T, O)>);
impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
	fn get() -> u32 {
		T::get().into()
	}
}

impl<T: Config> ServiceQueues for Pallet<T> {
	type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);

	fn service_queues(weight_limit: Weight) -> Weight {
		Self::service_queues_impl(weight_limit, ServiceQueuesContext::ServiceQueues)
	}

	fn execute_overweight(
		weight_limit: Weight,
		(message_origin, page, index): Self::OverweightMessageAddress,
	) -> Result<Weight, ExecuteOverweightError> {
		let mut weight = WeightMeter::with_limit(weight_limit);
		if weight
			.try_consume(
				T::WeightInfo::execute_overweight_page_removed()
					.max(T::WeightInfo::execute_overweight_page_updated()),
			)
			.is_err()
		{
			return Err(ExecuteOverweightError::InsufficientWeight)
		}

		Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
			|e| match e {
				Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
				Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
				Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
				Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
					ExecuteOverweightError::NotFound,
				Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
				_ => ExecuteOverweightError::Other,
			},
		)
	}
}

impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pallet<T> {
	type MaxMessageLen =
		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

	fn enqueue_message(
		message: BoundedSlice<u8, Self::MaxMessageLen>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		Self::do_enqueue_messages(&origin, [message].into_iter());
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
	}

	fn enqueue_messages<'a>(
		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		Self::do_enqueue_messages(&origin, messages);
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
	}

	fn sweep_queue(origin: MessageOriginOf<T>) {
		if !BookStateFor::<T>::contains_key(&origin) {
			return
		}
		let mut book_state = BookStateFor::<T>::get(&origin);
		book_state.begin = book_state.end;
		if let Some(neighbours) = book_state.ready_neighbours.take() {
			Self::ready_ring_unknit(&origin, neighbours);
		}
		BookStateFor::<T>::insert(&origin, &book_state);
	}
}

impl<T: Config> QueueFootprintQuery<MessageOriginOf<T>> for Pallet<T> {
	type MaxMessageLen =
		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

	fn get_batches_footprints<'a>(
		origin: MessageOriginOf<T>,
		msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		total_pages_limit: u32,
	) -> BatchesFootprints {
		let mut batches_footprints = BatchesFootprints::default();

		let mut new_page = false;
		let mut total_pages_count = 0;
		let mut current_page_pos: usize = T::HeapSize::get().into() as usize;

		let book = BookStateFor::<T>::get(&origin);
		if book.end > book.begin {
			total_pages_count = book.end - book.begin;
			if let Some(page) = Pages::<T>::get(origin, book.end - 1) {
				current_page_pos = page.heap_pos();
				batches_footprints.first_page_pos = current_page_pos;
			}
		}

		let mut msgs = msgs.peekable();
		while let Some(msg) = msgs.peek() {
			if total_pages_count > total_pages_limit {
				return batches_footprints;
			}

			match Page::<T::Size, T::HeapSize>::can_append_message_at(current_page_pos, msg.len()) {
				Ok(new_pos) => {
					current_page_pos = new_pos;
					batches_footprints.push(msg, new_page);
					new_page = false;
					msgs.next();
				},
				Err(_) => {
					new_page = true;
					total_pages_count += 1;
					current_page_pos = 0;
				},
			}
		}

		batches_footprints
	}

	fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
		BookStateFor::<T>::get(&origin).into()
	}
}