referrerpolicy=no-referrer-when-downgrade

polkadot_runtime_parachains/scheduler/
migration.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! A module that is responsible for migration of storage.
18
19use super::*;
20use alloc::vec::Vec;
21use frame_support::{
22	migrations::VersionedMigration, pallet_prelude::ValueQuery, storage_alias,
23	traits::UncheckedOnRuntimeUpgrade, weights::Weight,
24};
25
/// Old/legacy assignment representation (v0).
///
/// `Assignment` used to be a concrete type with the same layout as `V0Assignment`, identical on
/// all assignment providers. This can be removed once storage has been migrated.
#[derive(Encode, Decode, RuntimeDebug, TypeInfo, PartialEq, Clone)]
struct V0Assignment {
	/// The para this assignment refers to.
	pub para_id: ParaId,
}
34
/// Old scheduler with explicit parathreads and `Scheduled` storage instead of `ClaimQueue`.
///
/// Only the types and storage aliases needed to read or delete the v0 state are kept here.
mod v0 {
	use super::*;
	use polkadot_primitives::{CollatorId, Id};

	/// The v0 list of core assignments. It is drained by the migration to v1, which moves every
	/// item into the v1 `ClaimQueue`.
	#[storage_alias]
	pub(super) type Scheduled<T: Config> = StorageValue<Pallet<T>, Vec<CoreAssignment>, ValueQuery>;

	/// A parathread claim: the para `Id` (field `0`) together with a `CollatorId` (field `1`).
	#[derive(Clone, Encode, Decode)]
	#[cfg_attr(feature = "std", derive(PartialEq))]
	pub struct ParathreadClaim(pub Id, pub CollatorId);

	/// A parathread claim together with its retry counter.
	#[derive(Clone, Encode, Decode)]
	#[cfg_attr(feature = "std", derive(PartialEq))]
	pub struct ParathreadEntry {
		/// The claim.
		pub claim: ParathreadClaim,
		/// Number of retries.
		pub retries: u32,
	}

	/// What is occupying a specific availability core.
	#[derive(Clone, Encode, Decode)]
	#[cfg_attr(feature = "std", derive(PartialEq))]
	pub enum CoreOccupied {
		/// A parathread.
		Parathread(ParathreadEntry),
		/// A parachain.
		Parachain,
	}

	/// The v0 availability cores; `None` is a core that is not occupied.
	///
	/// Unlike the other legacy values below, this one is decoded by the migration to v1 (each
	/// item is converted into a v1 `CoreOccupied`), so the type has to match the stored data.
	#[storage_alias]
	pub(crate) type AvailabilityCores<T: Config> =
		StorageValue<Pallet<T>, Vec<Option<CoreOccupied>>, ValueQuery>;

	/// The actual type isn't important, as we only delete the key in the state.
	#[storage_alias]
	pub(super) type ParathreadQueue<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

	/// The actual type isn't important, as we only delete the key in the state.
	#[storage_alias]
	pub(super) type ParathreadClaimIndex<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

	/// The assignment type.
	#[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
	#[cfg_attr(feature = "std", derive(PartialEq))]
	pub enum AssignmentKind {
		/// A parachain.
		Parachain,
		/// A parathread.
		Parathread(CollatorId, u32),
	}

	/// How a free core is scheduled to be assigned.
	#[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
	#[cfg_attr(feature = "std", derive(PartialEq))]
	pub struct CoreAssignment {
		/// The core that is assigned.
		pub core: CoreIndex,
		/// The unique ID of the para that is assigned to the core.
		pub para_id: ParaId,
		/// The kind of the assignment.
		pub kind: AssignmentKind,
		/// The index of the validator group assigned to the core.
		pub group_idx: GroupIndex,
	}
}
102
103// `ClaimQueue` got introduced.
104//
105// - Items are `Option` for some weird reason.
106// - Assignments only consist of `ParaId`, `Assignment` is a concrete type (Same as V0Assignment).
107mod v1 {
108	use frame_support::{
109		pallet_prelude::ValueQuery, storage_alias, traits::UncheckedOnRuntimeUpgrade,
110		weights::Weight,
111	};
112	use frame_system::pallet_prelude::BlockNumberFor;
113
114	use super::*;
115	use crate::scheduler;
116
	/// The v1 claim queue: for every core a queue of optional [`ParasEntry`] items.
	#[storage_alias]
	pub(super) type ClaimQueue<T: Config> = StorageValue<
		Pallet<T>,
		BTreeMap<CoreIndex, VecDeque<Option<ParasEntry<BlockNumberFor<T>>>>>,
		ValueQuery,
	>;
123
	/// The v1 availability cores: one [`CoreOccupied`] item per core, in core-index order as
	/// written by the migration to v1.
	#[storage_alias]
	pub(super) type AvailabilityCores<T: Config> =
		StorageValue<Pallet<T>, Vec<CoreOccupied<BlockNumberFor<T>>>, ValueQuery>;
127
	/// The v1 state of a single availability core. `N` is the block number type used by the
	/// wrapped [`ParasEntry`].
	#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
	pub(super) enum CoreOccupied<N> {
		/// No candidate is waiting availability on this core right now (the core is not occupied).
		Free,
		/// A para is currently waiting for availability/inclusion on this core.
		Paras(ParasEntry<N>),
	}
135
	/// A v1 claim queue / availability core entry, generic over the block number type `N`.
	#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
	pub(super) struct ParasEntry<N> {
		/// The underlying `Assignment`
		pub(super) assignment: V0Assignment,
		/// The number of times the entry has timed out in availability already.
		pub(super) availability_timeouts: u32,
		/// The block height until this entry needs to be backed.
		///
		/// If missed the entry will be removed from the claim queue without ever having occupied
		/// the core.
		pub(super) ttl: N,
	}
148
149	impl<N> ParasEntry<N> {
150		/// Create a new `ParasEntry`.
151		pub(super) fn new(assignment: V0Assignment, now: N) -> Self {
152			ParasEntry { assignment, availability_timeouts: 0, ttl: now }
153		}
154
155		/// Return `Id` from the underlying `Assignment`.
156		pub(super) fn para_id(&self) -> ParaId {
157			self.assignment.para_id
158		}
159	}
160
161	fn add_to_claimqueue<T: Config>(core_idx: CoreIndex, pe: ParasEntry<BlockNumberFor<T>>) {
162		ClaimQueue::<T>::mutate(|la| {
163			la.entry(core_idx).or_default().push_back(Some(pe));
164		});
165	}
166
	/// Migration to V1.
	///
	/// "Unchecked" because it implements `UncheckedOnRuntimeUpgrade`; the storage-version
	/// bookkeeping is added by wrapping it in `VersionedMigration` (see `MigrateV0ToV1`).
	pub struct UncheckedMigrateToV1<T>(core::marker::PhantomData<T>);
169	impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV1<T> {
170		fn on_runtime_upgrade() -> Weight {
171			let mut weight: Weight = Weight::zero();
172
173			v0::ParathreadQueue::<T>::kill();
174			v0::ParathreadClaimIndex::<T>::kill();
175
176			let now = frame_system::Pallet::<T>::block_number();
177			let scheduled = v0::Scheduled::<T>::take();
178			let sched_len = scheduled.len() as u64;
179			for core_assignment in scheduled {
180				let core_idx = core_assignment.core;
181				let assignment = V0Assignment { para_id: core_assignment.para_id };
182				let pe = v1::ParasEntry::new(assignment, now);
183				v1::add_to_claimqueue::<T>(core_idx, pe);
184			}
185
186			let parachains = paras::Parachains::<T>::get();
187			let availability_cores = v0::AvailabilityCores::<T>::take();
188			let mut new_availability_cores = Vec::new();
189
190			for (core_index, core) in availability_cores.into_iter().enumerate() {
191				let new_core = if let Some(core) = core {
192					match core {
193						v0::CoreOccupied::Parachain =>
194							v1::CoreOccupied::Paras(v1::ParasEntry::new(
195								V0Assignment { para_id: parachains[core_index] },
196								now,
197							)),
198						v0::CoreOccupied::Parathread(entry) => v1::CoreOccupied::Paras(
199							v1::ParasEntry::new(V0Assignment { para_id: entry.claim.0 }, now),
200						),
201					}
202				} else {
203					v1::CoreOccupied::Free
204				};
205
206				new_availability_cores.push(new_core);
207			}
208
209			v1::AvailabilityCores::<T>::set(new_availability_cores);
210
211			// 2x as once for Scheduled and once for Claimqueue
212			weight.saturating_accrue(T::DbWeight::get().reads_writes(2 * sched_len, 2 * sched_len));
213			// reading parachains + availability_cores, writing AvailabilityCores
214			weight.saturating_accrue(T::DbWeight::get().reads_writes(2, 1));
215			// 2x kill
216			weight.saturating_accrue(T::DbWeight::get().writes(2));
217
218			log::info!(target: scheduler::LOG_TARGET, "Migrated para scheduler storage to v1");
219
220			weight
221		}
222
223		#[cfg(feature = "try-runtime")]
224		fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
225			let n: u32 = v0::Scheduled::<T>::get().len() as u32 +
226				v0::AvailabilityCores::<T>::get().iter().filter(|c| c.is_some()).count() as u32;
227
228			log::info!(
229				target: crate::scheduler::LOG_TARGET,
230				"Number of scheduled and waiting for availability before: {n}",
231			);
232
233			Ok(n.encode())
234		}
235
236		#[cfg(feature = "try-runtime")]
237		fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
238			log::info!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
239
240			ensure!(
241				v0::Scheduled::<T>::get().is_empty(),
242				"Scheduled should be empty after the migration"
243			);
244
245			let expected_len = u32::decode(&mut &state[..]).unwrap();
246			let availability_cores_waiting = v1::AvailabilityCores::<T>::get()
247				.into_iter()
248				.filter(|c| !matches!(c, v1::CoreOccupied::Free))
249				.count();
250
251			ensure!(
252				Pallet::<T>::claim_queue_len() as u32 + availability_cores_waiting as u32 ==
253					expected_len,
254				"ClaimQueue and AvailabilityCores should have the correct length",
255			);
256
257			Ok(())
258		}
259	}
260}
261
/// Migrate `V0` to `V1` of the storage format.
///
/// Wraps `v1::UncheckedMigrateToV1` in a [`VersionedMigration`] going from storage version `0` to
/// `1` of this pallet.
pub type MigrateV0ToV1<T> = VersionedMigration<
	0,
	1,
	v1::UncheckedMigrateToV1<T>,
	Pallet<T>,
	<T as frame_system::Config>::DbWeight,
>;
270
271pub(crate) mod v2 {
272	use super::*;
273	use crate::scheduler;
274
	/// The v2 state of a single availability core. Same shape as the v1 type, but wrapping the
	/// v2 [`ParasEntry`].
	#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
	pub(crate) enum CoreOccupied<N> {
		/// The core is not occupied.
		Free,
		/// The core is occupied by the given entry.
		Paras(ParasEntry<N>),
	}
280
	/// A v2 claim queue / availability core entry. It differs from the v1 entry only in the type
	/// of `assignment`, which is `Assignment` instead of `V0Assignment`.
	#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
	pub(crate) struct ParasEntry<N> {
		/// The assignment this entry was created for.
		pub assignment: Assignment,
		/// Availability timeout counter, copied unchanged from the v1 entry by the migration.
		pub availability_timeouts: u32,
		/// Time-to-live block number, copied unchanged from the v1 entry by the migration.
		pub ttl: N,
	}
287
	/// The v2 claim queue: for every core a queue of [`ParasEntry`] items.
	///
	/// Compared to v1 the items are no longer wrapped in `Option` and they carry the new
	/// `Assignment` type.
	// V2 (no Option wrapper) and new [`Assignment`].
	#[storage_alias]
	pub(crate) type ClaimQueue<T: Config> = StorageValue<
		Pallet<T>,
		BTreeMap<CoreIndex, VecDeque<ParasEntry<BlockNumberFor<T>>>>,
		ValueQuery,
	>;
295
	/// The v2 availability cores: one v2 [`CoreOccupied`] item per core. The migration to v3
	/// deletes this value.
	#[storage_alias]
	pub(crate) type AvailabilityCores<T: Config> =
		StorageValue<Pallet<T>, Vec<CoreOccupied<BlockNumberFor<T>>>, ValueQuery>;
299
300	fn is_bulk<T: Config>(core_index: CoreIndex) -> bool {
301		core_index.0 < paras::Parachains::<T>::decode_len().unwrap_or(0) as u32
302	}
303
	/// Migration to V2.
	///
	/// "Unchecked" because it implements `UncheckedOnRuntimeUpgrade`; the storage-version
	/// bookkeeping is added by wrapping it in `VersionedMigration` (see `MigrateV1ToV2`).
	pub struct UncheckedMigrateToV2<T>(core::marker::PhantomData<T>);
306
307	impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV2<T> {
308		fn on_runtime_upgrade() -> Weight {
309			let mut weight: Weight = Weight::zero();
310
311			let old = v1::ClaimQueue::<T>::take();
312			let new = old
313				.into_iter()
314				.map(|(k, v)| {
315					(
316						k,
317						v.into_iter()
318							.flatten()
319							.map(|p| {
320								let assignment = if is_bulk::<T>(k) {
321									Assignment::Bulk(p.para_id())
322								} else {
323									Assignment::Pool { para_id: p.para_id(), core_index: k }
324								};
325
326								ParasEntry {
327									assignment,
328									availability_timeouts: p.availability_timeouts,
329									ttl: p.ttl,
330								}
331							})
332							.collect::<VecDeque<_>>(),
333					)
334				})
335				.collect::<BTreeMap<CoreIndex, VecDeque<ParasEntry<BlockNumberFor<T>>>>>();
336
337			ClaimQueue::<T>::put(new);
338
339			let old = v1::AvailabilityCores::<T>::get();
340
341			let new = old
342				.into_iter()
343				.enumerate()
344				.map(|(k, a)| match a {
345					v1::CoreOccupied::Free => CoreOccupied::Free,
346					v1::CoreOccupied::Paras(paras) => {
347						let assignment = if is_bulk::<T>((k as u32).into()) {
348							Assignment::Bulk(paras.para_id())
349						} else {
350							Assignment::Pool {
351								para_id: paras.para_id(),
352								core_index: (k as u32).into(),
353							}
354						};
355
356						CoreOccupied::Paras(ParasEntry {
357							assignment,
358							availability_timeouts: paras.availability_timeouts,
359							ttl: paras.ttl,
360						})
361					},
362				})
363				.collect::<Vec<_>>();
364			AvailabilityCores::<T>::put(new);
365
366			weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
367
368			log::info!(target: scheduler::LOG_TARGET, "Migrating para scheduler storage to v2");
369
370			weight
371		}
372
373		#[cfg(feature = "try-runtime")]
374		fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
375			log::trace!(
376				target: crate::scheduler::LOG_TARGET,
377				"ClaimQueue before migration: {}",
378				v1::ClaimQueue::<T>::get().len()
379			);
380
381			let bytes = u32::to_be_bytes(v1::ClaimQueue::<T>::get().len() as u32);
382
383			Ok(bytes.to_vec())
384		}
385
386		#[cfg(feature = "try-runtime")]
387		fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
388			log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
389
390			let old_len = u32::from_be_bytes(state.try_into().unwrap());
391			ensure!(
392				v2::ClaimQueue::<T>::get().len() as u32 == old_len,
393				"Old ClaimQueue completely moved to new ClaimQueue after migration"
394			);
395
396			Ok(())
397		}
398	}
399}
400
/// Migrate `V1` to `V2` of the storage format.
///
/// Wraps `v2::UncheckedMigrateToV2` in a [`VersionedMigration`] going from storage version `1` to
/// `2` of this pallet.
pub type MigrateV1ToV2<T> = VersionedMigration<
	1,
	2,
	v2::UncheckedMigrateToV2<T>,
	Pallet<T>,
	<T as frame_system::Config>::DbWeight,
>;
409
/// Migration for TTL and availability timeout retries removal.
/// `AvailabilityCores` storage is removed and `ClaimQueue` now holds `Assignment`s instead of
/// `ParasEntry`s.
413mod v3 {
414	use super::*;
415	use crate::scheduler;
416
	/// The v3 claim queue: for every core a queue of plain `Assignment`s.
	#[storage_alias]
	pub(crate) type ClaimQueue<T: Config> =
		StorageValue<Pallet<T>, BTreeMap<CoreIndex, VecDeque<Assignment>>, ValueQuery>;
	/// Migration to V3.
	///
	/// "Unchecked" because it implements `UncheckedOnRuntimeUpgrade`; the storage-version
	/// bookkeeping is added by wrapping it in `VersionedMigration` (see `MigrateV2ToV3`).
	pub struct UncheckedMigrateToV3<T>(core::marker::PhantomData<T>);
422
423	impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV3<T> {
424		fn on_runtime_upgrade() -> Weight {
425			let mut weight: Weight = Weight::zero();
426
427			// Migrate ClaimQueuee to new format.
428
429			let old = v2::ClaimQueue::<T>::take();
430			let new = old
431				.into_iter()
432				.map(|(k, v)| {
433					(
434						k,
435						v.into_iter()
436							.map(|paras_entry| paras_entry.assignment)
437							.collect::<VecDeque<_>>(),
438					)
439				})
440				.collect::<BTreeMap<CoreIndex, VecDeque<Assignment>>>();
441
442			v3::ClaimQueue::<T>::put(new);
443
444			// Clear AvailabilityCores storage
445			v2::AvailabilityCores::<T>::kill();
446
447			weight.saturating_accrue(T::DbWeight::get().reads_writes(2, 2));
448
449			log::info!(target: scheduler::LOG_TARGET, "Migrating para scheduler storage to v3");
450
451			weight
452		}
453
454		#[cfg(feature = "try-runtime")]
455		fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
456			log::trace!(
457				target: crate::scheduler::LOG_TARGET,
458				"ClaimQueue before migration: {}",
459				v2::ClaimQueue::<T>::get().len()
460			);
461
462			let bytes = u32::to_be_bytes(v2::ClaimQueue::<T>::get().len() as u32);
463
464			Ok(bytes.to_vec())
465		}
466
467		#[cfg(feature = "try-runtime")]
468		fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
469			log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
470
471			let old_len = u32::from_be_bytes(state.try_into().unwrap());
472			ensure!(
473				v3::ClaimQueue::<T>::get().len() as u32 == old_len,
474				"Old ClaimQueue completely moved to new ClaimQueue after migration"
475			);
476
477			ensure!(
478				!v2::AvailabilityCores::<T>::exists(),
479				"AvailabilityCores storage should have been completely killed"
480			);
481
482			Ok(())
483		}
484	}
485}
486
/// Migrate `V2` to `V3` of the storage format.
///
/// Wraps `v3::UncheckedMigrateToV3` in a [`VersionedMigration`] going from storage version `2` to
/// `3` of this pallet.
pub type MigrateV2ToV3<T> = VersionedMigration<
	2,
	3,
	v3::UncheckedMigrateToV3<T>,
	Pallet<T>,
	<T as frame_system::Config>::DbWeight,
>;