polkadot_runtime_parachains/
scheduler.rs1use crate::{configuration, initializer::SessionChangeNotification, paras::AssignCoretime};
40use alloc::{
41 collections::{btree_map::BTreeMap, vec_deque::VecDeque},
42 vec,
43 vec::Vec,
44};
45use frame_support::{pallet_prelude::*, traits::Defensive};
46use frame_system::pallet_prelude::BlockNumberFor;
47use polkadot_primitives::{CoreIndex, GroupIndex, GroupRotationInfo, Id as ParaId, ValidatorIndex};
48use sp_runtime::traits::{One, Saturating};
49
50const LOG_TARGET: &str = "runtime::parachains::scheduler";
51
52pub use assigner_coretime::{CoreAssignment, PartsOf57600};
53pub use pallet::*;
54pub use polkadot_core_primitives::v2::BlockNumber;
55
56#[cfg(test)]
57mod tests;
58
59mod assigner_coretime;
63
64pub mod migration;
66
67#[frame_support::pallet]
68pub mod pallet {
69
70 use crate::on_demand;
71
72 use super::*;
73
74 const STORAGE_VERSION: StorageVersion = StorageVersion::new(4);
75
76 #[pallet::pallet]
77 #[pallet::without_storage_info]
78 #[pallet::storage_version(STORAGE_VERSION)]
79 pub struct Pallet<T>(_);
80
81 #[pallet::config]
82 pub trait Config: frame_system::Config + configuration::Config + on_demand::Config {}
83
84 #[pallet::error]
85 pub enum Error<T> {
86 AssignmentsEmpty,
88 DisallowedInsert,
90 }
91
92 impl<T> From<assigner_coretime::Error> for Error<T> {
93 fn from(e: assigner_coretime::Error) -> Self {
94 match e {
95 assigner_coretime::Error::AssignmentsEmpty => Error::AssignmentsEmpty,
96 assigner_coretime::Error::DisallowedInsert => Error::DisallowedInsert,
97 }
98 }
99 }
100
101 #[pallet::storage]
109 pub type ValidatorGroups<T> = StorageValue<_, Vec<Vec<ValidatorIndex>>, ValueQuery>;
110
111 #[pallet::storage]
119 pub type SessionStartBlock<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;
120
121 #[pallet::storage]
128 pub(super) type CoreSchedules<T: Config> = StorageMap<
129 _,
130 Twox64Concat,
131 (BlockNumberFor<T>, CoreIndex),
132 assigner_coretime::Schedule<BlockNumberFor<T>>,
133 OptionQuery,
134 >;
135
136 #[pallet::storage]
142 pub(super) type CoreDescriptors<T: Config> = StorageValue<
143 _,
144 BTreeMap<CoreIndex, assigner_coretime::CoreDescriptor<BlockNumberFor<T>>>,
145 ValueQuery,
146 >;
147
148 pub(crate) struct AvailabilityTimeoutStatus<BlockNumber> {
150 pub timed_out: bool,
154
155 pub live_until: BlockNumber,
160 }
161}
162
163impl<T: Config> AssignCoretime for Pallet<T> {
164 fn assign_coretime(id: ParaId) -> DispatchResult {
166 let current_block = frame_system::Pallet::<T>::block_number();
167
168 let mut config = configuration::ActiveConfig::<T>::get();
170 let core = config.scheduler_params.num_cores;
171 config.scheduler_params.num_cores.saturating_inc();
172
173 configuration::Pallet::<T>::force_set_active_config(config);
176
177 let begin = current_block + One::one();
178 let assignment = vec![(pallet_broker::CoreAssignment::Task(id.into()), PartsOf57600::FULL)];
179 assigner_coretime::assign_core::<T>(CoreIndex(core), begin, assignment, None)
180 .map_err(Error::<T>::from)?;
181 Ok(())
182 }
183}
184
185impl<T: Config> Pallet<T> {
186 pub(crate) fn assign_core(
188 core: CoreIndex,
189 begin: BlockNumberFor<T>,
190 assignment: Vec<(CoreAssignment, PartsOf57600)>,
191 end_hint: Option<BlockNumberFor<T>>,
192 ) -> DispatchResult {
193 assigner_coretime::assign_core::<T>(core, begin, assignment, end_hint)
194 .map_err(Error::<T>::from)?;
195 Ok(())
196 }
197
198 pub(crate) fn advance_claim_queue<F: Fn(CoreIndex) -> bool>(
206 is_blocked: F,
207 ) -> BTreeMap<CoreIndex, ParaId> {
208 let mut assignments = assigner_coretime::advance_assignments::<T, F>(is_blocked);
209 assignments.split_off(&CoreIndex(Self::num_availability_cores() as _));
210 assignments
211 }
212
213 pub(crate) fn claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
217 if Self::on_chain_storage_version() == StorageVersion::new(3) {
219 return migration::v3::ClaimQueue::<T>::get()
220 .into_iter()
221 .map(|(core_index, paras)| {
222 (core_index, paras.into_iter().map(|e| e.para_id()).collect())
223 })
224 .collect();
225 }
226
227 let config = configuration::ActiveConfig::<T>::get();
228 let lookahead = config.scheduler_params.lookahead;
229 let mut queue = assigner_coretime::peek_next_block::<T>(lookahead);
230 queue.split_off(&CoreIndex(Self::num_availability_cores() as _));
231 queue
232 }
233
234 pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
236 Weight::zero()
237 }
238
239 pub(crate) fn initializer_finalize() {}
241
242 pub(crate) fn initializer_on_new_session(
244 notification: &SessionChangeNotification<BlockNumberFor<T>>,
245 ) {
246 let SessionChangeNotification { validators, new_config, .. } = notification;
247 let config = new_config;
248 let assigner_cores = config.scheduler_params.num_cores;
249
250 let n_cores = core::cmp::max(
251 assigner_cores,
252 match config.scheduler_params.max_validators_per_core {
253 Some(x) if x != 0 => validators.len() as u32 / x,
254 _ => 0,
255 },
256 );
257
258 if n_cores == 0 || validators.is_empty() {
260 ValidatorGroups::<T>::set(Vec::new());
261 } else {
262 let group_base_size = validators
263 .len()
264 .checked_div(n_cores as usize)
265 .defensive_proof("n_cores should not be 0")
266 .unwrap_or(0);
267 let n_larger_groups = validators
268 .len()
269 .checked_rem(n_cores as usize)
270 .defensive_proof("n_cores should not be 0")
271 .unwrap_or(0);
272
273 let mut groups: Vec<Vec<ValidatorIndex>> = Vec::new();
277 for i in 0..n_larger_groups {
278 let offset = (group_base_size + 1) * i;
279 groups.push(
280 (0..group_base_size + 1)
281 .map(|j| offset + j)
282 .map(|j| ValidatorIndex(j as _))
283 .collect(),
284 );
285 }
286
287 for i in 0..(n_cores as usize - n_larger_groups) {
288 let offset = (n_larger_groups * (group_base_size + 1)) + (i * group_base_size);
289 groups.push(
290 (0..group_base_size)
291 .map(|j| offset + j)
292 .map(|j| ValidatorIndex(j as _))
293 .collect(),
294 );
295 }
296
297 ValidatorGroups::<T>::set(groups);
298 }
299 let now = frame_system::Pallet::<T>::block_number() + One::one();
300 SessionStartBlock::<T>::set(now);
301 }
302
303 pub(crate) fn group_validators(group_index: GroupIndex) -> Option<Vec<ValidatorIndex>> {
305 ValidatorGroups::<T>::get().get(group_index.0 as usize).map(|g| g.clone())
306 }
307
308 pub(crate) fn num_availability_cores() -> usize {
310 ValidatorGroups::<T>::decode_len().unwrap_or(0)
311 }
312
313 pub(crate) fn group_assigned_to_core(
317 core: CoreIndex,
318 at: BlockNumberFor<T>,
319 ) -> Option<GroupIndex> {
320 let config = configuration::ActiveConfig::<T>::get();
321 let session_start_block = SessionStartBlock::<T>::get();
322
323 if at < session_start_block {
324 return None;
325 }
326
327 let validator_groups = ValidatorGroups::<T>::get();
328
329 if core.0 as usize >= validator_groups.len() {
330 return None;
331 }
332
333 let rotations_since_session_start: BlockNumberFor<T> =
334 (at - session_start_block) / config.scheduler_params.group_rotation_frequency;
335
336 let rotations_since_session_start =
337 <BlockNumberFor<T> as TryInto<u32>>::try_into(rotations_since_session_start)
338 .unwrap_or(0);
339 let group_idx =
343 (core.0 as usize + rotations_since_session_start as usize) % validator_groups.len();
344 Some(GroupIndex(group_idx as u32))
345 }
346
347 pub(crate) fn availability_timeout_predicate(
351 ) -> impl Fn(BlockNumberFor<T>) -> AvailabilityTimeoutStatus<BlockNumberFor<T>> {
352 let config = configuration::ActiveConfig::<T>::get();
353 let now = frame_system::Pallet::<T>::block_number();
354 let rotation_info = Self::group_rotation_info(now);
355
356 let next_rotation = rotation_info.next_rotation_at();
357
358 let times_out = Self::availability_timeout_check_required();
359
360 move |pending_since| {
361 let time_out_at = if times_out {
362 pending_since + config.scheduler_params.paras_availability_period
367 } else {
368 next_rotation + config.scheduler_params.paras_availability_period
369 };
370
371 AvailabilityTimeoutStatus { timed_out: time_out_at <= now, live_until: time_out_at }
372 }
373 }
374
375 pub(crate) fn availability_timeout_check_required() -> bool {
380 let config = configuration::ActiveConfig::<T>::get();
381 let now = frame_system::Pallet::<T>::block_number() + One::one();
382 let rotation_info = Self::group_rotation_info(now);
383
384 let current_window =
385 rotation_info.last_rotation_at() + config.scheduler_params.paras_availability_period;
386 now < current_window
387 }
388
389 pub(crate) fn group_rotation_info(
391 now: BlockNumberFor<T>,
392 ) -> GroupRotationInfo<BlockNumberFor<T>> {
393 let session_start_block = SessionStartBlock::<T>::get();
394 let group_rotation_frequency = configuration::ActiveConfig::<T>::get()
395 .scheduler_params
396 .group_rotation_frequency;
397
398 GroupRotationInfo { session_start_block, now, group_rotation_frequency }
399 }
400
401 #[cfg(test)]
402 fn claim_queue_len() -> usize {
403 Self::claim_queue().iter().map(|la_vec| la_vec.1.len()).sum()
404 }
405
406 #[cfg(test)]
407 #[allow(dead_code)]
408 pub(crate) fn claim_queue_is_empty() -> bool {
409 Self::claim_queue_len() == 0
410 }
411
412 #[cfg(test)]
413 pub(crate) fn set_validator_groups(validator_groups: Vec<Vec<ValidatorIndex>>) {
414 ValidatorGroups::<T>::set(validator_groups);
415 }
416}