use super::*;
use alloc::vec::Vec;
use frame_support::{
migrations::VersionedMigration, pallet_prelude::ValueQuery, storage_alias,
traits::UncheckedOnRuntimeUpgrade, weights::Weight,
};
/// Assignment representation embedded in the v1 `ParasEntry`: it carries only the para id.
#[derive(Clone, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug)]
struct V0Assignment {
    /// The para this assignment refers to.
    pub para_id: ParaId,
}
mod v0 {
use super::*;
use polkadot_primitives::{CollatorId, Id};
#[storage_alias]
pub(super) type Scheduled<T: Config> = StorageValue<Pallet<T>, Vec<CoreAssignment>, ValueQuery>;
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct ParathreadClaim(pub Id, pub CollatorId);
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct ParathreadEntry {
pub claim: ParathreadClaim,
pub retries: u32,
}
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub enum CoreOccupied {
Parathread(ParathreadEntry),
Parachain,
}
#[storage_alias]
pub(crate) type AvailabilityCores<T: Config> =
StorageValue<Pallet<T>, Vec<Option<CoreOccupied>>, ValueQuery>;
#[storage_alias]
pub(super) type ParathreadQueue<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;
#[storage_alias]
pub(super) type ParathreadClaimIndex<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;
#[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub enum AssignmentKind {
Parachain,
Parathread(CollatorId, u32),
}
#[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct CoreAssignment {
pub core: CoreIndex,
pub para_id: ParaId,
pub kind: AssignmentKind,
pub group_idx: GroupIndex,
}
}
mod v1 {
use frame_support::{
pallet_prelude::ValueQuery, storage_alias, traits::UncheckedOnRuntimeUpgrade,
weights::Weight,
};
use frame_system::pallet_prelude::BlockNumberFor;
use super::*;
use crate::scheduler;
#[storage_alias]
pub(super) type ClaimQueue<T: Config> = StorageValue<
Pallet<T>,
BTreeMap<CoreIndex, VecDeque<Option<ParasEntry<BlockNumberFor<T>>>>>,
ValueQuery,
>;
#[storage_alias]
pub(super) type AvailabilityCores<T: Config> =
StorageValue<Pallet<T>, Vec<CoreOccupied<BlockNumberFor<T>>>, ValueQuery>;
#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
pub(super) enum CoreOccupied<N> {
Free,
Paras(ParasEntry<N>),
}
#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
pub(super) struct ParasEntry<N> {
pub(super) assignment: V0Assignment,
pub(super) availability_timeouts: u32,
pub(super) ttl: N,
}
impl<N> ParasEntry<N> {
pub(super) fn new(assignment: V0Assignment, now: N) -> Self {
ParasEntry { assignment, availability_timeouts: 0, ttl: now }
}
pub(super) fn para_id(&self) -> ParaId {
self.assignment.para_id
}
}
fn add_to_claimqueue<T: Config>(core_idx: CoreIndex, pe: ParasEntry<BlockNumberFor<T>>) {
ClaimQueue::<T>::mutate(|la| {
la.entry(core_idx).or_default().push_back(Some(pe));
});
}
pub struct UncheckedMigrateToV1<T>(core::marker::PhantomData<T>);
impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV1<T> {
fn on_runtime_upgrade() -> Weight {
let mut weight: Weight = Weight::zero();
v0::ParathreadQueue::<T>::kill();
v0::ParathreadClaimIndex::<T>::kill();
let now = frame_system::Pallet::<T>::block_number();
let scheduled = v0::Scheduled::<T>::take();
let sched_len = scheduled.len() as u64;
for core_assignment in scheduled {
let core_idx = core_assignment.core;
let assignment = V0Assignment { para_id: core_assignment.para_id };
let pe = v1::ParasEntry::new(assignment, now);
v1::add_to_claimqueue::<T>(core_idx, pe);
}
let parachains = paras::Parachains::<T>::get();
let availability_cores = v0::AvailabilityCores::<T>::take();
let mut new_availability_cores = Vec::new();
for (core_index, core) in availability_cores.into_iter().enumerate() {
let new_core = if let Some(core) = core {
match core {
v0::CoreOccupied::Parachain =>
v1::CoreOccupied::Paras(v1::ParasEntry::new(
V0Assignment { para_id: parachains[core_index] },
now,
)),
v0::CoreOccupied::Parathread(entry) => v1::CoreOccupied::Paras(
v1::ParasEntry::new(V0Assignment { para_id: entry.claim.0 }, now),
),
}
} else {
v1::CoreOccupied::Free
};
new_availability_cores.push(new_core);
}
v1::AvailabilityCores::<T>::set(new_availability_cores);
weight.saturating_accrue(T::DbWeight::get().reads_writes(2 * sched_len, 2 * sched_len));
weight.saturating_accrue(T::DbWeight::get().reads_writes(2, 1));
weight.saturating_accrue(T::DbWeight::get().writes(2));
log::info!(target: scheduler::LOG_TARGET, "Migrated para scheduler storage to v1");
weight
}
#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
let n: u32 = v0::Scheduled::<T>::get().len() as u32 +
v0::AvailabilityCores::<T>::get().iter().filter(|c| c.is_some()).count() as u32;
log::info!(
target: crate::scheduler::LOG_TARGET,
"Number of scheduled and waiting for availability before: {n}",
);
Ok(n.encode())
}
#[cfg(feature = "try-runtime")]
fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
log::info!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
ensure!(
v0::Scheduled::<T>::get().is_empty(),
"Scheduled should be empty after the migration"
);
let expected_len = u32::decode(&mut &state[..]).unwrap();
let availability_cores_waiting = v1::AvailabilityCores::<T>::get()
.into_iter()
.filter(|c| !matches!(c, v1::CoreOccupied::Free))
.count();
ensure!(
Pallet::<T>::claim_queue_len() as u32 + availability_cores_waiting as u32 ==
expected_len,
"ClaimQueue and AvailabilityCores should have the correct length",
);
Ok(())
}
}
}
/// Migration of the scheduler storage from version `0` to version `1`.
///
/// Wraps `v1::UncheckedMigrateToV1` in a `VersionedMigration` parameterised with the versions
/// `0` and `1`, this pallet, and the runtime's `DbWeight`.
pub type MigrateV0ToV1<T> = VersionedMigration<
0,
1,
v1::UncheckedMigrateToV1<T>,
Pallet<T>,
<T as frame_system::Config>::DbWeight,
>;
pub(crate) mod v2 {
use super::*;
use crate::scheduler;
#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
pub(crate) enum CoreOccupied<N> {
Free,
Paras(ParasEntry<N>),
}
#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
pub(crate) struct ParasEntry<N> {
pub assignment: Assignment,
pub availability_timeouts: u32,
pub ttl: N,
}
#[storage_alias]
pub(crate) type ClaimQueue<T: Config> = StorageValue<
Pallet<T>,
BTreeMap<CoreIndex, VecDeque<ParasEntry<BlockNumberFor<T>>>>,
ValueQuery,
>;
#[storage_alias]
pub(crate) type AvailabilityCores<T: Config> =
StorageValue<Pallet<T>, Vec<CoreOccupied<BlockNumberFor<T>>>, ValueQuery>;
fn is_bulk<T: Config>(core_index: CoreIndex) -> bool {
core_index.0 < paras::Parachains::<T>::decode_len().unwrap_or(0) as u32
}
pub struct UncheckedMigrateToV2<T>(core::marker::PhantomData<T>);
impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV2<T> {
fn on_runtime_upgrade() -> Weight {
let mut weight: Weight = Weight::zero();
let old = v1::ClaimQueue::<T>::take();
let new = old
.into_iter()
.map(|(k, v)| {
(
k,
v.into_iter()
.flatten()
.map(|p| {
let assignment = if is_bulk::<T>(k) {
Assignment::Bulk(p.para_id())
} else {
Assignment::Pool { para_id: p.para_id(), core_index: k }
};
ParasEntry {
assignment,
availability_timeouts: p.availability_timeouts,
ttl: p.ttl,
}
})
.collect::<VecDeque<_>>(),
)
})
.collect::<BTreeMap<CoreIndex, VecDeque<ParasEntry<BlockNumberFor<T>>>>>();
ClaimQueue::<T>::put(new);
let old = v1::AvailabilityCores::<T>::get();
let new = old
.into_iter()
.enumerate()
.map(|(k, a)| match a {
v1::CoreOccupied::Free => CoreOccupied::Free,
v1::CoreOccupied::Paras(paras) => {
let assignment = if is_bulk::<T>((k as u32).into()) {
Assignment::Bulk(paras.para_id())
} else {
Assignment::Pool {
para_id: paras.para_id(),
core_index: (k as u32).into(),
}
};
CoreOccupied::Paras(ParasEntry {
assignment,
availability_timeouts: paras.availability_timeouts,
ttl: paras.ttl,
})
},
})
.collect::<Vec<_>>();
AvailabilityCores::<T>::put(new);
weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
log::info!(target: scheduler::LOG_TARGET, "Migrating para scheduler storage to v2");
weight
}
#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
log::trace!(
target: crate::scheduler::LOG_TARGET,
"ClaimQueue before migration: {}",
v1::ClaimQueue::<T>::get().len()
);
let bytes = u32::to_be_bytes(v1::ClaimQueue::<T>::get().len() as u32);
Ok(bytes.to_vec())
}
#[cfg(feature = "try-runtime")]
fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
let old_len = u32::from_be_bytes(state.try_into().unwrap());
ensure!(
v2::ClaimQueue::<T>::get().len() as u32 == old_len,
"Old ClaimQueue completely moved to new ClaimQueue after migration"
);
Ok(())
}
}
}
/// Migration of the scheduler storage from version `1` to version `2`.
///
/// Wraps `v2::UncheckedMigrateToV2` in a `VersionedMigration` parameterised with the versions
/// `1` and `2`, this pallet, and the runtime's `DbWeight`.
pub type MigrateV1ToV2<T> = VersionedMigration<
1,
2,
v2::UncheckedMigrateToV2<T>,
Pallet<T>,
<T as frame_system::Config>::DbWeight,
>;
mod v3 {
use super::*;
use crate::scheduler;
#[storage_alias]
pub(crate) type ClaimQueue<T: Config> =
StorageValue<Pallet<T>, BTreeMap<CoreIndex, VecDeque<Assignment>>, ValueQuery>;
pub struct UncheckedMigrateToV3<T>(core::marker::PhantomData<T>);
impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV3<T> {
fn on_runtime_upgrade() -> Weight {
let mut weight: Weight = Weight::zero();
let old = v2::ClaimQueue::<T>::take();
let new = old
.into_iter()
.map(|(k, v)| {
(
k,
v.into_iter()
.map(|paras_entry| paras_entry.assignment)
.collect::<VecDeque<_>>(),
)
})
.collect::<BTreeMap<CoreIndex, VecDeque<Assignment>>>();
v3::ClaimQueue::<T>::put(new);
v2::AvailabilityCores::<T>::kill();
weight.saturating_accrue(T::DbWeight::get().reads_writes(2, 2));
log::info!(target: scheduler::LOG_TARGET, "Migrating para scheduler storage to v3");
weight
}
#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
log::trace!(
target: crate::scheduler::LOG_TARGET,
"ClaimQueue before migration: {}",
v2::ClaimQueue::<T>::get().len()
);
let bytes = u32::to_be_bytes(v2::ClaimQueue::<T>::get().len() as u32);
Ok(bytes.to_vec())
}
#[cfg(feature = "try-runtime")]
fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
let old_len = u32::from_be_bytes(state.try_into().unwrap());
ensure!(
v3::ClaimQueue::<T>::get().len() as u32 == old_len,
"Old ClaimQueue completely moved to new ClaimQueue after migration"
);
ensure!(
!v2::AvailabilityCores::<T>::exists(),
"AvailabilityCores storage should have been completely killed"
);
Ok(())
}
}
}
/// Migration of the scheduler storage from version `2` to version `3`.
///
/// Wraps `v3::UncheckedMigrateToV3` in a `VersionedMigration` parameterised with the versions
/// `2` and `3`, this pallet, and the runtime's `DbWeight`.
pub type MigrateV2ToV3<T> = VersionedMigration<
2,
3,
v3::UncheckedMigrateToV3<T>,
Pallet<T>,
<T as frame_system::Config>::DbWeight,
>;