polkadot_runtime_parachains/
dmp.rs1use crate::{
46 configuration::{self, HostConfiguration},
47 initializer, paras, FeeTracker, GetMinFeeFactor,
48};
49use alloc::vec::Vec;
50use core::fmt;
51use frame_support::pallet_prelude::*;
52use frame_system::pallet_prelude::BlockNumberFor;
53use polkadot_primitives::{DownwardMessage, Hash, Id as ParaId, InboundDownwardMessage};
54use sp_core::MAX_POSSIBLE_ALLOCATION;
55use sp_runtime::{
56 traits::{BlakeTwo256, Hash as HashT, SaturatedConversion},
57 FixedU128,
58};
59use xcm::latest::SendError;
60
61pub use pallet::*;
62
63#[cfg(test)]
64mod tests;
65
/// Divisor applied to the maximum queue length to obtain the fee-adjustment threshold.
///
/// When a para's queue holds more than `dmq_max_length / THRESHOLD_FACTOR` messages after a
/// push, its delivery fee factor is increased; when pruning leaves the queue at or below that
/// length, the factor is decreased.
const THRESHOLD_FACTOR: u32 = 2;
67
/// Reasons why a downward message cannot be placed into a para's queue.
#[derive(Debug)]
pub enum QueueDownwardMessageError {
    /// The message is longer than the configured `max_downward_message_size`, or the
    /// destination queue is already over its maximum length.
    ExceedsMaxMessageSize,
    /// The destination para has no entry in `paras::Heads`, so it is treated as unreachable.
    Unroutable,
}
76
77impl From<QueueDownwardMessageError> for SendError {
78 fn from(err: QueueDownwardMessageError) -> Self {
79 match err {
80 QueueDownwardMessageError::ExceedsMaxMessageSize => SendError::ExceedsMaxMessageSize,
81 QueueDownwardMessageError::Unroutable => SendError::Unroutable,
82 }
83 }
84}
85
/// Reasons why a claimed number of processed downward messages is rejected.
pub(crate) enum ProcessedDownwardMessagesAcceptanceErr {
    /// The queue holds a message that was sent at or before the relay parent, yet zero
    /// messages were reported as processed.
    AdvancementRule,
    /// More messages were reported as processed than the queue currently contains.
    Underflow {
        /// The number of messages reported as processed.
        processed_downward_messages: u32,
        /// The actual length of the queue at the time of the check.
        dmq_length: u32,
    },
}
94
95impl fmt::Debug for ProcessedDownwardMessagesAcceptanceErr {
96 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
97 use ProcessedDownwardMessagesAcceptanceErr::*;
98 match *self {
99 AdvancementRule => {
100 write!(fmt, "DMQ is not empty, but processed_downward_messages is 0",)
101 },
102 Underflow { processed_downward_messages, dmq_length } => write!(
103 fmt,
104 "processed_downward_messages = {}, but dmq_length is only {}",
105 processed_downward_messages, dmq_length,
106 ),
107 }
108 }
109}
110
111#[frame_support::pallet]
112pub mod pallet {
113 use super::*;
114
115 #[pallet::pallet]
116 #[pallet::without_storage_info]
117 pub struct Pallet<T>(_);
118
119 #[pallet::config]
120 pub trait Config: frame_system::Config + configuration::Config + paras::Config {}
121
122 #[pallet::storage]
124 pub type DownwardMessageQueues<T: Config> = StorageMap<
125 _,
126 Twox64Concat,
127 ParaId,
128 Vec<InboundDownwardMessage<BlockNumberFor<T>>>,
129 ValueQuery,
130 >;
131
132 #[pallet::storage]
140 pub(crate) type DownwardMessageQueueHeads<T: Config> =
141 StorageMap<_, Twox64Concat, ParaId, Hash, ValueQuery>;
142
143 #[pallet::storage]
145 pub(crate) type DeliveryFeeFactor<T: Config> =
146 StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
147}
148impl<T: Config> Pallet<T> {
150 pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
152 Weight::zero()
153 }
154
155 pub(crate) fn initializer_finalize() {}
157
158 pub(crate) fn initializer_on_new_session(
160 _notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
161 outgoing_paras: &[ParaId],
162 ) {
163 Self::perform_outgoing_para_cleanup(outgoing_paras);
164 }
165
166 fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
169 for outgoing_para in outgoing {
170 Self::clean_dmp_after_outgoing(outgoing_para);
171 }
172 }
173
174 fn clean_dmp_after_outgoing(outgoing_para: &ParaId) {
176 DownwardMessageQueues::<T>::remove(outgoing_para);
177 DownwardMessageQueueHeads::<T>::remove(outgoing_para);
178 }
179
180 pub fn can_queue_downward_message(
184 config: &HostConfiguration<BlockNumberFor<T>>,
185 para: &ParaId,
186 msg: &DownwardMessage,
187 ) -> Result<(), QueueDownwardMessageError> {
188 let serialized_len = msg.len() as u32;
189 if serialized_len > config.max_downward_message_size {
190 return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
191 }
192
193 if Self::dmq_length(*para) > Self::dmq_max_length(config.max_downward_message_size) {
195 return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
196 }
197
198 if !paras::Heads::<T>::contains_key(para) {
200 return Err(QueueDownwardMessageError::Unroutable)
201 }
202
203 Ok(())
204 }
205
206 pub fn queue_downward_message(
215 config: &HostConfiguration<BlockNumberFor<T>>,
216 para: ParaId,
217 msg: DownwardMessage,
218 ) -> Result<(), QueueDownwardMessageError> {
219 let serialized_len = msg.len();
220 Self::can_queue_downward_message(config, ¶, &msg)?;
221
222 let inbound =
223 InboundDownwardMessage { msg, sent_at: frame_system::Pallet::<T>::block_number() };
224
225 DownwardMessageQueueHeads::<T>::mutate(para, |head| {
227 let new_head =
228 BlakeTwo256::hash_of(&(*head, inbound.sent_at, T::Hashing::hash_of(&inbound.msg)));
229 *head = new_head;
230 });
231
232 let q_len = DownwardMessageQueues::<T>::mutate(para, |v| {
233 v.push(inbound);
234 v.len()
235 });
236
237 let threshold =
238 Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
239 if q_len > (threshold as usize) {
240 Self::increase_fee_factor(para, serialized_len as u128);
241 }
242
243 Ok(())
244 }
245
246 pub(crate) fn check_processed_downward_messages(
248 para: ParaId,
249 relay_parent_number: BlockNumberFor<T>,
250 processed_downward_messages: u32,
251 ) -> Result<(), ProcessedDownwardMessagesAcceptanceErr> {
252 let dmq_length = Self::dmq_length(para);
253
254 if dmq_length > 0 && processed_downward_messages == 0 {
255 let contents = Self::dmq_contents(para);
259
260 if contents.get(0).map_or(false, |msg| msg.sent_at <= relay_parent_number) {
262 return Err(ProcessedDownwardMessagesAcceptanceErr::AdvancementRule)
263 }
264 }
265
266 if dmq_length < processed_downward_messages {
271 return Err(ProcessedDownwardMessagesAcceptanceErr::Underflow {
272 processed_downward_messages,
273 dmq_length,
274 })
275 }
276
277 Ok(())
278 }
279
280 pub(crate) fn prune_dmq(para: ParaId, processed_downward_messages: u32) {
282 let q_len = DownwardMessageQueues::<T>::mutate(para, |q| {
283 let processed_downward_messages = processed_downward_messages as usize;
284 if processed_downward_messages > q.len() {
285 q.clear();
288 } else {
289 *q = q.split_off(processed_downward_messages);
290 }
291 q.len()
292 });
293
294 let config = configuration::ActiveConfig::<T>::get();
295 let threshold =
296 Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
297 if q_len <= (threshold as usize) {
298 Self::decrease_fee_factor(para);
299 }
300 }
301
302 #[cfg(test)]
305 fn dmq_mqc_head(para: ParaId) -> Hash {
306 DownwardMessageQueueHeads::<T>::get(¶)
307 }
308
309 pub(crate) fn dmq_length(para: ParaId) -> u32 {
313 DownwardMessageQueues::<T>::decode_len(¶)
314 .unwrap_or(0)
315 .saturated_into::<u32>()
316 }
317
318 fn dmq_max_length(max_downward_message_size: u32) -> u32 {
319 MAX_POSSIBLE_ALLOCATION.checked_div(max_downward_message_size).unwrap_or(0)
320 }
321
322 pub(crate) fn dmq_contents(
326 recipient: ParaId,
327 ) -> Vec<InboundDownwardMessage<BlockNumberFor<T>>> {
328 DownwardMessageQueues::<T>::get(&recipient)
329 }
330
331 #[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
335 pub fn make_parachain_reachable(para: impl Into<ParaId>) {
336 let para = para.into();
337 crate::paras::Heads::<T>::insert(para, para.encode());
338 }
339}
340
341impl<T: Config> FeeTracker for Pallet<T> {
342 type Id = ParaId;
343
344 fn get_fee_factor(id: Self::Id) -> FixedU128 {
345 DeliveryFeeFactor::<T>::get(id)
346 }
347
348 fn set_fee_factor(id: Self::Id, val: FixedU128) {
349 <DeliveryFeeFactor<T>>::set(id, val);
350 }
351}
352
/// Benchmark-only hook that makes `para` a valid DMP destination.
#[cfg(feature = "runtime-benchmarks")]
impl<T: Config> crate::EnsureForParachain for Pallet<T> {
    fn ensure(para: ParaId) {
        // Delegates to the inherent helper, which registers a placeholder head for the para.
        Pallet::<T>::make_parachain_reachable(para);
    }
}