1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! To prevent Out of Memory errors on the `DownwardMessageQueue`, an
//! exponential fee factor (`DeliveryFeeFactor`) is set. The fee factor
//! increments exponentially after the number of messages in the
//! `DownwardMessageQueue` passes a threshold. This threshold is set as:
//!
//! ```ignore
//! // Maximum max sized messages that can be send to
//! // the DownwardMessageQueue before it runs out of memory
//! max_messages = MAX_POSSIBLE_ALLOCATION / max_downward_message_size
//! threshold = max_messages / THRESHOLD_FACTOR
//! ```
//! Based on the THRESHOLD_FACTOR, the threshold is set as a fraction of the
//! total messages. The `DeliveryFeeFactor` increases for a message over the
//! threshold by:
//!
//! `DeliveryFeeFactor = DeliveryFeeFactor *
//! (EXPONENTIAL_FEE_BASE + MESSAGE_SIZE_FEE_BASE * encoded_message_size_in_KB)`
//!
//! And decreases when the number of messages in the `DownwardMessageQueue` fall
//! below the threshold by:
//!
//! `DeliveryFeeFactor = DeliveryFeeFactor / EXPONENTIAL_FEE_BASE`
//!
//! As an extra defensive measure, a `max_messages` hard
//! limit is set to the number of messages in the DownwardMessageQueue. Messages
//! that would increase the number of messages in the queue above this hard
//! limit are dropped.
use crate::{
configuration::{self, HostConfiguration},
initializer, FeeTracker,
};
use alloc::vec::Vec;
use core::fmt;
use frame_support::pallet_prelude::*;
use frame_system::pallet_prelude::BlockNumberFor;
use polkadot_primitives::{DownwardMessage, Hash, Id as ParaId, InboundDownwardMessage};
use sp_core::MAX_POSSIBLE_ALLOCATION;
use sp_runtime::{
traits::{BlakeTwo256, Hash as HashT, SaturatedConversion},
FixedU128, Saturating,
};
use xcm::latest::SendError;
pub use pallet::*;
#[cfg(test)]
mod tests;
const THRESHOLD_FACTOR: u32 = 2;
const EXPONENTIAL_FEE_BASE: FixedU128 = FixedU128::from_rational(105, 100); // 1.05
const MESSAGE_SIZE_FEE_BASE: FixedU128 = FixedU128::from_rational(1, 1000); // 0.001
/// An error sending a downward message.
#[cfg_attr(test, derive(Debug))]
pub enum QueueDownwardMessageError {
/// The message being sent exceeds the configured max message size.
ExceedsMaxMessageSize,
}
impl From<QueueDownwardMessageError> for SendError {
fn from(err: QueueDownwardMessageError) -> Self {
match err {
QueueDownwardMessageError::ExceedsMaxMessageSize => SendError::ExceedsMaxMessageSize,
}
}
}
/// An error returned by [`Pallet::check_processed_downward_messages`] that indicates an acceptance
/// check didn't pass.
pub(crate) enum ProcessedDownwardMessagesAcceptanceErr {
/// If there are pending messages then `processed_downward_messages` should be at least 1,
AdvancementRule,
/// `processed_downward_messages` should not be greater than the number of pending messages.
Underflow { processed_downward_messages: u32, dmq_length: u32 },
}
impl fmt::Debug for ProcessedDownwardMessagesAcceptanceErr {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use ProcessedDownwardMessagesAcceptanceErr::*;
match *self {
AdvancementRule => {
write!(fmt, "DMQ is not empty, but processed_downward_messages is 0",)
},
Underflow { processed_downward_messages, dmq_length } => write!(
fmt,
"processed_downward_messages = {}, but dmq_length is only {}",
processed_downward_messages, dmq_length,
),
}
}
}
#[frame_support::pallet]
pub mod pallet {
use super::*;
#[pallet::pallet]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);
#[pallet::config]
pub trait Config: frame_system::Config + configuration::Config {}
/// The downward messages addressed for a certain para.
#[pallet::storage]
pub type DownwardMessageQueues<T: Config> = StorageMap<
_,
Twox64Concat,
ParaId,
Vec<InboundDownwardMessage<BlockNumberFor<T>>>,
ValueQuery,
>;
/// A mapping that stores the downward message queue MQC head for each para.
///
/// Each link in this chain has a form:
/// `(prev_head, B, H(M))`, where
/// - `prev_head`: is the previous head hash or zero if none.
/// - `B`: is the relay-chain block number in which a message was appended.
/// - `H(M)`: is the hash of the message being appended.
#[pallet::storage]
pub(crate) type DownwardMessageQueueHeads<T: Config> =
StorageMap<_, Twox64Concat, ParaId, Hash, ValueQuery>;
/// Initialization value for the DeliveryFee factor.
#[pallet::type_value]
pub fn InitialFactor() -> FixedU128 {
FixedU128::from_u32(1)
}
/// The factor to multiply the base delivery fee by.
#[pallet::storage]
pub(crate) type DeliveryFeeFactor<T: Config> =
StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
}
/// Routines and getters related to downward message passing.
impl<T: Config> Pallet<T> {
/// Block initialization logic, called by initializer.
pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
Weight::zero()
}
/// Block finalization logic, called by initializer.
pub(crate) fn initializer_finalize() {}
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
_notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
outgoing_paras: &[ParaId],
) {
Self::perform_outgoing_para_cleanup(outgoing_paras);
}
/// Iterate over all paras that were noted for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::clean_dmp_after_outgoing(outgoing_para);
}
}
/// Remove all relevant storage items for an outgoing parachain.
fn clean_dmp_after_outgoing(outgoing_para: &ParaId) {
DownwardMessageQueues::<T>::remove(outgoing_para);
DownwardMessageQueueHeads::<T>::remove(outgoing_para);
}
/// Determine whether enqueuing a downward message to a specific recipient para would result
/// in an error. If this returns `Ok(())` the caller can be certain that a call to
/// `queue_downward_message` with the same parameters will be successful.
pub fn can_queue_downward_message(
config: &HostConfiguration<BlockNumberFor<T>>,
para: &ParaId,
msg: &DownwardMessage,
) -> Result<(), QueueDownwardMessageError> {
let serialized_len = msg.len() as u32;
if serialized_len > config.max_downward_message_size {
return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
}
// Hard limit on Queue size
if Self::dmq_length(*para) > Self::dmq_max_length(config.max_downward_message_size) {
return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
}
Ok(())
}
/// Enqueue a downward message to a specific recipient para.
///
/// When encoded, the message should not exceed the `config.max_downward_message_size`.
/// Otherwise, the message won't be sent and `Err` will be returned.
///
/// It is possible to send a downward message to a non-existent para. That, however, would lead
/// to a dangling storage. If the caller cannot statically prove that the recipient exists
/// then the caller should perform a runtime check.
pub fn queue_downward_message(
config: &HostConfiguration<BlockNumberFor<T>>,
para: ParaId,
msg: DownwardMessage,
) -> Result<(), QueueDownwardMessageError> {
let serialized_len = msg.len() as u32;
if serialized_len > config.max_downward_message_size {
return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
}
// Hard limit on Queue size
if Self::dmq_length(para) > Self::dmq_max_length(config.max_downward_message_size) {
return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
}
let inbound =
InboundDownwardMessage { msg, sent_at: frame_system::Pallet::<T>::block_number() };
// obtain the new link in the MQC and update the head.
DownwardMessageQueueHeads::<T>::mutate(para, |head| {
let new_head =
BlakeTwo256::hash_of(&(*head, inbound.sent_at, T::Hashing::hash_of(&inbound.msg)));
*head = new_head;
});
let q_len = DownwardMessageQueues::<T>::mutate(para, |v| {
v.push(inbound);
v.len()
});
let threshold =
Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
if q_len > (threshold as usize) {
let message_size_factor = FixedU128::from((serialized_len / 1024) as u128)
.saturating_mul(MESSAGE_SIZE_FEE_BASE);
Self::increase_fee_factor(para, message_size_factor);
}
Ok(())
}
/// Checks if the number of processed downward messages is valid.
pub(crate) fn check_processed_downward_messages(
para: ParaId,
relay_parent_number: BlockNumberFor<T>,
processed_downward_messages: u32,
) -> Result<(), ProcessedDownwardMessagesAcceptanceErr> {
let dmq_length = Self::dmq_length(para);
if dmq_length > 0 && processed_downward_messages == 0 {
// The advancement rule is for at least one downwards message to be processed
// if the queue is non-empty at the relay-parent. Downwards messages are annotated
// with the block number, so we compare the earliest (first) against the relay parent.
let contents = Self::dmq_contents(para);
// sanity: if dmq_length is >0 this should always be 'Some'.
if contents.get(0).map_or(false, |msg| msg.sent_at <= relay_parent_number) {
return Err(ProcessedDownwardMessagesAcceptanceErr::AdvancementRule)
}
}
// Note that we might be allowing a parachain to signal that it's processed
// messages that hadn't been placed in the queue at the relay_parent.
// only 'stupid' parachains would do it and we don't (and can't) force anyone
// to act on messages, so the lenient approach is fine here.
if dmq_length < processed_downward_messages {
return Err(ProcessedDownwardMessagesAcceptanceErr::Underflow {
processed_downward_messages,
dmq_length,
})
}
Ok(())
}
/// Prunes the specified number of messages from the downward message queue of the given para.
pub(crate) fn prune_dmq(para: ParaId, processed_downward_messages: u32) {
let q_len = DownwardMessageQueues::<T>::mutate(para, |q| {
let processed_downward_messages = processed_downward_messages as usize;
if processed_downward_messages > q.len() {
// reaching this branch is unexpected due to the constraint established by
// `check_processed_downward_messages`. But better be safe than sorry.
q.clear();
} else {
*q = q.split_off(processed_downward_messages);
}
q.len()
});
let config = configuration::ActiveConfig::<T>::get();
let threshold =
Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
if q_len <= (threshold as usize) {
Self::decrease_fee_factor(para);
}
}
/// Returns the Head of Message Queue Chain for the given para or `None` if there is none
/// associated with it.
#[cfg(test)]
fn dmq_mqc_head(para: ParaId) -> Hash {
DownwardMessageQueueHeads::<T>::get(¶)
}
/// Returns the number of pending downward messages addressed to the given para.
///
/// Returns 0 if the para doesn't have an associated downward message queue.
pub(crate) fn dmq_length(para: ParaId) -> u32 {
DownwardMessageQueues::<T>::decode_len(¶)
.unwrap_or(0)
.saturated_into::<u32>()
}
fn dmq_max_length(max_downward_message_size: u32) -> u32 {
MAX_POSSIBLE_ALLOCATION.checked_div(max_downward_message_size).unwrap_or(0)
}
/// Returns the downward message queue contents for the given para.
///
/// The most recent messages are the latest in the vector.
pub(crate) fn dmq_contents(
recipient: ParaId,
) -> Vec<InboundDownwardMessage<BlockNumberFor<T>>> {
DownwardMessageQueues::<T>::get(&recipient)
}
}
impl<T: Config> FeeTracker for Pallet<T> {
type Id = ParaId;
fn get_fee_factor(id: Self::Id) -> FixedU128 {
DeliveryFeeFactor::<T>::get(id)
}
fn increase_fee_factor(id: Self::Id, message_size_factor: FixedU128) -> FixedU128 {
DeliveryFeeFactor::<T>::mutate(id, |f| {
*f = f.saturating_mul(EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor));
*f
})
}
fn decrease_fee_factor(id: Self::Id) -> FixedU128 {
DeliveryFeeFactor::<T>::mutate(id, |f| {
*f = InitialFactor::get().max(*f / EXPONENTIAL_FEE_BASE);
*f
})
}
}