use core::iter::Peekable;
use crate::{configuration, initializer::SessionChangeNotification, paras};
use frame_support::{pallet_prelude::*, traits::Defensive};
use frame_system::pallet_prelude::BlockNumberFor;
pub use polkadot_core_primitives::v2::BlockNumber;
use polkadot_primitives::{
CoreIndex, GroupIndex, GroupRotationInfo, Id as ParaId, ScheduledCore, ValidatorIndex,
};
use sp_runtime::traits::One;
use sp_std::{
collections::{
btree_map::{self, BTreeMap},
vec_deque::VecDeque,
},
prelude::*,
};
pub mod common;
use common::{Assignment, AssignmentProvider};
pub use pallet::*;
#[cfg(test)]
mod tests;
/// Log target passed to the `log` macro invocations in this module.
const LOG_TARGET: &str = "runtime::parachains::scheduler";
pub mod migration;
#[frame_support::pallet]
pub mod pallet {
    use super::*;

    /// Storage version declared for this pallet through `#[pallet::storage_version]`.
    const STORAGE_VERSION: StorageVersion = StorageVersion::new(2);

    /// The scheduler pallet type.
    #[pallet::pallet]
    #[pallet::without_storage_info]
    #[pallet::storage_version(STORAGE_VERSION)]
    pub struct Pallet<T>(_);

    /// Configuration of the scheduler pallet.
    #[pallet::config]
    pub trait Config: frame_system::Config + configuration::Config + paras::Config {
        /// Provider of core assignments.
        ///
        /// The scheduler asks it for the session core count, pops assignments from it to
        /// fill the claim queue, reports processed assignments to it and pushes unserved
        /// assignments back to it before a new session.
        type AssignmentProvider: AssignmentProvider<BlockNumberFor<Self>>;
    }

    /// The validator groups of the current session.
    ///
    /// Each inner vector holds indices into the validator set delivered with the session
    /// change notification. Rebuilt by `initializer_on_new_session`.
    #[pallet::storage]
    pub type ValidatorGroups<T> = StorageValue<_, Vec<Vec<ValidatorIndex>>, ValueQuery>;

    /// One entry per availability core, stating whether the core is free or which claim
    /// occupies it. The position in the vector is the core index.
    #[pallet::storage]
    pub type AvailabilityCores<T: Config> = StorageValue<_, Vec<CoreOccupiedType<T>>, ValueQuery>;

    /// Occupancy state of a single availability core.
    #[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
    pub enum CoreOccupied<N> {
        /// No claim occupies the core.
        Free,
        /// The core is occupied by the contained claim.
        Paras(ParasEntry<N>),
    }

    /// [`CoreOccupied`] instantiated with the runtime's block number type.
    pub type CoreOccupiedType<T> = CoreOccupied<BlockNumberFor<T>>;

    impl<N> CoreOccupied<N> {
        /// Returns `true` if the core is [`CoreOccupied::Free`].
        pub fn is_free(&self) -> bool {
            match self {
                Self::Free => true,
                Self::Paras(_) => false,
            }
        }
    }

    /// Why a core is being freed.
    #[derive(Clone, Copy)]
    pub enum FreedReason {
        /// The occupying claim concluded; `free_cores` returns only its assignment.
        Concluded,
        /// The occupying claim timed out; `free_cores` returns the whole entry so that it
        /// can be retried.
        TimedOut,
    }

    /// The block number recorded as the start of the current session.
    ///
    /// `initializer_on_new_session` sets it to the current block number plus one.
    #[pallet::storage]
    pub type SessionStartBlock<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;

    /// The claim queue: for every core, the ordered queue of upcoming claims.
    ///
    /// The front of a core's queue is the next claim reported as scheduled for that core.
    #[pallet::storage]
    pub type ClaimQueue<T: Config> =
        StorageValue<_, BTreeMap<CoreIndex, VecDeque<ParasEntryType<T>>>, ValueQuery>;

    /// A claim on a core, together with its retry and expiry bookkeeping.
    #[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq, Clone)]
    pub struct ParasEntry<N> {
        /// The assignment this claim is for.
        pub assignment: Assignment,
        /// How many times this claim has been put back into the claim queue after timing
        /// out. Starts at zero.
        pub availability_timeouts: u32,
        /// Expiry block: a queued claim is dropped once `ttl` is less than the current
        /// block number.
        pub ttl: N,
    }

    /// [`ParasEntry`] instantiated with the runtime's block number type.
    pub type ParasEntryType<T> = ParasEntry<BlockNumberFor<T>>;

    impl<N> ParasEntry<N> {
        /// Creates an entry for `assignment` with no availability timeouts so far.
        ///
        /// Note that `now` is stored verbatim as the entry's `ttl`; the callers in this
        /// file pass the current block number plus the configured TTL.
        pub fn new(assignment: Assignment, now: N) -> Self {
            Self { assignment, availability_timeouts: 0, ttl: now }
        }

        /// The para id of the underlying assignment.
        pub fn para_id(&self) -> ParaId {
            self.assignment.para_id()
        }
    }

    /// Result of evaluating the availability timeout predicate for one pending candidate.
    pub(crate) struct AvailabilityTimeoutStatus<BlockNumber> {
        /// Whether the computed deadline is at or before the current block.
        pub timed_out: bool,
        /// The computed deadline block.
        pub live_until: BlockNumber,
    }
}
/// Zero-based position of a claim inside a single core's queue of the claim queue.
type PositionInClaimQueue = u32;
/// Iterator over per-core claim queues that fills gaps in the core indices.
///
/// Starting at `next_idx`, it yields one item per consecutive core index: the stored queue
/// when the underlying map has an entry for that index, an empty queue otherwise. It ends
/// as soon as the underlying map is exhausted.
struct ClaimQueueIterator<E> {
/// The core index the next yielded item will carry.
next_idx: u32,
/// The remaining `(core index, queue)` entries of the underlying map.
queue: Peekable<btree_map::IntoIter<CoreIndex, VecDeque<E>>>,
}
impl<E> Iterator for ClaimQueueIterator<E> {
    type Item = (CoreIndex, VecDeque<E>);

    /// Yields the queue for the next consecutive core index.
    ///
    /// If the next stored entry belongs to a later core, an empty queue is produced for
    /// the expected index and the stored entry is left in place for a later call.
    /// Returns `None` once no stored entries remain.
    fn next(&mut self) -> Option<Self::Item> {
        let expected = CoreIndex(self.next_idx);
        let (stored_idx, _) = self.queue.peek()?;

        let item = if *stored_idx == expected {
            self.queue.next()?
        } else {
            log::trace!(target: LOG_TARGET, "idx did not match claim queue idx: {:?} vs {:?}", stored_idx, self.next_idx);
            (expected, VecDeque::new())
        };

        self.next_idx += 1;
        Some(item)
    }
}
impl<T: Config> Pallet<T> {
/// Initialization hook of the scheduler.
///
/// Performs no work: `_now` is ignored and the returned weight is zero.
pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
Weight::zero()
}
/// Finalization hook of the scheduler. Currently a no-op.
pub(crate) fn initializer_finalize() {}
/// Hands all pending work back to the assignment provider ahead of a new session.
///
/// First empties the claim queue, then frees every occupied core. In both steps an entry
/// is pushed back to the provider only if it has never timed out on availability (see
/// `maybe_push_assignment`); other entries are dropped.
pub(crate) fn pre_new_session() {
// The order matters to the provider: claim queue entries are pushed before occupied cores.
Self::push_claim_queue_items_to_assignment_provider();
Self::push_occupied_cores_to_assignment_provider();
}
/// Re-initializes the scheduler state for a new session.
///
/// * The core count is the larger of the assignment provider's session core count and
///   `validators.len() / max_validators_per_core` (the latter counts as zero when the
///   parameter is unset or zero).
/// * `AvailabilityCores` is resized to that count; newly added slots are free.
/// * Validators are split into one group per core, in index order. The first
///   `validators.len() % n_cores` groups receive one validator more than the others.
///   With no cores or no validators the group list is empty.
/// * `SessionStartBlock` is set to the current block number plus one.
pub(crate) fn initializer_on_new_session(
    notification: &SessionChangeNotification<BlockNumberFor<T>>,
) {
    let validators = &notification.validators;
    let config = &notification.new_config;

    let provider_cores = T::AssignmentProvider::session_core_count();
    let validator_cores = config
        .scheduler_params
        .max_validators_per_core
        .filter(|per_core| *per_core != 0)
        .map_or(0, |per_core| validators.len() as u32 / per_core);
    let n_cores = provider_cores.max(validator_cores);

    AvailabilityCores::<T>::mutate(|cores| {
        cores.resize_with(n_cores as usize, || CoreOccupied::Free);
    });

    let groups: Vec<Vec<ValidatorIndex>> = if n_cores == 0 || validators.is_empty() {
        Vec::new()
    } else {
        let n_groups = n_cores as usize;
        let group_base_size = validators
            .len()
            .checked_div(n_groups)
            .defensive_proof("n_cores should not be 0")
            .unwrap_or(0);
        let n_larger_groups = validators
            .len()
            .checked_rem(n_groups)
            .defensive_proof("n_cores should not be 0")
            .unwrap_or(0);

        // Walk the validator indices once, cutting off one group after another. The
        // leading `n_larger_groups` groups absorb the remainder, one validator each.
        let mut groups = Vec::new();
        let mut first = 0usize;
        for group in 0..n_groups {
            let size = if group < n_larger_groups { group_base_size + 1 } else { group_base_size };
            groups.push((first..first + size).map(|j| ValidatorIndex(j as _)).collect());
            first += size;
        }
        groups
    };
    ValidatorGroups::<T>::set(groups);

    let session_start = frame_system::Pallet::<T>::block_number() + One::one();
    SessionStartBlock::<T>::set(session_start);
}
/// Marks the given cores as free and returns what occupied them.
///
/// Core indices outside of `AvailabilityCores` and cores that are already free are
/// skipped. For every other core the slot becomes `CoreOccupied::Free` and the previous
/// occupant is sorted by `FreedReason`.
///
/// Returns `(concluded, timed_out)`: the assignments of concluded claims and the complete
/// entries of timed out claims, both keyed by core index.
fn free_cores(
    just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>,
) -> (BTreeMap<CoreIndex, Assignment>, BTreeMap<CoreIndex, ParasEntryType<T>>) {
    let mut concluded_paras: BTreeMap<CoreIndex, Assignment> = BTreeMap::new();
    let mut timedout_paras: BTreeMap<CoreIndex, ParasEntryType<T>> = BTreeMap::new();

    AvailabilityCores::<T>::mutate(|cores| {
        for (freed_index, freed_reason) in just_freed_cores {
            let slot = match cores.get_mut(freed_index.0 as usize) {
                Some(slot) => slot,
                None => continue,
            };
            let entry = match sp_std::mem::replace(slot, CoreOccupied::Free) {
                CoreOccupied::Paras(entry) => entry,
                CoreOccupied::Free => continue,
            };
            match freed_reason {
                FreedReason::Concluded => {
                    concluded_paras.insert(freed_index, entry.assignment);
                },
                FreedReason::TimedOut => {
                    timedout_paras.insert(freed_index, entry);
                },
            }
        }
    });

    (concluded_paras, timedout_paras)
}
/// Returns an iterator over the stored claim queue that starts at core index 0 and yields
/// an empty queue for every core index missing before the last stored entry.
fn claim_queue_iterator() -> impl Iterator<Item = (CoreIndex, VecDeque<ParasEntryType<T>>)> {
    let queues = ClaimQueue::<T>::get();
    ClaimQueueIterator::<ParasEntryType<T>> { next_idx: 0, queue: queues.into_iter().peekable() }
}
/// Records that the given cores are now occupied by the given paras.
///
/// For every `(core, para)` pair the first claim of that para is removed from the core's
/// claim queue and placed on the core. Pairs without a matching claim are logged and
/// skipped. Afterwards expired claims are dropped from the claim queue, and only then are
/// the updated availability cores written back.
///
/// Returns, for every core that was occupied, the position the claim had in the claim
/// queue.
///
/// # Panics
///
/// Indexes `AvailabilityCores` directly, so a claim found for a core index outside of that
/// vector causes a panic.
pub(crate) fn occupied(
    now_occupied: BTreeMap<CoreIndex, ParaId>,
) -> BTreeMap<CoreIndex, PositionInClaimQueue> {
    let mut availability_cores = AvailabilityCores::<T>::get();

    log::debug!(target: LOG_TARGET, "[occupied] now_occupied {:?}", now_occupied);

    let mut pos_mapping: BTreeMap<CoreIndex, PositionInClaimQueue> = BTreeMap::new();
    for (&core_idx, &para_id) in now_occupied.iter() {
        match Self::remove_from_claim_queue(core_idx, para_id) {
            Ok((pos_in_claim_queue, entry)) => {
                availability_cores[core_idx.0 as usize] = CoreOccupied::Paras(entry);
                pos_mapping.insert(core_idx, pos_in_claim_queue);
            },
            Err(e) => {
                log::debug!(
                    target: LOG_TARGET,
                    "[occupied] error on remove_from_claim queue {}",
                    e
                );
            },
        }
    }

    // Expired claims are dropped only after the newly occupied ones were taken out.
    Self::drop_expired_claims_from_claim_queue();

    AvailabilityCores::<T>::set(availability_cores);

    pos_mapping
}
/// Removes expired claims from the claim queue and tries to replace them.
///
/// A claim is expired when its `ttl` is strictly less than the current block number. Every
/// dropped claim is reported to the assignment provider as processed. For each dropped
/// claim one new assignment is requested from the provider for the same core and, if one
/// is returned, appended to that core's queue with a TTL of `now + ttl`.
///
/// Only core indices below the current number of availability cores are visited.
fn drop_expired_claims_from_claim_queue() {
let now = frame_system::Pallet::<T>::block_number();
let availability_cores = AvailabilityCores::<T>::get();
let ttl = configuration::ActiveConfig::<T>::get().scheduler_params.ttl;
ClaimQueue::<T>::mutate(|cq| {
// Only the number of availability cores is used here; the entries themselves are ignored.
for (idx, _) in (0u32..).zip(availability_cores) {
let core_idx = CoreIndex(idx);
if let Some(core_claim_queue) = cq.get_mut(&core_idx) {
let mut i = 0;
let mut num_dropped = 0;
// Manual index loop: after a removal the next element moves into position `i`, so
// `i` only advances when nothing was removed.
while i < core_claim_queue.len() {
let maybe_dropped = if let Some(entry) = core_claim_queue.get(i) {
if entry.ttl < now {
core_claim_queue.remove(i)
} else {
None
}
} else {
None
};
if let Some(dropped) = maybe_dropped {
num_dropped += 1;
T::AssignmentProvider::report_processed(dropped.assignment);
} else {
i += 1;
}
}
// Try to pop one replacement per dropped claim; the provider may return fewer.
for _ in 0..num_dropped {
if let Some(assignment) =
T::AssignmentProvider::pop_assignment_for_core(core_idx)
{
core_claim_queue.push_back(ParasEntry::new(assignment, now + ttl));
}
}
}
}
});
}
/// Returns a copy of the validator indices in the given group, or `None` if the group
/// index is out of range for the current `ValidatorGroups`.
pub(crate) fn group_validators(group_index: GroupIndex) -> Option<Vec<ValidatorIndex>> {
    ValidatorGroups::<T>::get().get(group_index.0 as usize).cloned()
}
/// Returns the validator group assigned to `core` at block `at`.
///
/// Groups rotate across cores: after every `group_rotation_frequency` blocks since the
/// session start, each core is served by the next group (wrapping around).
///
/// Returns `None` if `at` lies before the session start block or if `core` is not below
/// the number of validator groups.
///
/// If the number of rotations does not fit into a `u32`, zero rotations are assumed.
///
/// NOTE(review): the division assumes `group_rotation_frequency` is non-zero; confirm
/// that configuration validation guarantees this.
pub(crate) fn group_assigned_to_core(
    core: CoreIndex,
    at: BlockNumberFor<T>,
) -> Option<GroupIndex> {
    let config = configuration::ActiveConfig::<T>::get();
    let session_start_block = SessionStartBlock::<T>::get();

    if at < session_start_block {
        return None
    }

    let validator_groups = ValidatorGroups::<T>::get();

    // This also covers the case of no groups at all, so the modulus below is never zero.
    if core.0 as usize >= validator_groups.len() {
        return None
    }

    let rotations_since_session_start: BlockNumberFor<T> =
        (at - session_start_block) / config.scheduler_params.group_rotation_frequency;

    let rotations_since_session_start =
        <BlockNumberFor<T> as TryInto<u32>>::try_into(rotations_since_session_start)
            .unwrap_or(0);

    // Add in `u64`: both operands are `u32`, so the sum cannot overflow. Adding them as
    // `usize` could overflow on targets where `usize` is 32 bits wide.
    let n_groups = validator_groups.len() as u64;
    let group_idx = (u64::from(core.0) + u64::from(rotations_since_session_start)) % n_groups;
    Some(GroupIndex(group_idx as u32))
}
/// Builds a predicate that computes the availability deadline of a pending candidate.
///
/// The returned closure takes the block since which a candidate is pending. While
/// `availability_timeout_check_required` holds, the deadline is that block plus
/// `paras_availability_period`; otherwise it is the next group rotation plus
/// `paras_availability_period`. The status reports the deadline and whether it is at or
/// before the current block.
///
/// All state is read once, when the predicate is created.
pub(crate) fn availability_timeout_predicate(
) -> impl Fn(BlockNumberFor<T>) -> AvailabilityTimeoutStatus<BlockNumberFor<T>> {
    let availability_period =
        configuration::ActiveConfig::<T>::get().scheduler_params.paras_availability_period;
    let now = frame_system::Pallet::<T>::block_number();
    let next_rotation = Self::group_rotation_info(now).next_rotation_at();
    let check_pending_since = Self::availability_timeout_check_required();

    move |pending_since| {
        let deadline_base = if check_pending_since { pending_since } else { next_rotation };
        let live_until = deadline_base + availability_period;
        AvailabilityTimeoutStatus { timed_out: live_until <= now, live_until }
    }
}
/// Returns whether availability timeouts have to be checked.
///
/// This is the case while the next block (current block number plus one) lies before the
/// last group rotation plus `paras_availability_period`.
pub(crate) fn availability_timeout_check_required() -> bool {
    let availability_period =
        configuration::ActiveConfig::<T>::get().scheduler_params.paras_availability_period;
    let next_block = frame_system::Pallet::<T>::block_number() + One::one();
    let window_end =
        Self::group_rotation_info(next_block).last_rotation_at() + availability_period;

    next_block < window_end
}
/// Assembles the group rotation info for block `now` from the stored session start block
/// and the configured group rotation frequency.
pub(crate) fn group_rotation_info(
    now: BlockNumberFor<T>,
) -> GroupRotationInfo<BlockNumberFor<T>> {
    GroupRotationInfo {
        session_start_block: SessionStartBlock::<T>::get(),
        now,
        group_rotation_frequency: configuration::ActiveConfig::<T>::get()
            .scheduler_params
            .group_rotation_frequency,
    }
}
/// Returns the claim at the front of `core`'s claim queue as a `ScheduledCore`, or `None`
/// if the core has no queue or the queue is empty.
pub(crate) fn next_up_on_available(core: CoreIndex) -> Option<ScheduledCore> {
    let claim_queue = ClaimQueue::<T>::get();
    let next_claim = claim_queue.get(&core)?.front()?;
    Some(Self::paras_entry_to_scheduled_core(next_claim))
}
/// Converts a claim into a `ScheduledCore` for the claim's para, with no collator set.
fn paras_entry_to_scheduled_core(pe: &ParasEntryType<T>) -> ScheduledCore {
    let para_id = pe.para_id();
    ScheduledCore { para_id, collator: None }
}
/// Returns what would be scheduled on `core` if its current occupant timed out.
///
/// This is the front of the core's claim queue if there is one. Otherwise it is the claim
/// currently occupying the core, provided its number of availability timeouts is still
/// below `max_availability_timeouts`. In all other cases the result is `None`.
pub(crate) fn next_up_on_time_out(core: CoreIndex) -> Option<ScheduledCore> {
    let max_availability_timeouts = configuration::ActiveConfig::<T>::get()
        .scheduler_params
        .max_availability_timeouts;

    if let Some(next) = Self::next_up_on_available(core) {
        return Some(next)
    }

    let cores = AvailabilityCores::<T>::get();
    match cores.get(core.0 as usize) {
        Some(CoreOccupied::Paras(pe)) if pe.availability_timeouts < max_availability_timeouts =>
            Some(Self::paras_entry_to_scheduled_core(pe)),
        Some(CoreOccupied::Paras(_)) | Some(CoreOccupied::Free) | None => None,
    }
}
/// Frees every availability core and offers each former occupant to
/// `maybe_push_assignment`, in core order.
fn push_occupied_cores_to_assignment_provider() {
    AvailabilityCores::<T>::mutate(|cores| {
        for slot in cores.iter_mut() {
            if let CoreOccupied::Paras(entry) = sp_std::mem::replace(slot, CoreOccupied::Free) {
                Self::maybe_push_assignment(entry);
            }
        }
    });
}
/// Removes the whole claim queue from storage and offers every claim to
/// `maybe_push_assignment`.
///
/// Cores are visited in map order; within a core the claims are visited back to front.
fn push_claim_queue_items_to_assignment_provider() {
    ClaimQueue::<T>::take()
        .into_values()
        .flat_map(|core_queue| core_queue.into_iter().rev())
        .for_each(Self::maybe_push_assignment);
}
fn maybe_push_assignment(pe: ParasEntryType<T>) {
if pe.availability_timeouts == 0 {
T::AssignmentProvider::push_back_assignment(pe.assignment);
}
}
/// Frees the given cores and then tops up the claim queue of every session core.
///
/// For each core index below the assignment provider's session core count:
///
/// * A timed out claim that has not yet reached `max_availability_timeouts` gets its
///   timeout counter incremented and a fresh TTL, and is appended to the core's claim
///   queue again. No new assignment is popped for that core in this call.
/// * A timed out claim that has reached the limit is treated like a concluded one.
/// * A concluded assignment is reported to the provider as processed.
/// * The core's queue is then filled with assignments popped from the provider, one
///   attempt per slot missing up to the configured lookahead (at least 1).
///
/// `now` is used as the base for the TTL of all claims added here.
pub fn free_cores_and_fill_claim_queue(
just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>,
now: BlockNumberFor<T>,
) {
let (mut concluded_paras, mut timedout_paras) = Self::free_cores(just_freed_cores);
// Without validator groups nothing is scheduled. The cores have already been freed above;
// the entries returned by `free_cores` are dropped here without being reported.
if ValidatorGroups::<T>::decode_len().map_or(true, |l| l == 0) {
return
}
let n_session_cores = T::AssignmentProvider::session_core_count();
// Snapshot taken before the loop: queue lengths below do not reflect claims added by
// this call.
let cq = ClaimQueue::<T>::get();
let config = configuration::ActiveConfig::<T>::get();
// A lookahead of 0 is treated as 1.
let n_lookahead = config.scheduler_params.lookahead.max(1);
let max_availability_timeouts = config.scheduler_params.max_availability_timeouts;
let ttl = config.scheduler_params.ttl;
for core_idx in 0..n_session_cores {
let core_idx = CoreIndex::from(core_idx);
let n_lookahead_used = cq.get(&core_idx).map_or(0, |v| v.len() as u32);
if let Some(mut entry) = timedout_paras.remove(&core_idx) {
if entry.availability_timeouts < max_availability_timeouts {
entry.availability_timeouts += 1;
if n_lookahead_used < n_lookahead {
entry.ttl = now + ttl;
} else {
// The queue is already at (or above) the lookahead size, so the re-added claim
// gets one extra block of TTL.
entry.ttl = now + ttl + One::one();
}
Self::add_to_claim_queue(core_idx, entry);
// The retried claim takes the place of a new assignment for this core.
continue
} else {
// Retry limit reached: hand the assignment over to the concluded handling below.
let ret = concluded_paras.insert(core_idx, entry.assignment);
debug_assert!(ret.is_none());
}
}
if let Some(concluded_para) = concluded_paras.remove(&core_idx) {
T::AssignmentProvider::report_processed(concluded_para);
}
for _ in n_lookahead_used..n_lookahead {
if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) {
Self::add_to_claim_queue(core_idx, ParasEntry::new(assignment, now + ttl));
}
}
}
// Anything left over belongs to a core index at or above `n_session_cores`.
debug_assert!(timedout_paras.is_empty());
debug_assert!(concluded_paras.is_empty());
}
/// Appends `pe` to the back of `core_idx`'s claim queue, creating the queue if the core
/// has none yet.
fn add_to_claim_queue(core_idx: CoreIndex, pe: ParasEntryType<T>) {
    ClaimQueue::<T>::mutate(|claim_queue| {
        let core_queue = claim_queue.entry(core_idx).or_default();
        core_queue.push_back(pe);
    });
}
/// Removes the first claim of `para_id` from `core_idx`'s claim queue.
///
/// Returns the position the claim had in the queue together with the claim itself.
///
/// # Errors
///
/// Returns a static message if the core has no queue, if no claim of `para_id` is queued
/// for the core, or if the removal itself yields nothing.
fn remove_from_claim_queue(
    core_idx: CoreIndex,
    para_id: ParaId,
) -> Result<(PositionInClaimQueue, ParasEntryType<T>), &'static str> {
    ClaimQueue::<T>::mutate(|claim_queue| {
        let core_queue =
            claim_queue.get_mut(&core_idx).ok_or("core_idx not found in lookahead")?;
        let position = core_queue
            .iter()
            .position(|entry| entry.para_id() == para_id)
            .ok_or("para id not found at core_idx lookahead")?;

        core_queue
            .remove(position)
            .map(|entry| (position as u32, entry))
            .ok_or("remove returned None")
    })
}
/// Iterates over the para at the front of every non-empty claim queue, paired with its
/// core index. The occupancy of the cores is not taken into account.
pub(crate) fn scheduled_paras() -> impl Iterator<Item = (CoreIndex, ParaId)> {
    ClaimQueue::<T>::get().into_iter().filter_map(|(core_idx, core_queue)| {
        let next_claim = core_queue.front()?;
        Some((core_idx, next_claim.para_id()))
    })
}
/// Iterates over the paras that are next in line on a core that is currently free.
///
/// The gap-filling claim queue iterator is paired position by position with the
/// availability cores. A pair contributes an item only if the core is free and its queue
/// is non-empty.
pub(crate) fn eligible_paras() -> impl Iterator<Item = (CoreIndex, ParaId)> {
    let availability_cores = AvailabilityCores::<T>::get();

    Self::claim_queue_iterator()
        .zip(availability_cores)
        .filter(|(_, core)| core.is_free())
        .filter_map(|((core_idx, core_queue), _)| {
            core_queue.front().map(|next_claim| (core_idx, next_claim.assignment.para_id()))
        })
}
/// Total number of claims in the claim queue, summed over all cores.
#[cfg(any(feature = "try-runtime", test))]
fn claim_queue_len() -> usize {
    ClaimQueue::<T>::get().values().map(|core_queue| core_queue.len()).sum()
}
/// Test helper: returns `true` if no core has any claim queued.
#[cfg(all(not(feature = "runtime-benchmarks"), test))]
pub(crate) fn claim_queue_is_empty() -> bool {
Self::claim_queue_len() == 0
}
/// Test helper: overwrites the stored validator groups.
#[cfg(test)]
pub(crate) fn set_validator_groups(validator_groups: Vec<Vec<ValidatorIndex>>) {
ValidatorGroups::<T>::set(validator_groups);
}
/// Test helper: overwrites the stored claim queue.
#[cfg(test)]
pub(crate) fn set_claim_queue(claim_queue: BTreeMap<CoreIndex, VecDeque<ParasEntryType<T>>>) {
ClaimQueue::<T>::set(claim_queue);
}
}