polkadot_runtime_parachains/
scheduler.rs1use crate::{configuration, initializer::SessionChangeNotification, paras};
40use alloc::{
41 collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
42 vec::Vec,
43};
44use frame_support::{pallet_prelude::*, traits::Defensive};
45use frame_system::pallet_prelude::BlockNumberFor;
46pub use polkadot_core_primitives::v2::BlockNumber;
47use polkadot_primitives::{
48 CoreIndex, GroupIndex, GroupRotationInfo, Id as ParaId, ScheduledCore, SchedulerParams,
49 ValidatorIndex,
50};
51use sp_runtime::traits::One;
52
53pub mod common;
54
55use common::{Assignment, AssignmentProvider};
56
57pub use pallet::*;
58
59#[cfg(test)]
60mod tests;
61
/// Log target string for this module. NOTE(review): no use of it is visible in this part of the
/// file — presumably consumed by logging code elsewhere; confirm before removing.
const LOG_TARGET: &str = "runtime::parachains::scheduler";
63
64pub mod migration;
65
/// FRAME pallet definition for the scheduler: the pallet struct, its configuration trait, its
/// storage items and the availability-timeout status type.
#[frame_support::pallet]
pub mod pallet {
    use super::*;

    /// Storage version declared for this pallet through `#[pallet::storage_version]`.
    const STORAGE_VERSION: StorageVersion = StorageVersion::new(3);

    /// The scheduler pallet type.
    #[pallet::pallet]
    #[pallet::without_storage_info]
    #[pallet::storage_version(STORAGE_VERSION)]
    pub struct Pallet<T>(_);

    /// Configuration of the scheduler pallet.
    #[pallet::config]
    pub trait Config: frame_system::Config + configuration::Config + paras::Config {
        /// Provider the scheduler pops per-core assignments from, and which it notifies about
        /// processed, duplicated and pushed-back assignments.
        type AssignmentProvider: AssignmentProvider<BlockNumberFor<Self>>;
    }

    /// All validator groups, each a list of validator indices.
    ///
    /// Rebuilt on every session change; the number of groups is what
    /// `Pallet::num_availability_cores` reports.
    #[pallet::storage]
    pub type ValidatorGroups<T> = StorageValue<_, Vec<Vec<ValidatorIndex>>, ValueQuery>;

    /// Block number recorded as the start of the current session.
    ///
    /// Written on session change as the then-current block number plus one.
    #[pallet::storage]
    pub type SessionStartBlock<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;

    /// Per-core queues of upcoming assignments. The front of a core's queue is the assignment
    /// reported by `Pallet::next_up_on_available`.
    #[pallet::storage]
    pub type ClaimQueue<T> = StorageValue<_, BTreeMap<CoreIndex, VecDeque<Assignment>>, ValueQuery>;

    /// Result of evaluating the availability timeout for a pending candidate.
    pub(crate) struct AvailabilityTimeoutStatus<BlockNumber> {
        /// Whether the timeout block has already been reached (timeout block `<=` current block).
        pub timed_out: bool,

        /// The block number at which the timeout is reached.
        pub live_until: BlockNumber,
    }
}
121
122impl<T: Config> Pallet<T> {
/// Block-initialization hook of the scheduler.
///
/// Performs no work: the block number argument is ignored and zero weight is returned.
pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
    Weight::zero()
}
127
/// Block-finalization hook of the scheduler. Currently a no-op.
pub(crate) fn initializer_finalize() {}
130
131 pub(crate) fn initializer_on_new_session(
133 notification: &SessionChangeNotification<BlockNumberFor<T>>,
134 ) {
135 let SessionChangeNotification { validators, new_config, .. } = notification;
136 let config = new_config;
137 let assigner_cores = config.scheduler_params.num_cores;
138
139 let n_cores = core::cmp::max(
140 assigner_cores,
141 match config.scheduler_params.max_validators_per_core {
142 Some(x) if x != 0 => validators.len() as u32 / x,
143 _ => 0,
144 },
145 );
146
147 if n_cores == 0 || validators.is_empty() {
149 ValidatorGroups::<T>::set(Vec::new());
150 } else {
151 let group_base_size = validators
152 .len()
153 .checked_div(n_cores as usize)
154 .defensive_proof("n_cores should not be 0")
155 .unwrap_or(0);
156 let n_larger_groups = validators
157 .len()
158 .checked_rem(n_cores as usize)
159 .defensive_proof("n_cores should not be 0")
160 .unwrap_or(0);
161
162 let mut groups: Vec<Vec<ValidatorIndex>> = Vec::new();
166 for i in 0..n_larger_groups {
167 let offset = (group_base_size + 1) * i;
168 groups.push(
169 (0..group_base_size + 1)
170 .map(|j| offset + j)
171 .map(|j| ValidatorIndex(j as _))
172 .collect(),
173 );
174 }
175
176 for i in 0..(n_cores as usize - n_larger_groups) {
177 let offset = (n_larger_groups * (group_base_size + 1)) + (i * group_base_size);
178 groups.push(
179 (0..group_base_size)
180 .map(|j| offset + j)
181 .map(|j| ValidatorIndex(j as _))
182 .collect(),
183 );
184 }
185
186 ValidatorGroups::<T>::set(groups);
187 }
188
189 Self::maybe_resize_claim_queue();
191 Self::populate_claim_queue_after_session_change();
192
193 let now = frame_system::Pallet::<T>::block_number() + One::one();
194 SessionStartBlock::<T>::set(now);
195 }
196
197 pub(crate) fn group_validators(group_index: GroupIndex) -> Option<Vec<ValidatorIndex>> {
199 ValidatorGroups::<T>::get().get(group_index.0 as usize).map(|g| g.clone())
200 }
201
202 pub(crate) fn num_availability_cores() -> usize {
204 ValidatorGroups::<T>::decode_len().unwrap_or(0)
205 }
206
207 fn expected_claim_queue_len(config: &SchedulerParams<BlockNumberFor<T>>) -> u32 {
210 core::cmp::min(config.num_cores, Self::num_availability_cores() as u32)
211 }
212
213 pub(crate) fn group_assigned_to_core(
217 core: CoreIndex,
218 at: BlockNumberFor<T>,
219 ) -> Option<GroupIndex> {
220 let config = configuration::ActiveConfig::<T>::get();
221 let session_start_block = SessionStartBlock::<T>::get();
222
223 if at < session_start_block {
224 return None
225 }
226
227 let validator_groups = ValidatorGroups::<T>::get();
228
229 if core.0 as usize >= validator_groups.len() {
230 return None
231 }
232
233 let rotations_since_session_start: BlockNumberFor<T> =
234 (at - session_start_block) / config.scheduler_params.group_rotation_frequency;
235
236 let rotations_since_session_start =
237 <BlockNumberFor<T> as TryInto<u32>>::try_into(rotations_since_session_start)
238 .unwrap_or(0);
239 let group_idx =
243 (core.0 as usize + rotations_since_session_start as usize) % validator_groups.len();
244 Some(GroupIndex(group_idx as u32))
245 }
246
247 pub(crate) fn availability_timeout_predicate(
251 ) -> impl Fn(BlockNumberFor<T>) -> AvailabilityTimeoutStatus<BlockNumberFor<T>> {
252 let config = configuration::ActiveConfig::<T>::get();
253 let now = frame_system::Pallet::<T>::block_number();
254 let rotation_info = Self::group_rotation_info(now);
255
256 let next_rotation = rotation_info.next_rotation_at();
257
258 let times_out = Self::availability_timeout_check_required();
259
260 move |pending_since| {
261 let time_out_at = if times_out {
262 pending_since + config.scheduler_params.paras_availability_period
267 } else {
268 next_rotation + config.scheduler_params.paras_availability_period
269 };
270
271 AvailabilityTimeoutStatus { timed_out: time_out_at <= now, live_until: time_out_at }
272 }
273 }
274
275 pub(crate) fn availability_timeout_check_required() -> bool {
280 let config = configuration::ActiveConfig::<T>::get();
281 let now = frame_system::Pallet::<T>::block_number() + One::one();
282 let rotation_info = Self::group_rotation_info(now);
283
284 let current_window =
285 rotation_info.last_rotation_at() + config.scheduler_params.paras_availability_period;
286 now < current_window
287 }
288
289 pub(crate) fn group_rotation_info(
291 now: BlockNumberFor<T>,
292 ) -> GroupRotationInfo<BlockNumberFor<T>> {
293 let session_start_block = SessionStartBlock::<T>::get();
294 let group_rotation_frequency = configuration::ActiveConfig::<T>::get()
295 .scheduler_params
296 .group_rotation_frequency;
297
298 GroupRotationInfo { session_start_block, now, group_rotation_frequency }
299 }
300
301 pub(crate) fn next_up_on_available(core: CoreIndex) -> Option<ScheduledCore> {
304 if Self::on_chain_storage_version() == StorageVersion::new(2) {
306 migration::v2::ClaimQueue::<T>::get()
307 .get(&core)
308 .and_then(|a| a.front().map(|entry| entry.assignment.para_id()))
309 } else {
310 ClaimQueue::<T>::get()
311 .get(&core)
312 .and_then(|a| a.front().map(|assignment| assignment.para_id()))
313 }
314 .map(|para_id| ScheduledCore { para_id, collator: None })
315 }
316
317 pub(crate) fn get_claim_queue() -> BTreeMap<CoreIndex, VecDeque<Assignment>> {
319 if Self::on_chain_storage_version() == StorageVersion::new(2) {
320 migration::v2::ClaimQueue::<T>::get()
321 .into_iter()
322 .map(|(core_index, entries)| {
323 (core_index, entries.into_iter().map(|e| e.assignment).collect())
324 })
325 .collect()
326 } else {
327 ClaimQueue::<T>::get()
328 }
329 }
330
331 pub(crate) fn advance_claim_queue(except_for: &BTreeSet<CoreIndex>) {
334 let config = configuration::ActiveConfig::<T>::get();
335 let expected_claim_queue_len = Self::expected_claim_queue_len(&config.scheduler_params);
336 let n_lookahead = config.scheduler_params.lookahead.max(1);
338
339 for core_idx in 0..expected_claim_queue_len {
340 let core_idx = CoreIndex::from(core_idx);
341
342 if !except_for.contains(&core_idx) {
343 let core_idx = CoreIndex::from(core_idx);
344
345 if let Some(dropped_para) = Self::pop_front_of_claim_queue(&core_idx) {
346 T::AssignmentProvider::report_processed(dropped_para);
347 }
348
349 Self::fill_claim_queue(core_idx, n_lookahead);
350 }
351 }
352 }
353
354 fn maybe_resize_claim_queue() {
356 let cq = ClaimQueue::<T>::get();
357 let Some((old_max_core, _)) = cq.last_key_value() else { return };
358 let config = configuration::ActiveConfig::<T>::get();
359 let new_core_count = Self::expected_claim_queue_len(&config.scheduler_params);
360
361 if new_core_count < (old_max_core.0 + 1) {
362 ClaimQueue::<T>::mutate(|cq| {
363 let to_remove: Vec<_> =
364 cq.range(CoreIndex(new_core_count)..=*old_max_core).map(|(k, _)| *k).collect();
365 for key in to_remove {
366 if let Some(dropped_assignments) = cq.remove(&key) {
367 Self::push_back_to_assignment_provider(dropped_assignments.into_iter());
368 }
369 }
370 });
371 }
372 }
373
374 fn populate_claim_queue_after_session_change() {
377 let config = configuration::ActiveConfig::<T>::get();
378 let n_lookahead = config.scheduler_params.lookahead.max(1);
380 let expected_claim_queue_len = Self::expected_claim_queue_len(&config.scheduler_params);
381
382 for core_idx in 0..expected_claim_queue_len {
383 let core_idx = CoreIndex::from(core_idx);
384 Self::fill_claim_queue(core_idx, n_lookahead);
385 }
386 }
387
388 fn push_back_to_assignment_provider(
390 assignments: impl core::iter::DoubleEndedIterator<Item = Assignment>,
391 ) {
392 for assignment in assignments.rev() {
396 T::AssignmentProvider::push_back_assignment(assignment);
397 }
398 }
399
400 fn fill_claim_queue(core_idx: CoreIndex, n_lookahead: u32) {
401 ClaimQueue::<T>::mutate(|la| {
402 let cq = la.entry(core_idx).or_default();
403
404 let mut n_lookahead_used = cq.len() as u32;
405
406 if n_lookahead_used == 0 && n_lookahead > 1 {
412 if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) {
413 T::AssignmentProvider::assignment_duplicated(&assignment);
414 cq.push_back(assignment.clone());
415 cq.push_back(assignment);
416 n_lookahead_used += 2;
417 }
418 }
419
420 for _ in n_lookahead_used..n_lookahead {
421 if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) {
422 cq.push_back(assignment);
423 } else {
424 break
425 }
426 }
427
428 if cq.is_empty() {
431 la.remove(&core_idx);
432 }
433 });
434 }
435
436 fn pop_front_of_claim_queue(core_idx: &CoreIndex) -> Option<Assignment> {
437 ClaimQueue::<T>::mutate(|cq| cq.get_mut(core_idx)?.pop_front())
438 }
439
/// Total number of assignments held in the claim queue, summed over all cores.
#[cfg(any(feature = "try-runtime", test))]
fn claim_queue_len() -> usize {
    ClaimQueue::<T>::get().values().map(VecDeque::len).sum()
}
444
/// Test helper: reports whether the claim queue holds no assignment for any core.
#[cfg(all(not(feature = "runtime-benchmarks"), test))]
pub(crate) fn claim_queue_is_empty() -> bool {
    Self::claim_queue_len() == 0
}
449
/// Test helper: overwrites the stored validator groups with `validator_groups`.
#[cfg(test)]
pub(crate) fn set_validator_groups(validator_groups: Vec<Vec<ValidatorIndex>>) {
    ValidatorGroups::<T>::set(validator_groups);
}
454
/// Test helper: overwrites the stored claim queue with `claim_queue`.
#[cfg(test)]
pub(crate) fn set_claim_queue(claim_queue: BTreeMap<CoreIndex, VecDeque<Assignment>>) {
    ClaimQueue::<T>::set(claim_queue);
}
459}