use crate::{configuration, initializer::SessionChangeNotification, paras};
use alloc::{
collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
vec::Vec,
};
use frame_support::{pallet_prelude::*, traits::Defensive};
use frame_system::pallet_prelude::BlockNumberFor;
pub use polkadot_core_primitives::v2::BlockNumber;
use polkadot_primitives::{
CoreIndex, GroupIndex, GroupRotationInfo, Id as ParaId, ScheduledCore, SchedulerParams,
ValidatorIndex,
};
use sp_runtime::traits::One;
/// Shared scheduler definitions; this file imports `Assignment` and `AssignmentProvider`
/// from it.
pub mod common;
use common::{Assignment, AssignmentProvider};
pub use pallet::*;
// Unit tests for this module, compiled only in test builds.
#[cfg(test)]
mod tests;
// Target name for log output of this pallet.
// NOTE(review): not referenced by the code visible in this file — confirm it is still needed.
const LOG_TARGET: &str = "runtime::parachains::scheduler";
/// Storage migrations. `migration::v2::ClaimQueue` is still read by this file while the
/// on-chain storage version equals 2.
pub mod migration;
// Pallet definition: configuration trait, storage items and helper types of the scheduler.
#[frame_support::pallet]
pub mod pallet {
use super::*;
/// Storage version declared for this pallet. Readers of the claim queue in this file
/// special-case an on-chain version of 2 and go through `migration::v2` in that case.
const STORAGE_VERSION: StorageVersion = StorageVersion::new(3);
#[pallet::pallet]
#[pallet::without_storage_info]
#[pallet::storage_version(STORAGE_VERSION)]
pub struct Pallet<T>(_);
#[pallet::config]
pub trait Config: frame_system::Config + configuration::Config + paras::Config {
/// Source of core assignments. The scheduler pops assignments from it to fill the claim
/// queue, reports processed assignments to it, and pushes assignments back to it when
/// claim queue entries are dropped.
type AssignmentProvider: AssignmentProvider<BlockNumberFor<Self>>;
}
/// The validator groups, as lists of validator indices. Recomputed on every session change;
/// the number of groups is what `num_availability_cores` reports.
#[pallet::storage]
pub type ValidatorGroups<T> = StorageValue<_, Vec<Vec<ValidatorIndex>>, ValueQuery>;
/// Block number regarded as the start of the current session. On a session change it is set
/// to the current block number plus one.
#[pallet::storage]
pub type SessionStartBlock<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;
/// Upcoming assignments per core, in the order they will be served (front first). Cores
/// without any queued assignment have no entry.
#[pallet::storage]
pub type ClaimQueue<T> = StorageValue<_, BTreeMap<CoreIndex, VecDeque<Assignment>>, ValueQuery>;
/// Result of evaluating the availability timeout predicate for one pending candidate.
pub(crate) struct AvailabilityTimeoutStatus<BlockNumber> {
/// `true` when the computed timeout block is less than or equal to the block number the
/// predicate was created at.
pub timed_out: bool,
/// The computed timeout block number.
pub live_until: BlockNumber,
}
}
impl<T: Config> Pallet<T> {
/// Initialization hook of the scheduler. Does nothing and returns zero weight; the block
/// number argument is ignored.
///
/// NOTE(review): presumably invoked by the initializer pallet at block start — confirm at
/// the call site.
pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
Weight::zero()
}
/// Finalization hook of the scheduler. Intentionally a no-op.
pub(crate) fn initializer_finalize() {}
/// Rebuilds the validator groups for a new session and refreshes the claim queue.
///
/// The number of groups is the larger of the configured `num_cores` and
/// `validators.len() / max_validators_per_core` (the latter only counts when the limit is
/// set and non-zero). The indices `0..validators.len()` are split into that many contiguous
/// groups; when the split is uneven, the first `len % n_cores` groups hold one extra member.
/// With zero cores or no validators the stored group list is emptied.
///
/// Afterwards the claim queue is resized and refilled, and `SessionStartBlock` is set to the
/// block following the current one.
pub(crate) fn initializer_on_new_session(
    notification: &SessionChangeNotification<BlockNumberFor<T>>,
) {
    let scheduler_params = &notification.new_config.scheduler_params;
    let validator_count = notification.validators.len();

    let cores_from_validators = match scheduler_params.max_validators_per_core {
        Some(per_core) if per_core != 0 => validator_count as u32 / per_core,
        _ => 0,
    };
    let n_cores = scheduler_params.num_cores.max(cores_from_validators);

    let groups: Vec<Vec<ValidatorIndex>> = if n_cores == 0 || validator_count == 0 {
        Vec::new()
    } else {
        let core_count = n_cores as usize;
        let base_size = validator_count
            .checked_div(core_count)
            .defensive_proof("n_cores should not be 0")
            .unwrap_or(0);
        let oversized = validator_count
            .checked_rem(core_count)
            .defensive_proof("n_cores should not be 0")
            .unwrap_or(0);

        (0..core_count)
            .map(|group| {
                // The first `oversized` groups carry one extra validator, so every group
                // starts `min(group, oversized)` positions later than an even split would.
                let size = if group < oversized { base_size + 1 } else { base_size };
                let first = group * base_size + group.min(oversized);
                (first..first + size)
                    .map(|validator| ValidatorIndex(validator as _))
                    .collect::<Vec<_>>()
            })
            .collect()
    };
    ValidatorGroups::<T>::set(groups);

    // The claim queue depends on the group count stored just above.
    Self::maybe_resize_claim_queue();
    Self::populate_claim_queue_after_session_change();

    let session_start = frame_system::Pallet::<T>::block_number() + One::one();
    SessionStartBlock::<T>::set(session_start);
}
/// Returns a copy of the validator indices in the group at `group_index`, or `None` when no
/// such group is stored.
pub(crate) fn group_validators(group_index: GroupIndex) -> Option<Vec<ValidatorIndex>> {
    let groups = ValidatorGroups::<T>::get();
    groups.get(group_index.0 as usize).cloned()
}
/// Number of stored validator groups, obtained from the encoded length of `ValidatorGroups`.
/// Yields 0 when that length is unavailable.
pub(crate) fn num_availability_cores() -> usize {
    match ValidatorGroups::<T>::decode_len() {
        Some(group_count) => group_count,
        None => 0,
    }
}
/// Number of cores the claim queue is expected to cover: the configured `num_cores`, capped
/// by the number of availability cores (validator groups).
fn expected_claim_queue_len(config: &SchedulerParams<BlockNumberFor<T>>) -> u32 {
    let available_cores = Self::num_availability_cores() as u32;
    config.num_cores.min(available_cores)
}
/// Returns the validator group responsible for `core` at block `at`.
///
/// Yields `None` when `at` lies before the session start block or when `core` is not below
/// the number of validator groups. Otherwise the group index is the core index shifted by
/// the number of group rotations since session start, modulo the number of groups.
pub(crate) fn group_assigned_to_core(
    core: CoreIndex,
    at: BlockNumberFor<T>,
) -> Option<GroupIndex> {
    let rotation_frequency =
        configuration::ActiveConfig::<T>::get().scheduler_params.group_rotation_frequency;
    let session_start_block = SessionStartBlock::<T>::get();
    if at < session_start_block {
        return None
    }

    let n_groups = ValidatorGroups::<T>::get().len();
    let core_position = core.0 as usize;
    if core_position >= n_groups {
        return None
    }

    let elapsed_rotations = (at - session_start_block) / rotation_frequency;
    // A rotation count that does not fit into `u32` is treated as zero rotations.
    let shift = <BlockNumberFor<T> as TryInto<u32>>::try_into(elapsed_rotations)
        .map_or(0, |rotations| rotations as usize);

    Some(GroupIndex(((core_position + shift) % n_groups) as u32))
}
/// Builds a predicate that, given the block a candidate has been pending since, reports
/// whether it has timed out and the block number of its timeout.
///
/// When `availability_timeout_check_required` holds, the timeout block is
/// `pending_since + paras_availability_period`; otherwise it is the next group rotation plus
/// that period. A candidate counts as timed out once the timeout block is not after the
/// current block.
pub(crate) fn availability_timeout_predicate(
) -> impl Fn(BlockNumberFor<T>) -> AvailabilityTimeoutStatus<BlockNumberFor<T>> {
    let availability_period =
        configuration::ActiveConfig::<T>::get().scheduler_params.paras_availability_period;
    let now = frame_system::Pallet::<T>::block_number();
    let next_rotation = Self::group_rotation_info(now).next_rotation_at();
    let times_out = Self::availability_timeout_check_required();

    move |pending_since| {
        let countdown_start = if times_out { pending_since } else { next_rotation };
        let live_until = countdown_start + availability_period;
        AvailabilityTimeoutStatus { timed_out: live_until <= now, live_until }
    }
}
/// Tells whether the next block (current block number plus one) still lies within
/// `paras_availability_period` blocks after the last group rotation.
pub(crate) fn availability_timeout_check_required() -> bool {
    let availability_period =
        configuration::ActiveConfig::<T>::get().scheduler_params.paras_availability_period;
    let next_block = frame_system::Pallet::<T>::block_number() + One::one();
    let last_rotation = Self::group_rotation_info(next_block).last_rotation_at();

    next_block < last_rotation + availability_period
}
/// Assembles the group rotation info for block `now` from the stored session start block and
/// the active configuration's group rotation frequency.
pub(crate) fn group_rotation_info(
    now: BlockNumberFor<T>,
) -> GroupRotationInfo<BlockNumberFor<T>> {
    let session_start_block = SessionStartBlock::<T>::get();
    let config = configuration::ActiveConfig::<T>::get();

    GroupRotationInfo {
        session_start_block,
        now,
        group_rotation_frequency: config.scheduler_params.group_rotation_frequency,
    }
}
/// Returns the para at the front of `core`'s claim queue as a `ScheduledCore` (without a
/// collator), or `None` when the core has nothing queued.
///
/// While the on-chain storage version is 2, the queue is read through the v2 migration
/// storage type instead of the current one.
pub(crate) fn next_up_on_available(core: CoreIndex) -> Option<ScheduledCore> {
    let para_id = if Self::on_chain_storage_version() == StorageVersion::new(2) {
        migration::v2::ClaimQueue::<T>::get().get(&core)?.front()?.assignment.para_id()
    } else {
        ClaimQueue::<T>::get().get(&core)?.front()?.para_id()
    };

    Some(ScheduledCore { para_id, collator: None })
}
/// Returns the whole claim queue in its current format.
///
/// While the on-chain storage version is 2, the v2 storage is read and every entry is
/// reduced to its assignment.
pub(crate) fn get_claim_queue() -> BTreeMap<CoreIndex, VecDeque<Assignment>> {
    if Self::on_chain_storage_version() != StorageVersion::new(2) {
        return ClaimQueue::<T>::get()
    }

    let mut converted = BTreeMap::new();
    for (core_index, entries) in migration::v2::ClaimQueue::<T>::get() {
        let assignments: VecDeque<Assignment> =
            entries.into_iter().map(|entry| entry.assignment).collect();
        converted.insert(core_index, assignments);
    }
    converted
}
/// Advances the claim queue by one step for every expected core not listed in `except_for`.
///
/// For each such core the front assignment is popped (and reported to the assignment
/// provider as processed when there was one), after which the core's queue is refilled up to
/// the configured lookahead, which is treated as at least 1.
pub(crate) fn advance_claim_queue(except_for: &BTreeSet<CoreIndex>) {
    let config = configuration::ActiveConfig::<T>::get();
    let core_count = Self::expected_claim_queue_len(&config.scheduler_params);
    let n_lookahead = core::cmp::max(config.scheduler_params.lookahead, 1);

    let advancing_cores =
        (0..core_count).map(CoreIndex::from).filter(|core| !except_for.contains(core));

    for core_idx in advancing_cores {
        if let Some(processed) = Self::pop_front_of_claim_queue(&core_idx) {
            T::AssignmentProvider::report_processed(processed);
        }
        Self::fill_claim_queue(core_idx, n_lookahead);
    }
}
/// Shrinks the claim queue when fewer cores are expected than it currently covers.
///
/// Nothing happens for an empty claim queue. Otherwise every core with an index at or above
/// the expected core count is removed, and its assignments are handed back to the assignment
/// provider.
fn maybe_resize_claim_queue() {
    let Some(old_max_core) = ClaimQueue::<T>::get().keys().next_back().copied() else {
        return
    };
    let config = configuration::ActiveConfig::<T>::get();
    let new_core_count = Self::expected_claim_queue_len(&config.scheduler_params);

    if new_core_count >= (old_max_core.0 + 1) {
        return
    }

    ClaimQueue::<T>::mutate(|cq| {
        // Everything from `new_core_count` upwards is surplus; `split_off` detaches exactly
        // those entries and keeps them in ascending core order.
        let surplus = cq.split_off(&CoreIndex(new_core_count));
        for (_, dropped_assignments) in surplus {
            Self::push_back_to_assignment_provider(dropped_assignments.into_iter());
        }
    });
}
/// Fills the claim queue of every expected core up to the configured lookahead (treated as
/// at least 1).
fn populate_claim_queue_after_session_change() {
    let config = configuration::ActiveConfig::<T>::get();
    let n_lookahead = core::cmp::max(config.scheduler_params.lookahead, 1);
    let core_count = Self::expected_claim_queue_len(&config.scheduler_params);

    (0..core_count)
        .map(CoreIndex::from)
        .for_each(|core_idx| Self::fill_claim_queue(core_idx, n_lookahead));
}
/// Hands the given assignments back to the assignment provider, last one first.
///
/// NOTE(review): the reverse order presumably lets the provider return them in their
/// original order when popped again — confirm against the provider implementation.
fn push_back_to_assignment_provider(
    assignments: impl core::iter::DoubleEndedIterator<Item = Assignment>,
) {
    assignments
        .rev()
        .for_each(|assignment| T::AssignmentProvider::push_back_assignment(assignment));
}
/// Tops up the claim queue of `core_idx` with assignments from the provider until it holds
/// `n_lookahead` entries or the provider has nothing more for this core.
///
/// If the queue starts out empty and `n_lookahead` exceeds 1, the first popped assignment is
/// inserted twice and the provider is notified of the duplication. A queue that is still
/// empty at the end is removed from storage altogether.
fn fill_claim_queue(core_idx: CoreIndex, n_lookahead: u32) {
    ClaimQueue::<T>::mutate(|claim_queue| {
        let core_queue = claim_queue.entry(core_idx).or_default();
        let mut filled = core_queue.len() as u32;

        if filled == 0 && n_lookahead > 1 {
            if let Some(first) = T::AssignmentProvider::pop_assignment_for_core(core_idx) {
                T::AssignmentProvider::assignment_duplicated(&first);
                core_queue.push_back(first.clone());
                core_queue.push_back(first);
                filled += 2;
            }
        }

        while filled < n_lookahead {
            let Some(next) = T::AssignmentProvider::pop_assignment_for_core(core_idx) else {
                break
            };
            core_queue.push_back(next);
            filled += 1;
        }

        // Do not keep an entry around for a core without any assignment.
        if core_queue.is_empty() {
            claim_queue.remove(&core_idx);
        }
    });
}
/// Removes and returns the front assignment of `core_idx`'s claim queue, or `None` when the
/// core has no entry or its queue is empty.
fn pop_front_of_claim_queue(core_idx: &CoreIndex) -> Option<Assignment> {
    ClaimQueue::<T>::mutate(|claim_queue| {
        claim_queue.get_mut(core_idx).and_then(|core_queue| core_queue.pop_front())
    })
}
/// Total number of assignments queued across all cores.
#[cfg(any(feature = "try-runtime", test))]
fn claim_queue_len() -> usize {
    let claim_queue = ClaimQueue::<T>::get();
    claim_queue.values().map(|core_queue| core_queue.len()).sum()
}
/// Whether no assignment is queued on any core. Only compiled for tests built without the
/// `runtime-benchmarks` feature.
#[cfg(all(not(feature = "runtime-benchmarks"), test))]
pub(crate) fn claim_queue_is_empty() -> bool {
Self::claim_queue_len() == 0
}
/// Test-only helper that overwrites the stored validator groups.
#[cfg(test)]
pub(crate) fn set_validator_groups(validator_groups: Vec<Vec<ValidatorIndex>>) {
ValidatorGroups::<T>::set(validator_groups);
}
/// Test-only helper that overwrites the stored claim queue.
#[cfg(test)]
pub(crate) fn set_claim_queue(claim_queue: BTreeMap<CoreIndex, VecDeque<Assignment>>) {
ClaimQueue::<T>::set(claim_queue);
}
}