polkadot_runtime_parachains/assigner_coretime/
mod.rs1mod mock_helpers;
27#[cfg(test)]
28mod tests;
29
30use crate::{
31 configuration, on_demand,
32 paras::AssignCoretime,
33 scheduler::common::{Assignment, AssignmentProvider},
34 ParaId,
35};
36
37use alloc::{vec, vec::Vec};
38use frame_support::{defensive, pallet_prelude::*};
39use frame_system::pallet_prelude::*;
40use pallet_broker::CoreAssignment;
41use polkadot_primitives::CoreIndex;
42use sp_runtime::traits::{One, Saturating};
43
44pub use pallet::*;
45
46#[derive(
48 RuntimeDebug,
49 Clone,
50 Copy,
51 PartialEq,
52 Eq,
53 PartialOrd,
54 Ord,
55 Encode,
56 Decode,
57 DecodeWithMemTracking,
58 TypeInfo,
59)]
60pub struct PartsOf57600(u16);
61
62impl PartsOf57600 {
63 pub const ZERO: Self = Self(0);
64 pub const FULL: Self = Self(57600);
65
66 pub fn new_saturating(v: u16) -> Self {
67 Self::ZERO.saturating_add(Self(v))
68 }
69
70 pub fn is_full(&self) -> bool {
71 *self == Self::FULL
72 }
73
74 pub fn saturating_add(self, rhs: Self) -> Self {
75 let inner = self.0.saturating_add(rhs.0);
76 if inner > 57600 {
77 Self(57600)
78 } else {
79 Self(inner)
80 }
81 }
82
83 pub fn saturating_sub(self, rhs: Self) -> Self {
84 Self(self.0.saturating_sub(rhs.0))
85 }
86
87 pub fn checked_add(self, rhs: Self) -> Option<Self> {
88 let inner = self.0.saturating_add(rhs.0);
89 if inner > 57600 {
90 None
91 } else {
92 Some(Self(inner))
93 }
94 }
95}
96
97#[derive(Encode, Decode, TypeInfo)]
101#[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
102struct Schedule<N> {
103 assignments: Vec<(CoreAssignment, PartsOf57600)>,
105 end_hint: Option<N>,
111
112 next_schedule: Option<N>,
116}
117
118#[derive(Encode, Decode, TypeInfo, Default)]
123#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone))]
124struct CoreDescriptor<N> {
125 queue: Option<QueueDescriptor<N>>,
127 current_work: Option<WorkState<N>>,
129}
130
131#[derive(Encode, Decode, TypeInfo, Copy, Clone)]
136#[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
137struct QueueDescriptor<N> {
138 first: N,
140 last: N,
142}
143
144#[derive(Encode, Decode, TypeInfo)]
145#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone))]
146struct WorkState<N> {
147 assignments: Vec<(CoreAssignment, AssignmentState)>,
152 end_hint: Option<N>,
158 pos: u16,
163 step: PartsOf57600,
167}
168
169#[derive(Encode, Decode, TypeInfo)]
170#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone, Copy))]
171struct AssignmentState {
172 ratio: PartsOf57600,
176 remaining: PartsOf57600,
185}
186
187impl<N> From<Schedule<N>> for WorkState<N> {
188 fn from(schedule: Schedule<N>) -> Self {
189 let Schedule { assignments, end_hint, next_schedule: _ } = schedule;
190 let step =
191 if let Some(min_step_assignment) = assignments.iter().min_by(|a, b| a.1.cmp(&b.1)) {
192 min_step_assignment.1
193 } else {
194 log::debug!("assignments of a `Schedule` should never be empty.");
196 PartsOf57600(1)
197 };
198 let assignments = assignments
199 .into_iter()
200 .map(|(a, ratio)| (a, AssignmentState { ratio, remaining: ratio }))
201 .collect();
202
203 Self { assignments, end_hint, pos: 0, step }
204 }
205}
206
207#[frame_support::pallet]
208pub mod pallet {
209 use super::*;
210
211 #[pallet::pallet]
212 #[pallet::without_storage_info]
213 pub struct Pallet<T>(_);
214
215 #[pallet::config]
216 pub trait Config: frame_system::Config + configuration::Config + on_demand::Config {}
217
218 #[pallet::storage]
223 pub(super) type CoreSchedules<T: Config> = StorageMap<
224 _,
225 Twox256,
226 (BlockNumberFor<T>, CoreIndex),
227 Schedule<BlockNumberFor<T>>,
228 OptionQuery,
229 >;
230
231 #[pallet::storage]
236 pub(super) type CoreDescriptors<T: Config> = StorageMap<
237 _,
238 Twox256,
239 CoreIndex,
240 CoreDescriptor<BlockNumberFor<T>>,
241 ValueQuery,
242 GetDefault,
243 >;
244
245 #[pallet::hooks]
246 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {}
247
248 #[pallet::error]
249 pub enum Error<T> {
250 AssignmentsEmpty,
251 DisallowedInsert,
254 }
255}
256
257impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
258 fn pop_assignment_for_core(core_idx: CoreIndex) -> Option<Assignment> {
259 let now = frame_system::Pallet::<T>::block_number();
260
261 CoreDescriptors::<T>::mutate(core_idx, |core_state| {
262 Self::ensure_workload(now, core_idx, core_state);
263
264 let work_state = core_state.current_work.as_mut()?;
265
266 work_state.pos = work_state.pos % work_state.assignments.len() as u16;
268 let (a_type, a_state) = &mut work_state
269 .assignments
270 .get_mut(work_state.pos as usize)
271 .expect("We limited pos to the size of the vec one line above. qed");
272
273 a_state.remaining = a_state.remaining.saturating_sub(work_state.step);
275 if a_state.remaining < work_state.step {
276 work_state.pos += 1;
279 a_state.remaining = a_state.remaining.saturating_add(a_state.ratio);
281 }
282
283 match a_type {
284 CoreAssignment::Idle => None,
285 CoreAssignment::Pool => on_demand::Pallet::<T>::pop_assignment_for_core(core_idx),
286 CoreAssignment::Task(para_id) => Some(Assignment::Bulk((*para_id).into())),
287 }
288 })
289 }
290
291 fn report_processed(assignment: Assignment) {
292 match assignment {
293 Assignment::Pool { para_id, core_index } =>
294 on_demand::Pallet::<T>::report_processed(para_id, core_index),
295 Assignment::Bulk(_) => {},
296 }
297 }
298
299 fn push_back_assignment(assignment: Assignment) {
305 match assignment {
306 Assignment::Pool { para_id, core_index } =>
307 on_demand::Pallet::<T>::push_back_assignment(para_id, core_index),
308 Assignment::Bulk(_) => {
309 },
315 }
316 }
317
318 #[cfg(any(feature = "runtime-benchmarks", test))]
319 fn get_mock_assignment(_: CoreIndex, para_id: polkadot_primitives::Id) -> Assignment {
320 Assignment::Bulk(para_id)
323 }
324
325 fn assignment_duplicated(assignment: &Assignment) {
326 match assignment {
327 Assignment::Pool { para_id, core_index } =>
328 on_demand::Pallet::<T>::assignment_duplicated(*para_id, *core_index),
329 Assignment::Bulk(_) => {},
330 }
331 }
332}
333
334impl<T: Config> Pallet<T> {
335 fn ensure_workload(
337 now: BlockNumberFor<T>,
338 core_idx: CoreIndex,
339 descriptor: &mut CoreDescriptor<BlockNumberFor<T>>,
340 ) {
341 if descriptor
343 .current_work
344 .as_ref()
345 .and_then(|w| w.end_hint)
346 .map_or(false, |e| e <= now)
347 {
348 descriptor.current_work = None;
349 }
350
351 let Some(queue) = descriptor.queue else {
352 return
354 };
355
356 let mut next_scheduled = queue.first;
357
358 if next_scheduled > now {
359 return
361 }
362
363 let update = loop {
365 let Some(update) = CoreSchedules::<T>::take((next_scheduled, core_idx)) else {
366 break None
367 };
368 if update.end_hint.map_or(true, |e| e > now) {
370 break Some(update)
371 }
372 if let Some(n) = update.next_schedule {
374 next_scheduled = n;
375 } else {
376 break None
377 }
378 };
379
380 let new_first = update.as_ref().and_then(|u| u.next_schedule);
381 descriptor.current_work = update.map(Into::into);
382
383 descriptor.queue = new_first.map(|new_first| {
384 QueueDescriptor {
385 first: new_first,
386 last: queue.last,
388 }
389 });
390 }
391
392 pub fn assign_core(
410 core_idx: CoreIndex,
411 begin: BlockNumberFor<T>,
412 mut assignments: Vec<(CoreAssignment, PartsOf57600)>,
413 end_hint: Option<BlockNumberFor<T>>,
414 ) -> Result<(), DispatchError> {
415 ensure!(!assignments.is_empty(), Error::<T>::AssignmentsEmpty);
417
418 CoreDescriptors::<T>::mutate(core_idx, |core_descriptor| {
419 let new_queue = match core_descriptor.queue {
420 Some(queue) => {
421 ensure!(begin >= queue.last, Error::<T>::DisallowedInsert);
422
423 if begin > queue.last {
425 CoreSchedules::<T>::mutate((queue.last, core_idx), |schedule| {
426 if let Some(schedule) = schedule.as_mut() {
427 debug_assert!(schedule.next_schedule.is_none(), "queue.end was supposed to be the end, so the next item must be `None`!");
428 schedule.next_schedule = Some(begin);
429 } else {
430 defensive!("Queue end entry does not exist?");
431 }
432 });
433 }
434
435 CoreSchedules::<T>::mutate((begin, core_idx), |schedule| {
436 let assignments = if let Some(mut old_schedule) = schedule.take() {
437 old_schedule.assignments.append(&mut assignments);
438 old_schedule.assignments
439 } else {
440 assignments
441 };
442 *schedule = Some(Schedule { assignments, end_hint, next_schedule: None });
443 });
444
445 QueueDescriptor { first: queue.first, last: begin }
446 },
447 None => {
448 CoreSchedules::<T>::insert(
450 (begin, core_idx),
451 Schedule { assignments, end_hint, next_schedule: None },
452 );
453 QueueDescriptor { first: begin, last: begin }
454 },
455 };
456 core_descriptor.queue = Some(new_queue);
457 Ok(())
458 })
459 }
460}
461
462impl<T: Config> AssignCoretime for Pallet<T> {
463 fn assign_coretime(id: ParaId) -> DispatchResult {
464 let current_block = frame_system::Pallet::<T>::block_number();
465
466 let mut config = configuration::ActiveConfig::<T>::get();
468 let core = config.scheduler_params.num_cores;
469 config.scheduler_params.num_cores.saturating_inc();
470
471 configuration::Pallet::<T>::force_set_active_config(config);
474
475 let begin = current_block + One::one();
476 let assignment = vec![(pallet_broker::CoreAssignment::Task(id.into()), PartsOf57600::FULL)];
477 Pallet::<T>::assign_core(CoreIndex(core), begin, assignment, None)
478 }
479}