polkadot_runtime_parachains/dmp/
migration.rs1use super::{inbound_downward_queue::InboundDownwardQueue, *};
24#[cfg(feature = "try-runtime")]
25use alloc::collections::btree_map::BTreeMap;
26use alloc::vec::Vec;
27use codec::{Decode, Encode, MaxEncodedLen};
28use frame_support::{
29 migrations::{MigrationId, SteppedMigration, SteppedMigrationError},
30 pallet_prelude::ValueQuery,
31 storage_alias,
32 weights::WeightMeter,
33 Twox64Concat,
34};
35use scale_info::TypeInfo;
36
/// Progress marker handed back and forth between successive calls to `MigrateV0ToV1::step`.
///
/// The variant order is part of the encoded representation; do not reorder.
#[derive(Encode, Decode, MaxEncodedLen, TypeInfo, PartialEq, Eq, Clone, Debug)]
pub enum MigrationCursor {
    /// No queue is partially migrated. The next step picks the first key that is still present
    /// in `v0::DownwardMessageQueues`.
    Iterate,
    /// The queue of `para` was only partially migrated. The next step resumes with the messages
    /// that were written back to `v0::DownwardMessageQueues` under that key.
    InProgress { para: ParaId },
}
47
/// Storage version `1`, i.e. the version `MigrateV0ToV1` writes once the migration has finished.
///
/// NOTE(review): presumably also used as the pallet's in-code storage version; the usage site is
/// not visible in this file.
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
50
/// 21-byte identifier used as the `pallet_id` of the `MigrationId` returned by
/// `MigrateV0ToV1::id`.
///
/// NOTE(review): the identifier reads `cumulus-dmp-queue` although this module belongs to the
/// parachains `dmp` pallet. Confirm it is intentional and cannot collide with another migration.
const PALLET_MIGRATIONS_ID: &[u8; 21] = b"cumulus-dmp-queue-mbm";
53
/// Storage items in the layout of storage version 0, as read and drained by `MigrateV0ToV1`.
pub mod v0 {
    use super::*;

    /// Per-para queue of inbound downward messages, held as one `Vec` value per `ParaId` under
    /// the `dmp` pallet's storage prefix.
    ///
    /// Declared with `ValueQuery`, so reading or taking a missing key yields an empty vector.
    #[storage_alias]
    pub type DownwardMessageQueues<T: Config> = StorageMap<
        crate::dmp::Pallet<T>,
        Twox64Concat,
        ParaId,
        Vec<InboundDownwardMessage<BlockNumberFor<T>>>,
        ValueQuery,
    >;
}
67
/// Stepped migration of the `dmp` pallet from storage version 0 to storage version 1.
///
/// `T` is the runtime configuration; the struct holds no data and only carries the type.
pub struct MigrateV0ToV1<T>(core::marker::PhantomData<T>);
70
71impl<T: Config> SteppedMigration for MigrateV0ToV1<T> {
72 type Cursor = MigrationCursor;
73 type Identifier = MigrationId<21>;
74
75 fn id() -> Self::Identifier {
76 MigrationId { pallet_id: *PALLET_MIGRATIONS_ID, version_from: 0, version_to: 1 }
77 }
78
79 fn step(
80 mut cursor: Option<Self::Cursor>,
81 meter: &mut WeightMeter,
82 ) -> Result<Option<Self::Cursor>, SteppedMigrationError> {
83 if Pallet::<T>::on_chain_storage_version() != Self::id().version_from as u16 {
84 return Ok(None);
85 }
86 let base = <T as Config>::WeightInfo::migrate_v0_to_v1_step_base();
87 let per_iter = <T as Config>::WeightInfo::migrate_v0_to_v1_step_iter();
88 let per_msg = <T as Config>::WeightInfo::migrate_v0_to_v1_step_msg();
89
90 let minimum = base.saturating_add(per_iter).saturating_add(per_msg);
92 if meter.remaining().any_lt(minimum) {
93 return Err(SteppedMigrationError::InsufficientWeight { required: minimum });
94 }
95 meter.consume(base);
96
97 loop {
98 if meter.try_consume(per_iter).is_err() {
99 break;
100 }
101
102 let para = match cursor.take() {
103 Some(MigrationCursor::InProgress { para }) => para,
104 Some(MigrationCursor::Iterate) | None => {
105 let Some(p) = v0::DownwardMessageQueues::<T>::iter_keys().next() else {
106 cursor = None;
107 break;
108 };
109 p
110 },
111 };
112
113 let msgs = v0::DownwardMessageQueues::<T>::take(para);
114 if msgs.is_empty() {
115 cursor = Some(MigrationCursor::Iterate);
116 continue;
117 }
118
119 let total = msgs.len();
120 let mut migrated = 0usize;
121
122 for msg in &msgs {
123 if migrated > 0 && meter.try_consume(per_msg).is_err() {
124 break;
125 }
126 if InboundDownwardQueue::<T>::push_back_inbound_v1(para, msg).is_err() {
127 v0::DownwardMessageQueues::<T>::insert(para, msgs[migrated..].to_vec());
128 cursor = Some(MigrationCursor::InProgress { para });
129 return Ok(cursor);
130 }
131 migrated = migrated.saturating_add(1);
132 }
133
134 if migrated < total {
135 v0::DownwardMessageQueues::<T>::insert(para, msgs[migrated..].to_vec());
136 cursor = Some(MigrationCursor::InProgress { para });
137 break;
138 }
139
140 super::Pallet::<T>::deposit_event(Event::DmpQueueV0Cleaned { para });
141 cursor = Some(MigrationCursor::Iterate);
142 }
143
144 if cursor.is_none() {
145 StorageVersion::new(Self::id().version_to as u16).put::<Pallet<T>>();
146 }
147 Ok(cursor)
148 }
149
150 #[cfg(feature = "try-runtime")]
151 fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::TryRuntimeError> {
152 let snapshot: BTreeMap<ParaId, Vec<InboundDownwardMessage<BlockNumberFor<T>>>> =
154 v0::DownwardMessageQueues::<T>::iter().collect();
155
156 Ok(snapshot.encode())
157 }
158
159 #[cfg(feature = "try-runtime")]
160 fn post_upgrade(_prev: Vec<u8>) -> Result<(), sp_runtime::TryRuntimeError> {
161 assert_eq!(
162 v0::DownwardMessageQueues::<T>::iter().count(),
163 0,
164 "v0::DownwardMessageQueues still has entries after MigrateV0ToV1",
165 );
166
167 Ok(())
168 }
169}