1use super::*;
20use alloc::vec::Vec;
21use frame_support::{
22 migrations::VersionedMigration, pallet_prelude::ValueQuery, storage_alias,
23 traits::UncheckedOnRuntimeUpgrade, weights::Weight,
24};
25
/// Legacy assignment representation that carries nothing but the para id.
///
/// It is embedded in `v1::ParasEntry` and is constructed by the v0 -> v1 migration from the
/// `para_id` of each old core assignment / occupied core.
#[derive(Encode, Decode, RuntimeDebug, TypeInfo, PartialEq, Clone)]
struct V0Assignment {
    /// The para this assignment refers to.
    pub para_id: ParaId,
}
34
/// Types and storage aliases describing the scheduler storage as it looked at version 0.
///
/// Everything in here exists only so that the v0 -> v1 migration can read (or remove) the old
/// storage items.
mod v0 {
    use super::*;
    use polkadot_primitives::{CollatorId, Id};

    /// Alias for the old `Scheduled` storage value: a list of [`CoreAssignment`]s.
    #[storage_alias]
    pub(super) type Scheduled<T: Config> = StorageValue<Pallet<T>, Vec<CoreAssignment>, ValueQuery>;

    /// A parathread claim: a para id paired with a collator id.
    #[derive(Clone, Encode, Decode)]
    #[cfg_attr(feature = "std", derive(PartialEq))]
    pub struct ParathreadClaim(pub Id, pub CollatorId);

    /// A parathread claim together with a retry counter.
    #[derive(Clone, Encode, Decode)]
    #[cfg_attr(feature = "std", derive(PartialEq))]
    pub struct ParathreadEntry {
        /// The claim itself.
        pub claim: ParathreadClaim,
        /// Retry counter stored alongside the claim.
        pub retries: u32,
    }

    /// What an occupied v0 availability core was occupied by.
    ///
    /// NOTE: the variant order is part of the SCALE encoding and must not be changed.
    #[derive(Clone, Encode, Decode)]
    #[cfg_attr(feature = "std", derive(PartialEq))]
    pub enum CoreOccupied {
        /// Occupied by a parathread; carries the entry holding its claim.
        Parathread(ParathreadEntry),
        /// Occupied by a parachain. No para id is stored here; the v0 -> v1 migration looks it up
        /// in `paras::Parachains` by core index.
        Parachain,
    }

    /// Alias for the old `AvailabilityCores` storage value: one optional [`CoreOccupied`] per
    /// core, where `None` is migrated to a free core.
    #[storage_alias]
    pub(crate) type AvailabilityCores<T: Config> =
        StorageValue<Pallet<T>, Vec<Option<CoreOccupied>>, ValueQuery>;

    /// Alias for the old `ParathreadQueue` storage value.
    ///
    /// The value type is `()` because the v0 -> v1 migration only removes this item and never
    /// decodes its content.
    #[storage_alias]
    pub(super) type ParathreadQueue<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

    /// Alias for the old `ParathreadClaimIndex` storage value.
    ///
    /// The value type is `()` because the v0 -> v1 migration only removes this item and never
    /// decodes its content.
    #[storage_alias]
    pub(super) type ParathreadClaimIndex<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

    /// The kind of a v0 core assignment.
    ///
    /// NOTE: the variant order is part of the SCALE encoding and must not be changed.
    #[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
    #[cfg_attr(feature = "std", derive(PartialEq))]
    pub enum AssignmentKind {
        /// A parachain assignment.
        Parachain,
        /// A parathread assignment, carrying a collator id and a `u32` value.
        Parathread(CollatorId, u32),
    }

    /// A v0 core assignment as stored in [`Scheduled`].
    ///
    /// NOTE: the field order is part of the SCALE encoding and must not be changed.
    #[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
    #[cfg_attr(feature = "std", derive(PartialEq))]
    pub struct CoreAssignment {
        /// The core the para is assigned to.
        pub core: CoreIndex,
        /// The assigned para.
        pub para_id: ParaId,
        /// The kind of the assignment.
        pub kind: AssignmentKind,
        /// Index of the validator group stored with the assignment.
        pub group_idx: GroupIndex,
    }
}
102
103mod v1 {
108 use frame_support::{
109 pallet_prelude::ValueQuery, storage_alias, traits::UncheckedOnRuntimeUpgrade,
110 weights::Weight,
111 };
112 use frame_system::pallet_prelude::BlockNumberFor;
113
114 use super::*;
115 use crate::scheduler;
116
117 #[storage_alias]
118 pub(super) type ClaimQueue<T: Config> = StorageValue<
119 Pallet<T>,
120 BTreeMap<CoreIndex, VecDeque<Option<ParasEntry<BlockNumberFor<T>>>>>,
121 ValueQuery,
122 >;
123
124 #[storage_alias]
125 pub(super) type AvailabilityCores<T: Config> =
126 StorageValue<Pallet<T>, Vec<CoreOccupied<BlockNumberFor<T>>>, ValueQuery>;
127
128 #[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
129 pub(super) enum CoreOccupied<N> {
130 Free,
132 Paras(ParasEntry<N>),
134 }
135
136 #[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
137 pub(super) struct ParasEntry<N> {
138 pub(super) assignment: V0Assignment,
140 pub(super) availability_timeouts: u32,
142 pub(super) ttl: N,
147 }
148
149 impl<N> ParasEntry<N> {
150 pub(super) fn new(assignment: V0Assignment, now: N) -> Self {
152 ParasEntry { assignment, availability_timeouts: 0, ttl: now }
153 }
154
155 pub(super) fn para_id(&self) -> ParaId {
157 self.assignment.para_id
158 }
159 }
160
161 fn add_to_claimqueue<T: Config>(core_idx: CoreIndex, pe: ParasEntry<BlockNumberFor<T>>) {
162 ClaimQueue::<T>::mutate(|la| {
163 la.entry(core_idx).or_default().push_back(Some(pe));
164 });
165 }
166
167 pub struct UncheckedMigrateToV1<T>(core::marker::PhantomData<T>);
169 impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV1<T> {
170 fn on_runtime_upgrade() -> Weight {
171 let mut weight: Weight = Weight::zero();
172
173 v0::ParathreadQueue::<T>::kill();
174 v0::ParathreadClaimIndex::<T>::kill();
175
176 let now = frame_system::Pallet::<T>::block_number();
177 let scheduled = v0::Scheduled::<T>::take();
178 let sched_len = scheduled.len() as u64;
179 for core_assignment in scheduled {
180 let core_idx = core_assignment.core;
181 let assignment = V0Assignment { para_id: core_assignment.para_id };
182 let pe = v1::ParasEntry::new(assignment, now);
183 v1::add_to_claimqueue::<T>(core_idx, pe);
184 }
185
186 let parachains = paras::Parachains::<T>::get();
187 let availability_cores = v0::AvailabilityCores::<T>::take();
188 let mut new_availability_cores = Vec::new();
189
190 for (core_index, core) in availability_cores.into_iter().enumerate() {
191 let new_core = if let Some(core) = core {
192 match core {
193 v0::CoreOccupied::Parachain =>
194 v1::CoreOccupied::Paras(v1::ParasEntry::new(
195 V0Assignment { para_id: parachains[core_index] },
196 now,
197 )),
198 v0::CoreOccupied::Parathread(entry) => v1::CoreOccupied::Paras(
199 v1::ParasEntry::new(V0Assignment { para_id: entry.claim.0 }, now),
200 ),
201 }
202 } else {
203 v1::CoreOccupied::Free
204 };
205
206 new_availability_cores.push(new_core);
207 }
208
209 v1::AvailabilityCores::<T>::set(new_availability_cores);
210
211 weight.saturating_accrue(T::DbWeight::get().reads_writes(2 * sched_len, 2 * sched_len));
213 weight.saturating_accrue(T::DbWeight::get().reads_writes(2, 1));
215 weight.saturating_accrue(T::DbWeight::get().writes(2));
217
218 log::info!(target: scheduler::LOG_TARGET, "Migrated para scheduler storage to v1");
219
220 weight
221 }
222
223 #[cfg(feature = "try-runtime")]
224 fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
225 let n: u32 = v0::Scheduled::<T>::get().len() as u32 +
226 v0::AvailabilityCores::<T>::get().iter().filter(|c| c.is_some()).count() as u32;
227
228 log::info!(
229 target: crate::scheduler::LOG_TARGET,
230 "Number of scheduled and waiting for availability before: {n}",
231 );
232
233 Ok(n.encode())
234 }
235
236 #[cfg(feature = "try-runtime")]
237 fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
238 log::info!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
239
240 ensure!(
241 v0::Scheduled::<T>::get().is_empty(),
242 "Scheduled should be empty after the migration"
243 );
244
245 let expected_len = u32::decode(&mut &state[..]).unwrap();
246 let availability_cores_waiting = v1::AvailabilityCores::<T>::get()
247 .into_iter()
248 .filter(|c| !matches!(c, v1::CoreOccupied::Free))
249 .count();
250
251 ensure!(
252 Pallet::<T>::claim_queue_len() as u32 + availability_cores_waiting as u32 ==
253 expected_len,
254 "ClaimQueue and AvailabilityCores should have the correct length",
255 );
256
257 Ok(())
258 }
259 }
260}
261
/// Migrates the scheduler storage from version 0 to version 1.
///
/// Wraps `v1::UncheckedMigrateToV1` in a `VersionedMigration` from storage version `0` to `1`
/// of this pallet.
pub type MigrateV0ToV1<T> = VersionedMigration<
    0,
    1,
    v1::UncheckedMigrateToV1<T>,
    Pallet<T>,
    <T as frame_system::Config>::DbWeight,
>;
270
271pub(crate) mod v2 {
272 use super::*;
273 use crate::scheduler;
274
275 #[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
276 pub(crate) enum CoreOccupied<N> {
277 Free,
278 Paras(ParasEntry<N>),
279 }
280
281 #[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
282 pub(crate) struct ParasEntry<N> {
283 pub assignment: Assignment,
284 pub availability_timeouts: u32,
285 pub ttl: N,
286 }
287
288 #[storage_alias]
290 pub(crate) type ClaimQueue<T: Config> = StorageValue<
291 Pallet<T>,
292 BTreeMap<CoreIndex, VecDeque<ParasEntry<BlockNumberFor<T>>>>,
293 ValueQuery,
294 >;
295
296 #[storage_alias]
297 pub(crate) type AvailabilityCores<T: Config> =
298 StorageValue<Pallet<T>, Vec<CoreOccupied<BlockNumberFor<T>>>, ValueQuery>;
299
300 fn is_bulk<T: Config>(core_index: CoreIndex) -> bool {
301 core_index.0 < paras::Parachains::<T>::decode_len().unwrap_or(0) as u32
302 }
303
304 pub struct UncheckedMigrateToV2<T>(core::marker::PhantomData<T>);
306
307 impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV2<T> {
308 fn on_runtime_upgrade() -> Weight {
309 let mut weight: Weight = Weight::zero();
310
311 let old = v1::ClaimQueue::<T>::take();
312 let new = old
313 .into_iter()
314 .map(|(k, v)| {
315 (
316 k,
317 v.into_iter()
318 .flatten()
319 .map(|p| {
320 let assignment = if is_bulk::<T>(k) {
321 Assignment::Bulk(p.para_id())
322 } else {
323 Assignment::Pool { para_id: p.para_id(), core_index: k }
324 };
325
326 ParasEntry {
327 assignment,
328 availability_timeouts: p.availability_timeouts,
329 ttl: p.ttl,
330 }
331 })
332 .collect::<VecDeque<_>>(),
333 )
334 })
335 .collect::<BTreeMap<CoreIndex, VecDeque<ParasEntry<BlockNumberFor<T>>>>>();
336
337 ClaimQueue::<T>::put(new);
338
339 let old = v1::AvailabilityCores::<T>::get();
340
341 let new = old
342 .into_iter()
343 .enumerate()
344 .map(|(k, a)| match a {
345 v1::CoreOccupied::Free => CoreOccupied::Free,
346 v1::CoreOccupied::Paras(paras) => {
347 let assignment = if is_bulk::<T>((k as u32).into()) {
348 Assignment::Bulk(paras.para_id())
349 } else {
350 Assignment::Pool {
351 para_id: paras.para_id(),
352 core_index: (k as u32).into(),
353 }
354 };
355
356 CoreOccupied::Paras(ParasEntry {
357 assignment,
358 availability_timeouts: paras.availability_timeouts,
359 ttl: paras.ttl,
360 })
361 },
362 })
363 .collect::<Vec<_>>();
364 AvailabilityCores::<T>::put(new);
365
366 weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
367
368 log::info!(target: scheduler::LOG_TARGET, "Migrating para scheduler storage to v2");
369
370 weight
371 }
372
373 #[cfg(feature = "try-runtime")]
374 fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
375 log::trace!(
376 target: crate::scheduler::LOG_TARGET,
377 "ClaimQueue before migration: {}",
378 v1::ClaimQueue::<T>::get().len()
379 );
380
381 let bytes = u32::to_be_bytes(v1::ClaimQueue::<T>::get().len() as u32);
382
383 Ok(bytes.to_vec())
384 }
385
386 #[cfg(feature = "try-runtime")]
387 fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
388 log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
389
390 let old_len = u32::from_be_bytes(state.try_into().unwrap());
391 ensure!(
392 v2::ClaimQueue::<T>::get().len() as u32 == old_len,
393 "Old ClaimQueue completely moved to new ClaimQueue after migration"
394 );
395
396 Ok(())
397 }
398 }
399}
400
/// Migrates the scheduler storage from version 1 to version 2.
///
/// Wraps `v2::UncheckedMigrateToV2` in a `VersionedMigration` from storage version `1` to `2`
/// of this pallet.
pub type MigrateV1ToV2<T> = VersionedMigration<
    1,
    2,
    v2::UncheckedMigrateToV2<T>,
    Pallet<T>,
    <T as frame_system::Config>::DbWeight,
>;
409
410mod v3 {
414 use super::*;
415 use crate::scheduler;
416
417 #[storage_alias]
418 pub(crate) type ClaimQueue<T: Config> =
419 StorageValue<Pallet<T>, BTreeMap<CoreIndex, VecDeque<Assignment>>, ValueQuery>;
420 pub struct UncheckedMigrateToV3<T>(core::marker::PhantomData<T>);
422
423 impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV3<T> {
424 fn on_runtime_upgrade() -> Weight {
425 let mut weight: Weight = Weight::zero();
426
427 let old = v2::ClaimQueue::<T>::take();
430 let new = old
431 .into_iter()
432 .map(|(k, v)| {
433 (
434 k,
435 v.into_iter()
436 .map(|paras_entry| paras_entry.assignment)
437 .collect::<VecDeque<_>>(),
438 )
439 })
440 .collect::<BTreeMap<CoreIndex, VecDeque<Assignment>>>();
441
442 v3::ClaimQueue::<T>::put(new);
443
444 v2::AvailabilityCores::<T>::kill();
446
447 weight.saturating_accrue(T::DbWeight::get().reads_writes(2, 2));
448
449 log::info!(target: scheduler::LOG_TARGET, "Migrating para scheduler storage to v3");
450
451 weight
452 }
453
454 #[cfg(feature = "try-runtime")]
455 fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
456 log::trace!(
457 target: crate::scheduler::LOG_TARGET,
458 "ClaimQueue before migration: {}",
459 v2::ClaimQueue::<T>::get().len()
460 );
461
462 let bytes = u32::to_be_bytes(v2::ClaimQueue::<T>::get().len() as u32);
463
464 Ok(bytes.to_vec())
465 }
466
467 #[cfg(feature = "try-runtime")]
468 fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
469 log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
470
471 let old_len = u32::from_be_bytes(state.try_into().unwrap());
472 ensure!(
473 v3::ClaimQueue::<T>::get().len() as u32 == old_len,
474 "Old ClaimQueue completely moved to new ClaimQueue after migration"
475 );
476
477 ensure!(
478 !v2::AvailabilityCores::<T>::exists(),
479 "AvailabilityCores storage should have been completely killed"
480 );
481
482 Ok(())
483 }
484 }
485}
486
/// Migrates the scheduler storage from version 2 to version 3.
///
/// Wraps `v3::UncheckedMigrateToV3` in a `VersionedMigration` from storage version `2` to `3`
/// of this pallet.
pub type MigrateV2ToV3<T> = VersionedMigration<
    2,
    3,
    v3::UncheckedMigrateToV3<T>,
    Pallet<T>,
    <T as frame_system::Config>::DbWeight,
>;