#![deny(missing_docs)]
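//! # Generalized Message Queue Pallet
//!
//! Provides generalized message queuing and processing capabilities on a per-queue basis.
//!
//! Messages are enqueued per origin into paged heaps (see [`Page`]), book-kept per queue in a
//! [`BookState`], and linked into a "ready ring" of queues that still have unprocessed messages.
//! Queues are serviced in `on_initialize` and `on_idle` up to the configured weight limits, and
//! messages that are too heavy for the available weight can later be executed manually via
//! `execute_overweight`. Stale pages can be removed with `reap_page`.
//!
//! The snippet below is an illustrative sketch only; it assumes a runtime in which this pallet is
//! instantiated as `MessageQueue` and whose `MessageProcessor` origin type is a hypothetical
//! `MyOrigin`:
//!
//! ```rust,ignore
//! use frame_support::traits::{EnqueueMessage, ServiceQueues};
//!
//! // Enqueue a small message for `MyOrigin::Here`, then service the queues with a weight limit.
//! let msg = BoundedSlice::try_from(&b"hello"[..]).expect("short enough; qed");
//! MessageQueue::enqueue_message(msg, MyOrigin::Here);
//! MessageQueue::service_queues(Weight::from_parts(10_000_000, 10_000));
//! ```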
#![cfg_attr(not(feature = "std"), no_std)]

mod benchmarking;
mod integration_test;
mod mock;
pub mod mock_helpers;
mod tests;
pub mod weights;

extern crate alloc;

use alloc::{vec, vec::Vec};
use codec::{Codec, ConstEncodedLen, Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
use core::{fmt::Debug, ops::Deref};
use frame_support::{
    defensive,
    pallet_prelude::*,
    traits::{
        BatchesFootprints, Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
        ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
        QueueFootprintQuery, QueuePausedQuery, ServiceQueues,
    },
    BoundedSlice, CloneNoBound, DefaultNoBound,
};
use frame_system::pallet_prelude::*;
pub use pallet::*;
use scale_info::TypeInfo;
use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
use sp_core::{defer, H256};
use sp_runtime::{
    traits::{One, Zero},
    SaturatedConversion, Saturating, TransactionOutcome,
};
use sp_weights::WeightMeter;
pub use weights::WeightInfo;

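/// Type for identifying a page.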
type PageIndex = u32;

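/// The header of a message item in a page heap; records the payload length and whether the
/// message has already been processed.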
#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
pub struct ItemHeader<Size> {
    /// The length of this item, not including the size of this header.
    payload_len: Size,
    /// Whether this item has been processed.
    is_processed: bool,
}

impl<Size: ConstEncodedLen> ConstEncodedLen for ItemHeader<Size> {}

/// A page of messages. Pages always contain at least one item.
#[derive(
    CloneNoBound, Encode, Decode, RuntimeDebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen,
)]
#[scale_info(skip_type_params(HeapSize))]
#[codec(mel_bound(Size: MaxEncodedLen))]
pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
    /// Messages remaining to be processed in this page.
    remaining: Size,
    /// The total size of all remaining messages in this page.
    remaining_size: Size,
    /// The number of items before the `first` item in this page.
    first_index: Size,
    /// The heap-offset of the header of the first message item in this page which is ready for
    /// processing.
    first: Size,
    /// The heap-offset of the header of the last message item in this page.
    last: Size,
    /// The heap: encoded `ItemHeader`s followed by their payloads, stored back to back.
    heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
}

impl<
        Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
        HeapSize: Get<Size>,
    > Page<Size, HeapSize>
where
    ItemHeader<Size>: ConstEncodedLen,
{
    /// Create a [`Page`] from a single message.
    fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
        let payload_len = message.len();
        let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
        let payload_len = payload_len.saturated_into();
        let header = ItemHeader::<Size> { payload_len, is_processed: false };

        let mut heap = Vec::with_capacity(data_len);
        header.using_encoded(|h| heap.extend_from_slice(h));
        heap.extend_from_slice(message.deref());

        Page {
            remaining: One::one(),
            remaining_size: payload_len,
            first_index: Zero::zero(),
            first: Zero::zero(),
            last: Zero::zero(),
            heap: BoundedVec::defensive_truncate_from(heap),
        }
    }

    /// The heap position where a new message would be appended (the current end of the heap).
    fn heap_pos(&self) -> usize {
        self.heap.len()
    }

    /// Check whether a message of length `message_len` can be appended at heap position `pos`,
    /// returning the new heap position on success.
    fn can_append_message_at(pos: usize, message_len: usize) -> Result<usize, ()> {
        let header_size = ItemHeader::<Size>::max_encoded_len();
        let data_len = header_size.saturating_add(message_len);
        let heap_size = HeapSize::get().into() as usize;
        let new_pos = pos.saturating_add(data_len);
        if new_pos <= heap_size {
            Ok(new_pos)
        } else {
            Err(())
        }
    }

    /// Try to append one message to this page.
    fn try_append_message<T: Config>(
        &mut self,
        message: BoundedSlice<u8, MaxMessageLenOf<T>>,
    ) -> Result<(), ()> {
        let pos = self.heap_pos();
        Self::can_append_message_at(pos, message.len())?;
        let payload_len = message.len().saturated_into();
        let header = ItemHeader::<Size> { payload_len, is_processed: false };

        let mut heap = core::mem::take(&mut self.heap).into_inner();
        header.using_encoded(|h| heap.extend_from_slice(h));
        heap.extend_from_slice(message.deref());
        self.heap = BoundedVec::defensive_truncate_from(heap);
        self.last = pos.saturated_into();
        self.remaining.saturating_inc();
        self.remaining_size.saturating_accrue(payload_len);
        Ok(())
    }

    /// Returns the first message in the page without removing it.
    ///
    /// SAFETY: Does not panic even on corrupted storage.
    fn peek_first(&self) -> Option<BoundedSlice<u8, IntoU32<HeapSize, Size>>> {
        if self.first > self.last {
            return None
        }
        let f = (self.first.into() as usize).min(self.heap.len());
        let mut item_slice = &self.heap[f..];
        if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
            let payload_len = h.payload_len.into() as usize;
            if payload_len <= item_slice.len() {
                return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]))
            }
        }
        defensive!("message-queue: heap corruption");
        None
    }

    /// Point `first` at the next message, optionally marking the skipped one as processed.
    fn skip_first(&mut self, is_processed: bool) {
        let f = (self.first.into() as usize).min(self.heap.len());
        if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
            if is_processed && !h.is_processed {
                h.is_processed = true;
                h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
                self.remaining.saturating_dec();
                self.remaining_size.saturating_reduce(h.payload_len);
            }
            self.first
                .saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
            self.first.saturating_accrue(h.payload_len);
            self.first_index.saturating_inc();
        }
    }

    /// Return the message at `index`, if any, together with its heap position and whether it has
    /// been processed.
    fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
        let mut pos = 0;
        let mut item_slice = &self.heap[..];
        let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
        for _ in 0..index {
            let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
            let item_len = h.payload_len.into() as usize;
            if item_slice.len() < item_len {
                return None
            }
            item_slice = &item_slice[item_len..];
            pos.saturating_accrue(header_len.saturating_add(item_len));
        }
        let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
        if item_slice.len() < h.payload_len.into() as usize {
            return None
        }
        item_slice = &item_slice[..h.payload_len.into() as usize];
        Some((pos, h.is_processed, item_slice))
    }

    /// Set the `is_processed` flag for the item at heap position `pos`.
    ///
    /// NOTE: This does not advance the `first` pointer of the page.
    fn note_processed_at_pos(&mut self, pos: usize) {
        if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
            if !h.is_processed {
                h.is_processed = true;
                h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
                self.remaining.saturating_dec();
                self.remaining_size.saturating_reduce(h.payload_len);
            }
        }
    }

    /// Returns whether the page is *complete*: all messages in it have been processed.
    fn is_complete(&self) -> bool {
        self.remaining.is_zero()
    }
}

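/// The neighbouring queues of a queue in the "ready ring" doubly-linked list.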
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug, PartialEq)]
pub struct Neighbours<MessageOrigin> {
    /// The previous queue in the ready ring.
    prev: MessageOrigin,
    /// The next queue in the ready ring.
    next: MessageOrigin,
}

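/// The book-keeping state of a queue.
///
/// Tracks the window of ready pages (`begin..end`), the total page and message counts and the
/// queue's position in the ready ring.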
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug)]
pub struct BookState<MessageOrigin> {
    /// The first page with some items to be processed in it.
    begin: PageIndex,
    /// One more than the last page with some items to be processed in it.
    end: PageIndex,
    /// The number of pages stored at present.
    count: PageIndex,
    /// The neighbouring queues in the ready ring, if this queue is in it.
    ready_neighbours: Option<Neighbours<MessageOrigin>>,
    /// The number of unprocessed messages stored at present.
    message_count: u64,
    /// The total size of all unprocessed messages stored at present.
    size: u64,
}

impl<MessageOrigin> Default for BookState<MessageOrigin> {
    fn default() -> Self {
        Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
    }
}

impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
    fn from(book: BookState<MessageOrigin>) -> Self {
        QueueFootprint {
            pages: book.count,
            ready_pages: book.end.defensive_saturating_sub(book.begin),
            storage: Footprint { count: book.message_count, size: book.size },
        }
    }
}

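/// Handler code for when the items in a queue change.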
pub trait OnQueueChanged<Id> {
    /// Note that the queue `id` now has the given footprint.
    fn on_queue_changed(id: Id, fp: QueueFootprint);
}

impl<Id> OnQueueChanged<Id> for () {
    fn on_queue_changed(_: Id, _: QueueFootprint) {}
}

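/// Allows the service head (the next queue to be serviced) to be forcibly set.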
pub trait ForceSetHead<O> {
    /// Set the service head to `origin`.
    ///
    /// Returns whether the head was set; the queue must be in the ready ring for this to work.
    fn force_set_head(weight: &mut WeightMeter, origin: &O) -> Result<bool, ()>;
}

#[frame_support::pallet]
pub mod pallet {
    use super::*;

    #[pallet::pallet]
    pub struct Pallet<T>(_);

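    /// The module configuration trait.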
    #[pallet::config]
    pub trait Config: frame_system::Config {
        /// The overarching event type.
        #[allow(deprecated)]
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

        /// Weight information for extrinsics in this pallet.
        type WeightInfo: WeightInfo;

        /// Processor for a message.
        type MessageProcessor: ProcessMessage;

        /// Page/heap size type.
        type Size: BaseArithmetic
            + Unsigned
            + Copy
            + Into<u32>
            + Member
            + Encode
            + Decode
            + MaxEncodedLen
            + ConstEncodedLen
            + TypeInfo
            + Default;

        /// Code to be called when a message queue changes - either with items introduced or
        /// removed.
        type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;

        /// Queried by the pallet to check whether a queue can be serviced.
        type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;

        /// The size of the page; this implies the maximum message size which can be sent.
        #[pallet::constant]
        type HeapSize: Get<Self::Size>;

        /// The maximum number of stale pages (i.e. of overweight messages) allowed before culling
        /// can happen.
        #[pallet::constant]
        type MaxStale: Get<u32>;

        /// The amount of weight (if any) which should be provided to the message queue for
        /// servicing enqueued items `on_initialize`.
        #[pallet::constant]
        type ServiceWeight: Get<Option<Weight>>;

        /// The maximum amount of weight (if any) to be used from remaining weight for servicing
        /// enqueued items `on_idle`.
        #[pallet::constant]
        type IdleMaxServiceWeight: Get<Option<Weight>>;
    }

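    /// Events emitted by this pallet.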
    #[pallet::event]
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
    pub enum Event<T: Config> {
        /// Message discarded due to an error in the `MessageProcessor`.
        ProcessingFailed {
            /// The `blake2_256` hash of the message.
            id: H256,
            /// The queue of the message.
            origin: MessageOriginOf<T>,
            /// The error that occurred.
            error: ProcessMessageError,
        },
        /// Message is processed.
        Processed {
            /// The `blake2_256` hash of the message.
            id: H256,
            /// The queue of the message.
            origin: MessageOriginOf<T>,
            /// How much weight was used to process the message.
            weight_used: Weight,
            /// Whether the message was processed successfully.
            success: bool,
        },
        /// Message placed in overweight queue.
        OverweightEnqueued {
            /// The `blake2_256` hash of the message.
            id: [u8; 32],
            /// The queue of the message.
            origin: MessageOriginOf<T>,
            /// The page of the message.
            page_index: PageIndex,
            /// The index of the message within the page.
            message_index: T::Size,
        },
        /// This page was reaped.
        PageReaped {
            /// The queue of the page.
            origin: MessageOriginOf<T>,
            /// The index of the page.
            index: PageIndex,
        },
    }

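    /// Errors returned by this pallet's dispatchables.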
    #[pallet::error]
    pub enum Error<T> {
        /// Page is not reapable because it has items remaining to be processed and is not old
        /// enough.
        NotReapable,
        /// Page to be reaped does not exist.
        NoPage,
        /// The referenced message could not be found.
        NoMessage,
        /// The message was already processed and cannot be processed again.
        AlreadyProcessed,
        /// The message is queued for future execution.
        Queued,
        /// There is temporarily not enough weight to continue servicing messages.
        InsufficientWeight,
        /// This message is temporarily unprocessable.
        ///
        /// Such errors are expected, but not guaranteed, to resolve themselves eventually through
        /// retrying.
        TemporarilyUnprocessable,
        /// The queue is paused and no message can be executed from it at the moment.
        QueuePaused,
        /// Another call is in progress and needs to finish before this call can happen.
        RecursiveDisallowed,
    }

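    /// The index of the first and last (non-empty) pages.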
    #[pallet::storage]
    pub type BookStateFor<T: Config> =
        StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;

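    /// The origin at which we should begin servicing.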
    #[pallet::storage]
    pub type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;

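    /// The map of page indices to pages.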
    #[pallet::storage]
    pub type Pages<T: Config> = StorageDoubleMap<
        _,
        Twox64Concat,
        MessageOriginOf<T>,
        Twox64Concat,
        PageIndex,
        Page<T::Size, T::HeapSize>,
        OptionQuery,
    >;

    #[pallet::hooks]
    impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
        fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
            if let Some(weight_limit) = T::ServiceWeight::get() {
                Self::service_queues_impl(weight_limit, ServiceQueuesContext::OnInitialize)
            } else {
                Weight::zero()
            }
        }

        fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
            if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
                Self::service_queues_impl(
                    weight_limit.min(remaining_weight),
                    ServiceQueuesContext::OnIdle,
                )
            } else {
                Weight::zero()
            }
        }

        #[cfg(feature = "try-runtime")]
        fn try_state(_: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
            Self::do_try_state()
        }

        #[cfg(test)]
        fn integrity_test() {
            Self::do_integrity_test().expect("Pallet config is valid; qed")
        }
    }

    #[pallet::call]
    impl<T: Config> Pallet<T> {
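        /// Remove a page which has no more messages remaining to be processed or is stale.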
        #[pallet::call_index(0)]
        #[pallet::weight(T::WeightInfo::reap_page())]
        pub fn reap_page(
            origin: OriginFor<T>,
            message_origin: MessageOriginOf<T>,
            page_index: PageIndex,
        ) -> DispatchResult {
            ensure_signed(origin)?;
            Self::do_reap_page(&message_origin, page_index)
        }

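        /// Execute an overweight message.
        ///
        /// Temporary processing errors will be propagated whereas permanent errors are treated
        /// as success condition.
        ///
        /// - `origin`: Must be `Signed`.
        /// - `message_origin`: The origin from which the message to be executed arrived.
        /// - `page`: The page in the queue in which the message to be executed is sitting.
        /// - `index`: The index into the queue of the message to be executed.
        /// - `weight_limit`: The maximum amount of weight allowed to be consumed in the execution
        ///   of the message.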
        #[pallet::call_index(1)]
        #[pallet::weight(
            T::WeightInfo::execute_overweight_page_updated().max(
                T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit)
        )]
        pub fn execute_overweight(
            origin: OriginFor<T>,
            message_origin: MessageOriginOf<T>,
            page: PageIndex,
            index: T::Size,
            weight_limit: Weight,
        ) -> DispatchResultWithPostInfo {
            ensure_signed(origin)?;
            let actual_weight =
                Self::do_execute_overweight(message_origin, page, index, weight_limit)?;
            Ok(Some(actual_weight).into())
        }
    }
}

/// The status of a page after trying to execute its next message.
#[derive(PartialEq, Debug)]
enum PageExecutionStatus {
    /// Execution bailed because there was not enough weight remaining.
    Bailed,
    /// The page did not make any progress on its execution.
    NoProgress,
    /// No more messages could be loaded from the page.
    NoMore,
}

/// The status after trying to execute the next item of a [`Page`].
#[derive(PartialEq, Debug)]
enum ItemExecutionStatus {
    /// Execution bailed because there was not enough weight remaining.
    Bailed,
    /// The item did not make any progress on its execution.
    NoProgress,
    /// There was no item to execute.
    NoItem,
    /// Whether the execution of an item resulted in it being processed.
    Executed(bool),
}

/// The status of an attempt to process a message.
#[derive(PartialEq)]
enum MessageExecutionStatus {
    /// There is not enough weight remaining at present.
    InsufficientWeight,
    /// There is not enough weight to process the message within the overweight limit.
    Overweight,
    /// The message was processed successfully.
    Processed,
    /// The message was not processed, either temporarily or permanently.
    Unprocessable { permanent: bool },
    /// The message processor reached its stack depth limit.
    StackLimitReached,
}

/// The context in which the queues are being serviced.
#[derive(PartialEq)]
enum ServiceQueuesContext {
    /// Queues are being serviced from the `on_idle` hook.
    OnIdle,
    /// Queues are being serviced from the `on_initialize` hook.
    OnInitialize,
    /// Queues are being serviced from `ServiceQueues::service_queues`.
    ServiceQueues,
}

impl<T: Config> Pallet<T> {
    /// Knit `origin` into the ready ring right at the end.
    ///
    /// Return the two ready ring neighbours of `origin`.
    fn ready_ring_knit(origin: &MessageOriginOf<T>) -> Result<Neighbours<MessageOriginOf<T>>, ()> {
        if let Some(head) = ServiceHead::<T>::get() {
            let mut head_book_state = BookStateFor::<T>::get(&head);
            let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?;
            let tail = head_neighbours.prev;
            head_neighbours.prev = origin.clone();
            head_book_state.ready_neighbours = Some(head_neighbours);
            BookStateFor::<T>::insert(&head, head_book_state);

            let mut tail_book_state = BookStateFor::<T>::get(&tail);
            let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?;
            tail_neighbours.next = origin.clone();
            tail_book_state.ready_neighbours = Some(tail_neighbours);
            BookStateFor::<T>::insert(&tail, tail_book_state);

            Ok(Neighbours { next: head, prev: tail })
        } else {
            ServiceHead::<T>::put(origin);
            Ok(Neighbours { next: origin.clone(), prev: origin.clone() })
        }
    }

    /// Remove `origin` from the ready ring, given its `neighbours`.
    fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
        if origin == &neighbours.next {
            debug_assert!(
                origin == &neighbours.prev,
                "unknitting from single item ring; outgoing must be only item"
            );
            ServiceHead::<T>::kill();
        } else {
            BookStateFor::<T>::mutate(&neighbours.next, |book_state| {
                if let Some(ref mut n) = book_state.ready_neighbours {
                    n.prev = neighbours.prev.clone()
                }
            });
            BookStateFor::<T>::mutate(&neighbours.prev, |book_state| {
                if let Some(ref mut n) = book_state.ready_neighbours {
                    n.next = neighbours.next.clone()
                }
            });
            if let Some(head) = ServiceHead::<T>::get() {
                if &head == origin {
                    ServiceHead::<T>::put(neighbours.next);
                }
            } else {
                defensive!("`ServiceHead` must be some if there was a ready queue");
            }
        }
    }

    /// Take the current service head and advance `ServiceHead` to the next queue in the ready
    /// ring.
    fn bump_service_head(weight: &mut WeightMeter) -> Option<MessageOriginOf<T>> {
        if weight.try_consume(T::WeightInfo::bump_service_head()).is_err() {
            return None
        }

        if let Some(head) = ServiceHead::<T>::get() {
            let mut head_book_state = BookStateFor::<T>::get(&head);
            if let Some(head_neighbours) = head_book_state.ready_neighbours.take() {
                ServiceHead::<T>::put(&head_neighbours.next);
                Some(head)
            } else {
                defensive!("The head must point to a queue in the ready ring");
                None
            }
        } else {
            None
        }
    }

    /// Set `ServiceHead` to `queue` if the queue is in the ready ring.
    ///
    /// Returns whether the head was set.
    fn set_service_head(weight: &mut WeightMeter, queue: &MessageOriginOf<T>) -> Result<bool, ()> {
        if weight.try_consume(T::WeightInfo::set_service_head()).is_err() {
            return Err(())
        }

        if BookStateFor::<T>::get(queue).ready_neighbours.is_some() {
            ServiceHead::<T>::put(queue);
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// The maximal weight that a single message can consume.
    ///
    /// Messages above this weight go into the overweight queue and may only be serviced
    /// explicitly.
    fn max_message_weight(limit: Weight) -> Option<Weight> {
        let service_weight = T::ServiceWeight::get().unwrap_or_default();
        let on_idle_weight = T::IdleMaxServiceWeight::get().unwrap_or_default();

        // Use the larger of the two configured limits as the base.
        let max_message_weight =
            if service_weight.any_gt(on_idle_weight) { service_weight } else { on_idle_weight };

        if max_message_weight.is_zero() {
            // If no service weight is configured, use the given limit instead.
            limit.checked_sub(&Self::single_msg_overhead())
        } else {
            max_message_weight.checked_sub(&Self::single_msg_overhead())
        }
    }

    /// The overhead of servicing a single message.
    fn single_msg_overhead() -> Weight {
        T::WeightInfo::bump_service_head()
            .saturating_add(T::WeightInfo::service_queue_base())
            .saturating_add(
                T::WeightInfo::service_page_base_completion()
                    .max(T::WeightInfo::service_page_base_no_completion()),
            )
            .saturating_add(T::WeightInfo::service_page_item())
            .saturating_add(T::WeightInfo::ready_ring_unknit())
    }

    /// Checks the invariants of the pallet configuration.
    #[cfg(test)]
    fn do_integrity_test() -> Result<(), String> {
        ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");

        let max_block = T::BlockWeights::get().max_block;

        if let Some(service) = T::ServiceWeight::get() {
            if Self::max_message_weight(service).is_none() {
                return Err(format!(
                    "ServiceWeight too low: {}. Must be at least {}",
                    service,
                    Self::single_msg_overhead(),
                ))
            }

            if service.any_gt(max_block) {
                return Err(format!(
                    "ServiceWeight {service} is bigger than max block weight {max_block}"
                ))
            }
        }

        if let Some(on_idle) = T::IdleMaxServiceWeight::get() {
            if on_idle.any_gt(max_block) {
                return Err(format!(
                    "IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}"
                ))
            }
        }

        if let (Some(service_weight), Some(on_idle)) =
            (T::ServiceWeight::get(), T::IdleMaxServiceWeight::get())
        {
            if !(service_weight.all_gt(on_idle) ||
                on_idle.all_gt(service_weight) ||
                service_weight == on_idle)
            {
                return Err("One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal.".into())
            }
        }

        Ok(())
    }

    /// Enqueue the given `messages` into the queue of `origin`, appending to the last page or
    /// creating new pages as needed and knitting the queue into the ready ring if necessary.
    fn do_enqueue_messages<'a>(
        origin: &MessageOriginOf<T>,
        messages: impl Iterator<Item = BoundedSlice<'a, u8, MaxMessageLenOf<T>>>,
    ) {
        let mut book_state = BookStateFor::<T>::get(origin);

        let mut maybe_page = None;
        // Check if we already have a page in progress.
        if book_state.end > book_state.begin {
            debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready");
            maybe_page = Pages::<T>::get(origin, book_state.end - 1).or_else(|| {
                defensive!("Corruption: referenced page doesn't exist.");
                None
            });
        }

        for message in messages {
            // Try to append the message to the current page, if any.
            if let Some(mut page) = maybe_page {
                maybe_page = match page.try_append_message::<T>(message) {
                    Ok(_) => Some(page),
                    Err(_) => {
                        // Not enough space on the current page; write it back and start a new one.
                        Pages::<T>::insert(origin, book_state.end - 1, page);
                        None
                    },
                }
            }
            // If there is no current page, start a new one with this message.
            if maybe_page.is_none() {
                book_state.end.saturating_inc();
                book_state.count.saturating_inc();
                maybe_page = Some(Page::from_message::<T>(message));
            }

            // Account for the message that was just added.
            book_state.message_count.saturating_inc();
            book_state.size.saturating_accrue(message.len() as u64);
        }

        // Write back the last page, if any.
        if let Some(page) = maybe_page {
            Pages::<T>::insert(origin, book_state.end - 1, page);
        }

        // Insert the queue into the ready ring if it is not already in it.
        if book_state.ready_neighbours.is_none() {
            match Self::ready_ring_knit(origin) {
                Ok(neighbours) => book_state.ready_neighbours = Some(neighbours),
                Err(()) => {
                    defensive!("Ring state invalid when knitting");
                },
            }
        }

        BookStateFor::<T>::insert(origin, book_state);
    }

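    /// Try to execute a single message that was marked as overweight.
    ///
    /// The `weight_limit` is the weight that can be consumed to execute the message. The base
    /// weight of the function itself must be measured by the caller.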
    pub fn do_execute_overweight(
        origin: MessageOriginOf<T>,
        page_index: PageIndex,
        index: T::Size,
        weight_limit: Weight,
    ) -> Result<Weight, Error<T>> {
        match with_service_mutex(|| {
            Self::do_execute_overweight_inner(origin, page_index, index, weight_limit)
        }) {
            Err(()) => Err(Error::<T>::RecursiveDisallowed),
            Ok(x) => x,
        }
    }

    /// Same as `do_execute_overweight` but must be called while holding the `with_service_mutex`.
    fn do_execute_overweight_inner(
        origin: MessageOriginOf<T>,
        page_index: PageIndex,
        index: T::Size,
        weight_limit: Weight,
    ) -> Result<Weight, Error<T>> {
        let mut book_state = BookStateFor::<T>::get(&origin);
        ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);

        let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
        let (pos, is_processed, payload) =
            page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
        let payload_len = payload.len() as u64;
        ensure!(
            page_index < book_state.begin ||
                (page_index == book_state.begin && pos < page.first.into() as usize),
            Error::<T>::Queued
        );
        ensure!(!is_processed, Error::<T>::AlreadyProcessed);
        use MessageExecutionStatus::*;
        let mut weight_counter = WeightMeter::with_limit(weight_limit);
        match Self::process_message_payload(
            origin.clone(),
            page_index,
            index,
            payload,
            &mut weight_counter,
            Weight::MAX,
        ) {
            Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
            StackLimitReached | Unprocessable { permanent: false } =>
                Err(Error::<T>::TemporarilyUnprocessable),
            Unprocessable { permanent: true } | Processed => {
                page.note_processed_at_pos(pos);
                book_state.message_count.saturating_dec();
                book_state.size.saturating_reduce(payload_len);
                let page_weight = if page.remaining.is_zero() {
                    debug_assert!(
                        page.remaining_size.is_zero(),
                        "no messages remaining; no space taken; qed"
                    );
                    Pages::<T>::remove(&origin, page_index);
                    debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
                    book_state.count.saturating_dec();
                    T::WeightInfo::execute_overweight_page_removed()
                } else {
                    Pages::<T>::insert(&origin, page_index, page);
                    T::WeightInfo::execute_overweight_page_updated()
                };
                BookStateFor::<T>::insert(&origin, &book_state);
                T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
                Ok(weight_counter.consumed().saturating_add(page_weight))
            },
        }
    }

    /// Remove a stale page or one which has no more messages remaining to be processed.
    fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
        match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
            Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
            Ok(x) => x,
        }
    }

    /// Same as `do_reap_page` but must be called while holding the `with_service_mutex`.
    fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
        let mut book_state = BookStateFor::<T>::get(origin);
        // Definitely not reapable if the page's index is no less than the `begin`ning of ready
        // pages.
        ensure!(page_index < book_state.begin, Error::<T>::NotReapable);

        let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;

        // Definitely reapable if the page has no messages in it.
        let reapable = page.remaining.is_zero();

        // Also reapable if the page index has dropped below our watermark.
        let cullable = || {
            let total_pages = book_state.count;
            let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);

            // The number of stale pages, i.e. pages which contain unprocessed overweight messages
            // and are not being read from.
            let stale_pages = total_pages - ready_pages;

            // The maximum number of stale pages allowed before culling can happen at all.
            let max_stale = T::MaxStale::get();

            // The amount beyond the maximum which are being used. If not beyond the maximum then
            // exit now since no culling is needed.
            let overflow = match stale_pages.checked_sub(max_stale + 1) {
                Some(x) => x + 1,
                None => return false,
            };

            // How deep into index-history pages will be retained.
            let backlog = (max_stale * max_stale / overflow).max(max_stale);

            let watermark = book_state.begin.saturating_sub(backlog);
            page_index < watermark
        };
        ensure!(reapable || cullable(), Error::<T>::NotReapable);

        Pages::<T>::remove(origin, page_index);
        debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
        book_state.count.saturating_dec();
        book_state.message_count.saturating_reduce(page.remaining.into() as u64);
        book_state.size.saturating_reduce(page.remaining_size.into() as u64);
        BookStateFor::<T>::insert(origin, &book_state);
        T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
        Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });

        Ok(())
    }

    /// Service as many messages of a queue as possible.
    ///
    /// Returns whether any messages were processed and the next ready queue, if any.
    fn service_queue(
        origin: MessageOriginOf<T>,
        weight: &mut WeightMeter,
        overweight_limit: Weight,
    ) -> (bool, Option<MessageOriginOf<T>>) {
        use PageExecutionStatus::*;
        if weight
            .try_consume(
                T::WeightInfo::service_queue_base()
                    .saturating_add(T::WeightInfo::ready_ring_unknit()),
            )
            .is_err()
        {
            return (false, None)
        }

        let mut book_state = BookStateFor::<T>::get(&origin);
        let mut total_processed = 0;
        if T::QueuePausedQuery::is_paused(&origin) {
            let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
            return (false, next_ready)
        }

        while book_state.end > book_state.begin {
            let (processed, status) =
                Self::service_page(&origin, &mut book_state, weight, overweight_limit);
            total_processed.saturating_accrue(processed);
            match status {
                // Store the page progress and do not go to the next page.
                Bailed | NoProgress => break,
                // Go to the next page if this one is at the end.
                NoMore => (),
            };
            book_state.begin.saturating_inc();
        }
        let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
        if book_state.begin >= book_state.end {
            // No longer ready - unknit.
            if let Some(neighbours) = book_state.ready_neighbours.take() {
                Self::ready_ring_unknit(&origin, neighbours);
            } else if total_processed > 0 {
                defensive!("Freshly processed queue must have been ready");
            }
        }
        BookStateFor::<T>::insert(&origin, &book_state);
        if total_processed > 0 {
            T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
        }
        (total_processed > 0, next_ready)
    }

    /// Service as many messages of a page as possible.
    ///
    /// Returns how many messages were processed and the page's execution status.
    fn service_page(
        origin: &MessageOriginOf<T>,
        book_state: &mut BookStateOf<T>,
        weight: &mut WeightMeter,
        overweight_limit: Weight,
    ) -> (u32, PageExecutionStatus) {
        use PageExecutionStatus::*;
        if weight
            .try_consume(
                T::WeightInfo::service_page_base_completion()
                    .max(T::WeightInfo::service_page_base_no_completion()),
            )
            .is_err()
        {
            return (0, Bailed)
        }

        let page_index = book_state.begin;
        let mut page = match Pages::<T>::get(origin, page_index) {
            Some(p) => p,
            None => {
                defensive!("message-queue: referenced page not found");
                return (0, NoMore)
            },
        };

        let mut total_processed = 0;

        // Execute as many messages as possible.
        let status = loop {
            use ItemExecutionStatus::*;
            match Self::service_page_item(
                origin,
                page_index,
                book_state,
                &mut page,
                weight,
                overweight_limit,
            ) {
                Bailed => break PageExecutionStatus::Bailed,
                NoItem => break PageExecutionStatus::NoMore,
                NoProgress => break PageExecutionStatus::NoProgress,
                // Keep going as long as progress is being made.
                Executed(true) => total_processed.saturating_inc(),
                Executed(false) => (),
            }
        };

        if page.is_complete() {
            debug_assert!(status != Bailed, "we never bail if a page became complete");
            Pages::<T>::remove(origin, page_index);
            debug_assert!(book_state.count > 0, "completing a page implies there are pages");
            book_state.count.saturating_dec();
        } else {
            Pages::<T>::insert(origin, page_index, page);
        }
        (total_processed, status)
    }

    /// Execute a single message from a page.
    pub(crate) fn service_page_item(
        origin: &MessageOriginOf<T>,
        page_index: PageIndex,
        book_state: &mut BookStateOf<T>,
        page: &mut PageOf<T>,
        weight: &mut WeightMeter,
        overweight_limit: Weight,
    ) -> ItemExecutionStatus {
        use MessageExecutionStatus::*;
        // This pre-checking is needed for the invariant "we never bail if a page became complete".
        if page.is_complete() {
            return ItemExecutionStatus::NoItem
        }
        if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
            return ItemExecutionStatus::Bailed
        }

        let payload = &match page.peek_first() {
            Some(m) => m,
            None => return ItemExecutionStatus::NoItem,
        }[..];
        let payload_len = payload.len() as u64;

        // Store the page and book state in case the message recursively enqueues messages of its
        // own queue.
        Pages::<T>::insert(origin, page_index, &*page);
        BookStateFor::<T>::insert(origin, &*book_state);

        let res = Self::process_message_payload(
            origin.clone(),
            page_index,
            page.first_index,
            payload,
            weight,
            overweight_limit,
        );

        // Restore the potentially modified page and book state.
        *book_state = BookStateFor::<T>::get(origin);
        if let Some(new_page) = Pages::<T>::get(origin, page_index) {
            *page = new_page;
        } else {
            defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
            return ItemExecutionStatus::NoItem
        };

        let is_processed = match res {
            InsufficientWeight => return ItemExecutionStatus::Bailed,
            Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
            Processed | Unprocessable { permanent: true } | StackLimitReached => true,
            Overweight => false,
        };

        if is_processed {
            book_state.message_count.saturating_dec();
            book_state.size.saturating_reduce(payload_len as u64);
        }
        page.skip_first(is_processed);
        ItemExecutionStatus::Executed(is_processed)
    }

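    /// Ensure the correctness of the state of this pallet.
    ///
    /// This should be valid before and after each block and can be checked via `try-runtime`.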
    #[cfg(any(test, feature = "try-runtime", feature = "std"))]
    pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
        // Check memory corruption in `BookStateFor`.
        ensure!(
            BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
            "Memory Corruption in BookStateFor"
        );
        // Check memory corruption in `Pages`.
        ensure!(
            Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
            "Memory Corruption in Pages"
        );

        // Basic sanity of all books.
        for book in BookStateFor::<T>::iter_values() {
            ensure!(book.end >= book.begin, "Invariant");
            ensure!(book.end < 1 << 30, "Likely overflow or corruption");
            ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
            ensure!(book.size < 1 << 30, "Likely overflow or corruption");
            ensure!(book.count < 1 << 30, "Likely overflow or corruption");

            let fp: QueueFootprint = book.into();
            ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
        }

        // Check that the ready ring is traversable and consistent.
        let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };

        while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
            ensure!(
                BookStateFor::<T>::contains_key(&head),
                "Service head must point to an existing book"
            );

            let head_book_state = BookStateFor::<T>::get(&head);
            ensure!(
                head_book_state.message_count > 0,
                "There must be some messages if in ReadyRing"
            );
            ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
            ensure!(
                head_book_state.end > head_book_state.begin,
                "End > Begin if unprocessed messages exists"
            );
            ensure!(
                head_book_state.ready_neighbours.is_some(),
                "There must be neighbours if in ReadyRing"
            );

            if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
                ensure!(
                    head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
                    "Can only happen if only queue in ReadyRing"
                );
            }

            for page_index in head_book_state.begin..head_book_state.end {
                let page = Pages::<T>::get(&head, page_index).unwrap();
                let remaining_messages = page.remaining;
                let mut counted_remaining_messages: u32 = 0;
                ensure!(
                    remaining_messages > 0.into(),
                    "There must be some messages that have not been processed yet!"
                );

                for i in 0..u32::MAX {
                    if let Some((_, processed, _)) = page.peek_index(i as usize) {
                        if !processed {
                            counted_remaining_messages += 1;
                        }
                    } else {
                        break
                    }
                }

                ensure!(
                    remaining_messages.into() == counted_remaining_messages,
                    "Memory Corruption"
                );
            }

            if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
                break
            }
        }
        Ok(())
    }

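    /// Print the pages in each queue and the messages in each page.
    ///
    /// Processed messages are prefixed with a `*` and the current `begin`ning page with a `>`.
    ///
    /// # WARNING
    ///
    /// This function is intended for debugging and testing purposes only; it iterates over every
    /// page and should never be used in a production runtime.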
    #[cfg(feature = "std")]
    pub fn debug_info() -> String {
        let mut info = String::new();
        for (origin, book_state) in BookStateFor::<T>::iter() {
            let mut queue = format!("queue {:?}:\n", &origin);
            let mut pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
            pages.sort_by(|(a, _), (b, _)| a.cmp(b));
            for (page_index, mut page) in pages.into_iter() {
                let page_info = if book_state.begin == page_index { ">" } else { " " };
                let mut page_info = format!(
                    "{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
                    page_info, page_index, page.first, page.last, page.remaining
                );
                for i in 0..u32::MAX {
                    if let Some((_, processed, message)) =
                        page.peek_index(i.try_into().expect("std-only code"))
                    {
                        let msg = String::from_utf8_lossy(message);
                        if processed {
                            page_info.push('*');
                        }
                        page_info.push_str(&format!("{:?}, ", msg));
                        page.skip_first(true);
                    } else {
                        break
                    }
                }
                page_info.push_str("]\n");
                queue.push_str(&page_info);
            }
            info.push_str(&queue);
        }
        info
    }

    /// Process a single message.
    ///
    /// The base weight of this function needs to be accounted for by the caller. `meter` carries
    /// the remaining weight to process the message and `overweight_limit` is the maximum weight
    /// that a message may ever consume; messages above this limit are marked as permanently
    /// overweight. Storage changes made by the message processor are rolled back if it errors.
    fn process_message_payload(
        origin: MessageOriginOf<T>,
        page_index: PageIndex,
        message_index: T::Size,
        message: &[u8],
        meter: &mut WeightMeter,
        overweight_limit: Weight,
    ) -> MessageExecutionStatus {
        let mut id = sp_io::hashing::blake2_256(message);
        use ProcessMessageError::*;
        let prev_consumed = meter.consumed();

        let transaction =
            storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
                let res =
                    T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
                match &res {
                    Ok(_) => TransactionOutcome::Commit(Ok(res)),
                    Err(_) => TransactionOutcome::Rollback(Ok(res)),
                }
            });

        let transaction = match transaction {
            Ok(result) => result,
            _ => {
                defensive!(
                    "Error occurred processing message, storage changes will be rolled back"
                );
                return MessageExecutionStatus::Unprocessable { permanent: true }
            },
        };

        match transaction {
            Err(Overweight(w)) if w.any_gt(overweight_limit) => {
                // Permanently overweight.
                Self::deposit_event(Event::<T>::OverweightEnqueued {
                    id,
                    origin,
                    page_index,
                    message_index,
                });
                MessageExecutionStatus::Overweight
            },
            Err(Overweight(_)) => {
                // Temporarily overweight - save progress and stop processing this queue.
                MessageExecutionStatus::InsufficientWeight
            },
            Err(Yield) => {
                // Processing should be reattempted later.
                MessageExecutionStatus::Unprocessable { permanent: false }
            },
            Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
                // Permanent error - drop.
                Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
                MessageExecutionStatus::Unprocessable { permanent: true }
            },
            Err(error @ StackLimitReached) => {
                Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
                MessageExecutionStatus::StackLimitReached
            },
            Ok(success) => {
                // Success.
                let weight_used = meter.consumed().saturating_sub(prev_consumed);
                Self::deposit_event(Event::<T>::Processed {
                    id: id.into(),
                    origin,
                    weight_used,
                    success,
                });
                MessageExecutionStatus::Processed
            },
        }
    }

    /// Service all message queues within `weight_limit`, in the given `context`.
    fn service_queues_impl(weight_limit: Weight, context: ServiceQueuesContext) -> Weight {
        let mut weight = WeightMeter::with_limit(weight_limit);

        // Get the maximum weight that processing a single message may take.
        let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
            if matches!(context, ServiceQueuesContext::OnInitialize) {
                defensive!("Not enough weight to service a single message.");
            }
            Weight::zero()
        });

        match with_service_mutex(|| {
            let mut next = match Self::bump_service_head(&mut weight) {
                Some(h) => h,
                None => return weight.consumed(),
            };
            // The last queue that did not make any progress.
            // The loop aborts as soon as it arrives at this queue again without making progress
            // on other queues in between.
            let mut last_no_progress = None;

            loop {
                let (progressed, n) =
                    Self::service_queue(next.clone(), &mut weight, overweight_limit);
                next = match n {
                    Some(n) =>
                        if !progressed {
                            if last_no_progress == Some(n.clone()) {
                                break
                            }
                            if last_no_progress.is_none() {
                                last_no_progress = Some(next.clone())
                            }
                            n
                        } else {
                            last_no_progress = None;
                            n
                        },
                    None => break,
                }
            }
            weight.consumed()
        }) {
            Err(()) => weight.consumed(),
            Ok(w) => w,
        }
    }
}

impl<T: Config> ForceSetHead<MessageOriginOf<T>> for Pallet<T> {
    fn force_set_head(weight: &mut WeightMeter, origin: &MessageOriginOf<T>) -> Result<bool, ()> {
        Pallet::<T>::set_service_head(weight, origin)
    }
}

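/// Run a closure `f` under a re-entrancy guard.
///
/// Returns `Err(())` without running `f` if called recursively (i.e. from within another
/// `with_service_mutex` closure); used to prevent recursive queue servicing.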
pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
    // Holds the singleton token instance.
    environmental::environmental!(token: Option<()>);

    token::using_once(&mut Some(()), || {
        // Take the token; if it is already taken, we are being called recursively.
        let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;

        // Put the token back when we are done.
        defer! {
            token::with(|t| {
                *t = Some(hold);
            });
        }

        Ok(f())
    })
}

/// Provides a [`Get`] of the maximum encoded length of `T`.
pub struct MaxEncodedLenOf<T>(core::marker::PhantomData<T>);
impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
    fn get() -> u32 {
        T::max_encoded_len() as u32
    }
}

/// Calculates the maximum message length that fits into a page of `HeapSize`, accounting for the
/// per-item header.
pub struct MaxMessageLen<Origin, Size, HeapSize>(
    core::marker::PhantomData<(Origin, Size, HeapSize)>,
);
impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
    for MaxMessageLen<Origin, Size, HeapSize>
{
    fn get() -> u32 {
        (HeapSize::get().into()).saturating_sub(ItemHeader::<Size>::max_encoded_len() as u32)
    }
}

/// The maximal message length.
pub type MaxMessageLenOf<T> =
    MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
/// The maximal encoded origin length.
pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
/// The `MessageOrigin` of this pallet.
pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
/// The maximal heap size of a page, as a `u32` getter.
pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
/// The [`Page`] of this pallet.
pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
/// The [`BookState`] of this pallet.
pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;

/// Converts a [`Get`] of some type `O: Into<u32>` into a `Get<u32>`.
pub struct IntoU32<T, O>(core::marker::PhantomData<(T, O)>);
impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
    fn get() -> u32 {
        T::get().into()
    }
}

impl<T: Config> ServiceQueues for Pallet<T> {
    type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);

    fn service_queues(weight_limit: Weight) -> Weight {
        Self::service_queues_impl(weight_limit, ServiceQueuesContext::ServiceQueues)
    }

    fn execute_overweight(
        weight_limit: Weight,
        (message_origin, page, index): Self::OverweightMessageAddress,
    ) -> Result<Weight, ExecuteOverweightError> {
        let mut weight = WeightMeter::with_limit(weight_limit);
        if weight
            .try_consume(
                T::WeightInfo::execute_overweight_page_removed()
                    .max(T::WeightInfo::execute_overweight_page_updated()),
            )
            .is_err()
        {
            return Err(ExecuteOverweightError::InsufficientWeight)
        }

        Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
            |e| match e {
                Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
                Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
                Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
                Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
                    ExecuteOverweightError::NotFound,
                Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
                _ => ExecuteOverweightError::Other,
            },
        )
    }
}

impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pallet<T> {
    type MaxMessageLen =
        MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

    fn enqueue_message(
        message: BoundedSlice<u8, Self::MaxMessageLen>,
        origin: <T::MessageProcessor as ProcessMessage>::Origin,
    ) {
        Self::do_enqueue_messages(&origin, [message].into_iter());
        let book_state = BookStateFor::<T>::get(&origin);
        T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
    }

    fn enqueue_messages<'a>(
        messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
        origin: <T::MessageProcessor as ProcessMessage>::Origin,
    ) {
        Self::do_enqueue_messages(&origin, messages);
        let book_state = BookStateFor::<T>::get(&origin);
        T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
    }

    fn sweep_queue(origin: MessageOriginOf<T>) {
        if !BookStateFor::<T>::contains_key(&origin) {
            return
        }
        let mut book_state = BookStateFor::<T>::get(&origin);
        book_state.begin = book_state.end;
        if let Some(neighbours) = book_state.ready_neighbours.take() {
            Self::ready_ring_unknit(&origin, neighbours);
        }
        BookStateFor::<T>::insert(&origin, &book_state);
    }
}

impl<T: Config> QueueFootprintQuery<MessageOriginOf<T>> for Pallet<T> {
    type MaxMessageLen =
        MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

    fn get_batches_footprints<'a>(
        origin: MessageOriginOf<T>,
        msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
        total_pages_limit: u32,
    ) -> BatchesFootprints {
        let mut batches_footprints = BatchesFootprints::default();

        let mut new_page = false;
        let mut total_pages_count = 0;
        let mut current_page_pos: usize = T::HeapSize::get().into() as usize;

        let book = BookStateFor::<T>::get(&origin);
        if book.end > book.begin {
            total_pages_count = book.end - book.begin;
            if let Some(page) = Pages::<T>::get(origin, book.end - 1) {
                current_page_pos = page.heap_pos();
                batches_footprints.first_page_pos = current_page_pos;
            }
        }

        let mut msgs = msgs.peekable();
        while let Some(msg) = msgs.peek() {
            if total_pages_count > total_pages_limit {
                return batches_footprints;
            }

            match Page::<T::Size, T::HeapSize>::can_append_message_at(current_page_pos, msg.len()) {
                Ok(new_pos) => {
                    current_page_pos = new_pos;
                    batches_footprints.push(msg, new_page);
                    new_page = false;
                    msgs.next();
                },
                Err(_) => {
                    new_page = true;
                    total_pages_count += 1;
                    current_page_pos = 0;
                },
            }
        }

        batches_footprints
    }

    fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
        BookStateFor::<T>::get(&origin).into()
    }
}