1mod mock_helpers;
27#[cfg(test)]
28mod tests;
29
30use crate::{configuration, on_demand, ParaId};
31
32use alloc::{
33 collections::{BTreeMap, VecDeque},
34 vec::Vec,
35};
36use frame_support::{defensive, pallet_prelude::*};
37use frame_system::pallet_prelude::*;
38use polkadot_primitives::CoreIndex;
39use scale_info::TypeInfo;
40use sp_runtime::{
41 codec::{Decode, Encode},
42 traits::Saturating,
43 Debug,
44};
45
46pub use pallet_broker::CoreAssignment;
47
48pub use super::Config;
49
50#[derive(
52 Debug,
53 Clone,
54 Copy,
55 PartialEq,
56 Eq,
57 PartialOrd,
58 Ord,
59 Encode,
60 Decode,
61 DecodeWithMemTracking,
62 TypeInfo,
63)]
64pub struct PartsOf57600(u16);
65
66impl PartsOf57600 {
67 pub const ZERO: Self = Self(0);
68 pub const FULL: Self = Self(57600);
69
70 pub fn new_saturating(v: u16) -> Self {
71 Self::ZERO.saturating_add(Self(v))
72 }
73
74 #[cfg(test)]
76 pub fn value(&self) -> u16 {
77 self.0
78 }
79
80 pub fn is_full(&self) -> bool {
81 *self == Self::FULL
82 }
83
84 pub fn saturating_add(self, rhs: Self) -> Self {
85 let inner = self.0.saturating_add(rhs.0);
86 if inner > 57600 {
87 Self(57600)
88 } else {
89 Self(inner)
90 }
91 }
92
93 pub fn saturating_sub(self, rhs: Self) -> Self {
94 Self(self.0.saturating_sub(rhs.0))
95 }
96
97 pub fn checked_add(self, rhs: Self) -> Option<Self> {
98 let inner = self.0.saturating_add(rhs.0);
99 if inner > 57600 {
100 None
101 } else {
102 Some(Self(inner))
103 }
104 }
105}
106
107#[derive(Encode, Decode, TypeInfo)]
111#[cfg_attr(test, derive(PartialEq, Debug))]
112pub(super) struct Schedule<N> {
113 assignments: Vec<(CoreAssignment, PartsOf57600)>,
115 end_hint: Option<N>,
121
122 next_schedule: Option<N>,
126}
127
128impl<N> Schedule<N> {
129 #[cfg(test)]
131 pub(super) fn new(
132 assignments: Vec<(CoreAssignment, PartsOf57600)>,
133 end_hint: Option<N>,
134 next_schedule: Option<N>,
135 ) -> Self {
136 Self { assignments, end_hint, next_schedule }
137 }
138
139 #[cfg(test)]
141 pub(super) fn assignments(&self) -> &[(CoreAssignment, PartsOf57600)] {
142 &self.assignments
143 }
144
145 #[cfg(test)]
147 pub(super) fn end_hint(&self) -> Option<N>
148 where
149 N: Copy,
150 {
151 self.end_hint
152 }
153
154 pub(super) fn next_schedule(&self) -> Option<N>
156 where
157 N: Copy,
158 {
159 self.next_schedule
160 }
161}
162
163#[derive(Encode, Decode, TypeInfo, Default)]
168#[cfg_attr(test, derive(PartialEq, Debug, Clone))]
169pub(super) struct CoreDescriptor<N> {
170 queue: Option<QueueDescriptor<N>>,
172 current_work: Option<WorkState<N>>,
174}
175
176impl<N: PartialOrd> CoreDescriptor<N> {
177 #[cfg(test)]
179 pub(super) fn new(
180 queue: Option<QueueDescriptor<N>>,
181 current_work: Option<WorkState<N>>,
182 ) -> Self {
183 Self { queue, current_work }
184 }
185
186 fn has_assignments_until(&self, until: N) -> bool {
190 self.current_work.is_some() || self.queue.as_ref().map_or(false, |q| q.first < until)
191 }
192
193 pub(super) fn queue(&self) -> Option<&QueueDescriptor<N>> {
195 self.queue.as_ref()
196 }
197
198 pub(super) fn current_work(&self) -> Option<&WorkState<N>> {
200 self.current_work.as_ref()
201 }
202}
203
204#[derive(Encode, Decode, TypeInfo, Copy, Clone)]
209#[cfg_attr(test, derive(PartialEq, Debug))]
210pub struct QueueDescriptor<N> {
211 pub first: N,
213 pub last: N,
215}
216
217#[derive(Encode, Decode, TypeInfo)]
218#[cfg_attr(test, derive(PartialEq, Debug, Clone))]
219pub struct WorkState<N> {
220 pub assignments: Vec<(CoreAssignment, AssignmentState)>,
225 pub end_hint: Option<N>,
231 pub pos: u16,
236 pub step: PartsOf57600,
240}
241
242#[derive(Encode, Decode, TypeInfo)]
243#[cfg_attr(test, derive(PartialEq, Debug, Clone, Copy))]
244pub struct AssignmentState {
245 pub ratio: PartsOf57600,
249 pub remaining: PartsOf57600,
258}
259
260enum AccessMode<'a, T: Config> {
262 Peek { on_demand_orders: &'a mut on_demand::OrderQueue<BlockNumberFor<T>> },
264 Pop,
266}
267
268impl<'a, T: Config> AccessMode<'a, T> {
269 fn peek(on_demand_orders: &'a mut on_demand::OrderQueue<BlockNumberFor<T>>) -> Self {
271 Self::Peek { on_demand_orders }
272 }
273
274 fn pop() -> Self {
276 Self::Pop
277 }
278
279 fn pop_assignment_for_ondemand_cores(
281 &mut self,
282 now: BlockNumberFor<T>,
283 num_cores: u32,
284 ) -> impl Iterator<Item = ParaId> {
285 match self {
286 Self::Peek { on_demand_orders } => on_demand_orders
287 .pop_assignment_for_cores::<T>(now, num_cores)
288 .collect::<Vec<_>>(),
289 Self::Pop => {
290 on_demand::Pallet::<T>::pop_assignment_for_cores(now, num_cores).collect::<Vec<_>>()
291 },
292 }
293 .into_iter()
294 }
295
296 fn get_core_schedule(
298 &self,
299 next_scheduled: BlockNumberFor<T>,
300 core_idx: CoreIndex,
301 ) -> Option<Schedule<BlockNumberFor<T>>> {
302 match self {
303 Self::Peek { .. } => super::CoreSchedules::<T>::get((next_scheduled, core_idx)),
304 Self::Pop => super::CoreSchedules::<T>::take((next_scheduled, core_idx)),
305 }
306 }
307}
308
309struct AdvancedAssignments {
311 bulk_assignments: Vec<(CoreIndex, ParaId)>,
312 pool_assignments: Vec<(CoreIndex, ParaId)>,
313}
314
315impl AdvancedAssignments {
316 fn into_iter(self) -> impl Iterator<Item = (CoreIndex, ParaId)> {
317 let Self { bulk_assignments, pool_assignments } = self;
318 bulk_assignments.into_iter().chain(pool_assignments.into_iter())
319 }
320}
321
322impl<N> From<Schedule<N>> for WorkState<N> {
323 fn from(schedule: Schedule<N>) -> Self {
324 let Schedule { assignments, end_hint, next_schedule: _ } = schedule;
325 let step =
326 if let Some(min_step_assignment) = assignments.iter().min_by(|a, b| a.1.cmp(&b.1)) {
327 min_step_assignment.1
328 } else {
329 log::debug!("assignments of a `Schedule` should never be empty.");
331 PartsOf57600(1)
332 };
333 let assignments = assignments
334 .into_iter()
335 .map(|(a, ratio)| (a, AssignmentState { ratio, remaining: ratio }))
336 .collect();
337
338 Self { assignments, end_hint, pos: 0, step }
339 }
340}
341
342#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
343pub enum Error {
344 AssignmentsEmpty,
345 DisallowedInsert,
348}
349
350pub(super) fn peek_next_block<T: super::Config>(
368 num_entries: u32,
369) -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
370 let now = frame_system::Pallet::<T>::block_number().saturating_plus_one();
371 peek_impl::<T>(now, num_entries)
372}
373
374pub(super) fn advance_assignments<T: Config, F: Fn(CoreIndex) -> bool>(
386 is_blocked: F,
387) -> BTreeMap<CoreIndex, ParaId> {
388 let now = frame_system::Pallet::<T>::block_number();
389
390 let assignments = super::CoreDescriptors::<T>::mutate(|core_states| {
391 advance_assignments_single_impl::<T>(now, core_states, AccessMode::<T>::pop())
392 });
393
394 for blocked in assignments.pool_assignments.iter().filter_map(|(core_idx, para_id)| {
396 if is_blocked(*core_idx) {
397 Some(*para_id)
398 } else {
399 None
400 }
401 }) {
402 on_demand::Pallet::<T>::push_back_order(blocked);
403 }
404
405 let mut assignments: BTreeMap<CoreIndex, ParaId> =
406 assignments.into_iter().filter(|(core_idx, _)| !is_blocked(*core_idx)).collect();
407
408 let next = now.saturating_plus_one();
411 let mut core_states = super::CoreDescriptors::<T>::get();
412 let mut on_demand_orders = on_demand::Pallet::<T>::peek_order_queue();
413 let next_assignments = advance_assignments_single_impl(
414 next,
415 &mut core_states,
416 AccessMode::<T>::peek(&mut on_demand_orders),
417 )
418 .into_iter();
419
420 for (core_idx, next_assignment) in
421 next_assignments.filter(|(core_idx, _)| !is_blocked(*core_idx))
422 {
423 assignments.entry(core_idx).or_insert_with(|| next_assignment);
424 }
425 assignments
426}
427
428pub(super) fn assign_core<T: Config>(
451 core_idx: CoreIndex,
452 mut begin: BlockNumberFor<T>,
453 mut assignments: Vec<(CoreAssignment, PartsOf57600)>,
454 end_hint: Option<BlockNumberFor<T>>,
455) -> Result<(), Error> {
456 ensure!(!assignments.is_empty(), Error::AssignmentsEmpty);
458
459 super::CoreDescriptors::<T>::mutate(|core_descriptors| {
460 let core_descriptor = core_descriptors.entry(core_idx).or_default();
461
462 let config = configuration::ActiveConfig::<T>::get();
463 let now = frame_system::Pallet::<T>::block_number().saturating_plus_one();
465 let lookahead = config.scheduler_params.lookahead.into();
466 let claim_queue_end = now.saturating_add(lookahead);
467 let assignments_exist = core_descriptor.has_assignments_until(claim_queue_end);
468 if assignments_exist && begin < claim_queue_end {
471 log::debug!(
472 target: "runtime::parachains::assigner-coretime",
473 "Claim queue needs to be stable, schedule change within claim queue length is not supported. Adjusting begin from {:?} to {:?}",
474 begin,
475 claim_queue_end
476 );
477 begin = claim_queue_end;
478 }
479
480 let new_queue = match core_descriptor.queue {
481 Some(queue) => {
482 ensure!(begin >= queue.last, Error::DisallowedInsert);
483
484 if begin > queue.last {
486 super::CoreSchedules::<T>::mutate((queue.last, core_idx), |schedule| {
487 if let Some(schedule) = schedule.as_mut() {
488 debug_assert!(schedule.next_schedule.is_none(), "queue.end was supposed to be the end, so the next item must be `None`!");
489 schedule.next_schedule = Some(begin);
490 } else {
491 defensive!("Queue end entry does not exist?");
492 }
493 });
494 }
495
496 super::CoreSchedules::<T>::mutate((begin, core_idx), |schedule| {
497 let assignments = if let Some(mut old_schedule) = schedule.take() {
498 old_schedule.assignments.append(&mut assignments);
499 old_schedule.assignments
500 } else {
501 assignments
502 };
503 *schedule = Some(Schedule { assignments, end_hint, next_schedule: None });
504 });
505
506 QueueDescriptor { first: queue.first, last: begin }
507 },
508 None => {
509 super::CoreSchedules::<T>::insert(
511 (begin, core_idx),
512 Schedule { assignments, end_hint, next_schedule: None },
513 );
514 QueueDescriptor { first: begin, last: begin }
515 },
516 };
517 core_descriptor.queue = Some(new_queue);
518 Ok(())
519 })
520}
521
522fn num_coretime_cores<T: Config>() -> u32 {
523 configuration::ActiveConfig::<T>::get().scheduler_params.num_cores
524}
525
526fn peek_impl<T: Config>(
527 mut now: BlockNumberFor<T>,
528 num_entries: u32,
529) -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
530 let mut core_states = super::CoreDescriptors::<T>::get();
531 let mut result = BTreeMap::new();
532 let mut on_demand_orders = on_demand::Pallet::<T>::peek_order_queue();
533 for i in 0..num_entries {
534 let assignments = advance_assignments_single_impl(
535 now,
536 &mut core_states,
537 AccessMode::<T>::peek(&mut on_demand_orders),
538 )
539 .into_iter();
540 for (core_idx, para_id) in assignments {
541 let claim_queue: &mut VecDeque<ParaId> = result.entry(core_idx).or_default();
542 if claim_queue.len() == i as usize {
544 claim_queue.push_back(para_id)
545 } else if claim_queue.len() == 0 && i == 1 {
546 claim_queue.push_back(para_id);
551 claim_queue.push_back(para_id);
553 }
554 }
555 now.saturating_inc();
556 }
557 result
558}
559
560fn advance_assignments_single_impl<T: Config>(
562 now: BlockNumberFor<T>,
563 core_states: &mut BTreeMap<CoreIndex, CoreDescriptor<BlockNumberFor<T>>>,
564 mut mode: AccessMode<T>,
565) -> AdvancedAssignments {
566 let mut bulk_assignments = Vec::with_capacity(num_coretime_cores::<T>() as _);
567 let mut pool_cores = Vec::with_capacity(num_coretime_cores::<T>() as _);
568 for (core_idx, core_state) in core_states.iter_mut() {
569 ensure_workload::<T>(now, *core_idx, core_state, &mode);
570
571 let Some(work_state) = core_state.current_work.as_mut() else { continue };
572
573 work_state.pos = work_state.pos % work_state.assignments.len() as u16;
575 let (a_type, a_state) = &mut work_state
576 .assignments
577 .get_mut(work_state.pos as usize)
578 .expect("We limited pos to the size of the vec one line above. qed");
579
580 a_state.remaining = a_state.remaining.saturating_sub(work_state.step);
582 if a_state.remaining < work_state.step {
583 work_state.pos += 1;
586 a_state.remaining = a_state.remaining.saturating_add(a_state.ratio);
588 }
589 match *a_type {
590 CoreAssignment::Pool => pool_cores.push(*core_idx),
591 CoreAssignment::Task(para_id) => bulk_assignments.push((*core_idx, para_id.into())),
592 CoreAssignment::Idle => {},
593 }
594 }
595
596 let pool_assignments = mode.pop_assignment_for_ondemand_cores(now, pool_cores.len() as _);
597 let pool_assignments = pool_cores.into_iter().zip(pool_assignments).collect();
598
599 AdvancedAssignments { bulk_assignments, pool_assignments }
600}
601
602fn ensure_workload<T: Config>(
604 now: BlockNumberFor<T>,
605 core_idx: CoreIndex,
606 descriptor: &mut CoreDescriptor<BlockNumberFor<T>>,
607 mode: &AccessMode<T>,
608) {
609 if descriptor
611 .current_work
612 .as_ref()
613 .and_then(|w| w.end_hint)
614 .map_or(false, |e| e <= now)
615 {
616 descriptor.current_work = None;
617 }
618
619 let Some(queue) = descriptor.queue else {
620 return;
622 };
623
624 let mut next_scheduled = queue.first;
625
626 if next_scheduled > now {
627 return;
629 }
630
631 let update = loop {
633 let Some(update) = mode.get_core_schedule(next_scheduled, core_idx) else { break None };
634
635 if update.end_hint.map_or(true, |e| e > now) {
637 break Some(update);
638 }
639 if let Some(n) = update.next_schedule {
641 next_scheduled = n;
642 } else {
643 break None;
644 }
645 };
646
647 let new_first = update.as_ref().and_then(|u| u.next_schedule);
648 descriptor.current_work = update.map(Into::into);
649
650 descriptor.queue = new_first.map(|new_first| {
651 QueueDescriptor {
652 first: new_first,
653 last: queue.last,
655 }
656 });
657}