referrerpolicy=no-referrer-when-downgrade

polkadot_runtime_parachains/scheduler/assigner_coretime/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The parachain coretime assignment module.
18//!
19//! Handles scheduling of assignments coming from the coretime/broker chain. For on-demand
20//! assignments it relies on the separate on-demand pallet, where it forwards requests
21//! to.
22//!
23//! `CoreDescriptor` contains pointers to the begin and the end of a list of schedules, together
24//! with the currently active assignments.
25
26mod mock_helpers;
27#[cfg(test)]
28mod tests;
29
30use crate::{configuration, on_demand, ParaId};
31
32use alloc::{
33	collections::{BTreeMap, VecDeque},
34	vec::Vec,
35};
36use frame_support::{defensive, pallet_prelude::*};
37use frame_system::pallet_prelude::*;
38use polkadot_primitives::CoreIndex;
39use scale_info::TypeInfo;
40use sp_runtime::{
41	codec::{Decode, Encode},
42	traits::Saturating,
43	Debug,
44};
45
46pub use pallet_broker::CoreAssignment;
47
48pub use super::Config;
49
50/// Fraction expressed as a nominator with an assumed denominator of 57,600.
51#[derive(
52	Debug,
53	Clone,
54	Copy,
55	PartialEq,
56	Eq,
57	PartialOrd,
58	Ord,
59	Encode,
60	Decode,
61	DecodeWithMemTracking,
62	TypeInfo,
63)]
64pub struct PartsOf57600(u16);
65
66impl PartsOf57600 {
67	pub const ZERO: Self = Self(0);
68	pub const FULL: Self = Self(57600);
69
70	pub fn new_saturating(v: u16) -> Self {
71		Self::ZERO.saturating_add(Self(v))
72	}
73
74	/// Returns the inner value (test-only accessor).
75	#[cfg(test)]
76	pub fn value(&self) -> u16 {
77		self.0
78	}
79
80	pub fn is_full(&self) -> bool {
81		*self == Self::FULL
82	}
83
84	pub fn saturating_add(self, rhs: Self) -> Self {
85		let inner = self.0.saturating_add(rhs.0);
86		if inner > 57600 {
87			Self(57600)
88		} else {
89			Self(inner)
90		}
91	}
92
93	pub fn saturating_sub(self, rhs: Self) -> Self {
94		Self(self.0.saturating_sub(rhs.0))
95	}
96
97	pub fn checked_add(self, rhs: Self) -> Option<Self> {
98		let inner = self.0.saturating_add(rhs.0);
99		if inner > 57600 {
100			None
101		} else {
102			Some(Self(inner))
103		}
104	}
105}
106
107/// Assignments as they are scheduled by block number
108///
109/// for a particular core.
110#[derive(Encode, Decode, TypeInfo)]
111#[cfg_attr(test, derive(PartialEq, Debug))]
112pub(super) struct Schedule<N> {
113	/// Original assignments.
114	assignments: Vec<(CoreAssignment, PartsOf57600)>,
115	/// When do our assignments become invalid, if at all?
116	///
117	/// If this is `Some`, then this `CoreState` will be dropped at that block number. If this is
118	/// `None`, then we will keep serving our core assignments in a circle until a new set of
119	/// assignments is scheduled.
120	end_hint: Option<N>,
121
122	/// The next queued schedule for this core.
123	///
124	/// Schedules are forming a queue.
125	next_schedule: Option<N>,
126}
127
128impl<N> Schedule<N> {
129	/// Creates a new Schedule (for tests).
130	#[cfg(test)]
131	pub(super) fn new(
132		assignments: Vec<(CoreAssignment, PartsOf57600)>,
133		end_hint: Option<N>,
134		next_schedule: Option<N>,
135	) -> Self {
136		Self { assignments, end_hint, next_schedule }
137	}
138
139	/// Accessor for assignments (needed by tests).
140	#[cfg(test)]
141	pub(super) fn assignments(&self) -> &[(CoreAssignment, PartsOf57600)] {
142		&self.assignments
143	}
144
145	/// Accessor for end_hint (needed by tests).
146	#[cfg(test)]
147	pub(super) fn end_hint(&self) -> Option<N>
148	where
149		N: Copy,
150	{
151		self.end_hint
152	}
153
154	/// Accessor for next_schedule (needed by migrations, tests, and try-runtime).
155	pub(super) fn next_schedule(&self) -> Option<N>
156	where
157		N: Copy,
158	{
159		self.next_schedule
160	}
161}
162
163/// Descriptor for a core.
164///
165/// Contains pointers to first and last schedule into `CoreSchedules` for that core and keeps track
166/// of the currently active work as well.
167#[derive(Encode, Decode, TypeInfo, Default)]
168#[cfg_attr(test, derive(PartialEq, Debug, Clone))]
169pub(super) struct CoreDescriptor<N> {
170	/// Meta data about the queued schedules for this core.
171	queue: Option<QueueDescriptor<N>>,
172	/// Currently performed work.
173	current_work: Option<WorkState<N>>,
174}
175
176impl<N: PartialOrd> CoreDescriptor<N> {
177	/// Creates a new CoreDescriptor (for tests).
178	#[cfg(test)]
179	pub(super) fn new(
180		queue: Option<QueueDescriptor<N>>,
181		current_work: Option<WorkState<N>>,
182	) -> Self {
183		Self { queue, current_work }
184	}
185
186	/// Any work currently on that core?
187	///
188	/// Params: until - until (exclusive) which block number we are interested.
189	fn has_assignments_until(&self, until: N) -> bool {
190		self.current_work.is_some() || self.queue.as_ref().map_or(false, |q| q.first < until)
191	}
192
193	/// Accessor for queue (needed by migrations, tests, and try-runtime).
194	pub(super) fn queue(&self) -> Option<&QueueDescriptor<N>> {
195		self.queue.as_ref()
196	}
197
198	/// Accessor for current_work (needed by migrations, tests, and try-runtime).
199	pub(super) fn current_work(&self) -> Option<&WorkState<N>> {
200		self.current_work.as_ref()
201	}
202}
203
204/// Pointers into `CoreSchedules` for a particular core.
205///
206/// Schedules in `CoreSchedules` form a queue. `Schedule::next_schedule` always pointing to the next
207/// item.
208#[derive(Encode, Decode, TypeInfo, Copy, Clone)]
209#[cfg_attr(test, derive(PartialEq, Debug))]
210pub struct QueueDescriptor<N> {
211	/// First scheduled item, that is not yet active.
212	pub first: N,
213	/// Last scheduled item.
214	pub last: N,
215}
216
217#[derive(Encode, Decode, TypeInfo)]
218#[cfg_attr(test, derive(PartialEq, Debug, Clone))]
219pub struct WorkState<N> {
220	/// Assignments with current state.
221	///
222	/// Assignments and book keeping on how much has been served already. We keep track of serviced
223	/// assignments in order to adhere to the specified ratios.
224	pub assignments: Vec<(CoreAssignment, AssignmentState)>,
225	/// When do our assignments become invalid if at all?
226	///
227	/// If this is `Some`, then this `CoreState` will be dropped at that block number. If this is
228	/// `None`, then we will keep serving our core assignments in a circle until a new set of
229	/// assignments is scheduled.
230	pub end_hint: Option<N>,
231	/// Position in the assignments we are currently in.
232	///
233	/// Aka which core assignment will be popped next on
234	/// `AssignmentProvider::advance_assignments`.
235	pub pos: u16,
236	/// Step width
237	///
238	/// How much we subtract from `AssignmentState::remaining` for a core served.
239	pub step: PartsOf57600,
240}
241
242#[derive(Encode, Decode, TypeInfo)]
243#[cfg_attr(test, derive(PartialEq, Debug, Clone, Copy))]
244pub struct AssignmentState {
245	/// Ratio of the core this assignment has.
246	///
247	/// As initially received via `assign_core`.
248	pub ratio: PartsOf57600,
249	/// How many parts are remaining in this round?
250	///
251	/// At the end of each round (in preparation for the next), ratio will be added to remaining.
252	/// Then every time we get scheduled we subtract a core worth of points. Once we reach 0 or a
253	/// number lower than what a core is worth (`CoreState::step` size), we move on to the next
254	/// item in the `Vec`.
255	///
256	/// The first round starts with remaining = ratio.
257	pub remaining: PartsOf57600,
258}
259
260/// How storage is accessed.
261enum AccessMode<'a, T: Config> {
262	/// We only want to peek (no side effects).
263	Peek { on_demand_orders: &'a mut on_demand::OrderQueue<BlockNumberFor<T>> },
264	/// We need to update state.
265	Pop,
266}
267
268impl<'a, T: Config> AccessMode<'a, T> {
269	/// Construct a peeking access mode.
270	fn peek(on_demand_orders: &'a mut on_demand::OrderQueue<BlockNumberFor<T>>) -> Self {
271		Self::Peek { on_demand_orders }
272	}
273
274	/// Construct popping/modifying access mode.
275	fn pop() -> Self {
276		Self::Pop
277	}
278
279	/// Pop pool assignments according to access mode.
280	fn pop_assignment_for_ondemand_cores(
281		&mut self,
282		now: BlockNumberFor<T>,
283		num_cores: u32,
284	) -> impl Iterator<Item = ParaId> {
285		match self {
286			Self::Peek { on_demand_orders } => on_demand_orders
287				.pop_assignment_for_cores::<T>(now, num_cores)
288				.collect::<Vec<_>>(),
289			Self::Pop => {
290				on_demand::Pallet::<T>::pop_assignment_for_cores(now, num_cores).collect::<Vec<_>>()
291			},
292		}
293		.into_iter()
294	}
295
296	/// Get core schedule according to access mode (either take or get).
297	fn get_core_schedule(
298		&self,
299		next_scheduled: BlockNumberFor<T>,
300		core_idx: CoreIndex,
301	) -> Option<Schedule<BlockNumberFor<T>>> {
302		match self {
303			Self::Peek { .. } => super::CoreSchedules::<T>::get((next_scheduled, core_idx)),
304			Self::Pop => super::CoreSchedules::<T>::take((next_scheduled, core_idx)),
305		}
306	}
307}
308
309/// Assignments that got advanced.
310struct AdvancedAssignments {
311	bulk_assignments: Vec<(CoreIndex, ParaId)>,
312	pool_assignments: Vec<(CoreIndex, ParaId)>,
313}
314
315impl AdvancedAssignments {
316	fn into_iter(self) -> impl Iterator<Item = (CoreIndex, ParaId)> {
317		let Self { bulk_assignments, pool_assignments } = self;
318		bulk_assignments.into_iter().chain(pool_assignments.into_iter())
319	}
320}
321
322impl<N> From<Schedule<N>> for WorkState<N> {
323	fn from(schedule: Schedule<N>) -> Self {
324		let Schedule { assignments, end_hint, next_schedule: _ } = schedule;
325		let step =
326			if let Some(min_step_assignment) = assignments.iter().min_by(|a, b| a.1.cmp(&b.1)) {
327				min_step_assignment.1
328			} else {
329				// Assignments empty, should not exist. In any case step size does not matter here:
330				log::debug!("assignments of a `Schedule` should never be empty.");
331				PartsOf57600(1)
332			};
333		let assignments = assignments
334			.into_iter()
335			.map(|(a, ratio)| (a, AssignmentState { ratio, remaining: ratio }))
336			.collect();
337
338		Self { assignments, end_hint, pos: 0, step }
339	}
340}
341
342#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
343pub enum Error {
344	AssignmentsEmpty,
345	/// assign_core is only allowed to append new assignments at the end of already existing
346	/// ones or update the last entry.
347	DisallowedInsert,
348}
349
350/// Peek `num_entries` into the future.
351///
352/// First element for each `CoreIndex` will tell what would be retrieved when
353/// `advance_assignments` is called at the next block. The second what one would get in the
354/// block after the next block and so forth.
355///
356/// The predictions are accurate in the sense that if an assignment `B` was predicted, it will
357/// never happen that `advance_assignments` at that block will retrieve an assignment `A`.
358/// What can happen though is that the prediction is empty (returned vec does not contain that
359/// element), but `advance_assignments` at that block will then return something regardless.
360///
361/// Invariants:
362///
363/// - `advance_assignments` must be called for each core each block
364/// exactly once for the prediction offered by `peek_next_block` to stay accurate.
365/// - This function is meant to be called from a runtime API and thus uses the state of the
366/// block after the current one to show an accurate prediction of upcoming schedules.
367pub(super) fn peek_next_block<T: super::Config>(
368	num_entries: u32,
369) -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
370	let now = frame_system::Pallet::<T>::block_number().saturating_plus_one();
371	peek_impl::<T>(now, num_entries)
372}
373
374/// Advance assignments.
375///
376/// We move forward one step with the assignments on each core.
377///
378/// Parameters:
379///
380/// - blocked: Lambda, for each core it returns true, the assignment could not actually be
381/// served.
382///
383/// Returns: Advanced assignments. Blocked cores will still be advanced, but will not be
384/// contained in the output.
385pub(super) fn advance_assignments<T: Config, F: Fn(CoreIndex) -> bool>(
386	is_blocked: F,
387) -> BTreeMap<CoreIndex, ParaId> {
388	let now = frame_system::Pallet::<T>::block_number();
389
390	let assignments = super::CoreDescriptors::<T>::mutate(|core_states| {
391		advance_assignments_single_impl::<T>(now, core_states, AccessMode::<T>::pop())
392	});
393
394	// Give blocked on-demand orders another chance:
395	for blocked in assignments.pool_assignments.iter().filter_map(|(core_idx, para_id)| {
396		if is_blocked(*core_idx) {
397			Some(*para_id)
398		} else {
399			None
400		}
401	}) {
402		on_demand::Pallet::<T>::push_back_order(blocked);
403	}
404
405	let mut assignments: BTreeMap<CoreIndex, ParaId> =
406		assignments.into_iter().filter(|(core_idx, _)| !is_blocked(*core_idx)).collect();
407
408	// Try to fill missing assignments from the next position (duplication to allow asynchronous
409	// backing even for first assignment coming in on a previously empty core):
410	let next = now.saturating_plus_one();
411	let mut core_states = super::CoreDescriptors::<T>::get();
412	let mut on_demand_orders = on_demand::Pallet::<T>::peek_order_queue();
413	let next_assignments = advance_assignments_single_impl(
414		next,
415		&mut core_states,
416		AccessMode::<T>::peek(&mut on_demand_orders),
417	)
418	.into_iter();
419
420	for (core_idx, next_assignment) in
421		next_assignments.filter(|(core_idx, _)| !is_blocked(*core_idx))
422	{
423		assignments.entry(core_idx).or_insert_with(|| next_assignment);
424	}
425	assignments
426}
427
428/// Append another assignment for a core.
429///
430/// Important: Only appending is allowed or insertion into the last item. Meaning,
431/// all already existing assignments must have a `begin` smaller or equal than the one passed
432/// here.
433/// Updating the last entry is supported to allow for making a core assignment multiple calls to
434/// assign_core. Thus if you have too much interlacing for e.g. a single UMP message you can
435/// split that up into multiple messages, each triggering a call to `assign_core`, together
436/// forming the total assignment.
437///
438/// Inserting arbitrarily causes a `DispatchError::DisallowedInsert` error.
439///
440/// Inserting too early (changing assignments within the lookahead depth), will
441/// get the begin auto-adjusted to maintain the stable claim queue invariant, if
442/// there existed assignments before.
443// With the restriction of only allowing for appends this function allows for
444// O(1) complexity. It could easily be lifted, if need be and in fact an
445// implementation is available
446// [here](https://github.com/paritytech/polkadot-sdk/pull/1694/commits/c0c23b01fd2830910cde92c11960dad12cdff398#diff-0c85a46e448de79a5452395829986ee8747e17a857c27ab624304987d2dde8baR386).
447// The problem is that insertion complexity then depends on the size of the existing queue,
448// which makes determining weights hard and could lead to issues like overweight blocks (at
449// least in theory).
450pub(super) fn assign_core<T: Config>(
451	core_idx: CoreIndex,
452	mut begin: BlockNumberFor<T>,
453	mut assignments: Vec<(CoreAssignment, PartsOf57600)>,
454	end_hint: Option<BlockNumberFor<T>>,
455) -> Result<(), Error> {
456	// There should be at least one assignment.
457	ensure!(!assignments.is_empty(), Error::AssignmentsEmpty);
458
459	super::CoreDescriptors::<T>::mutate(|core_descriptors| {
460		let core_descriptor = core_descriptors.entry(core_idx).or_default();
461
462		let config = configuration::ActiveConfig::<T>::get();
463		// Plus 1 because claim queue is always 1 block ahead:
464		let now = frame_system::Pallet::<T>::block_number().saturating_plus_one();
465		let lookahead = config.scheduler_params.lookahead.into();
466		let claim_queue_end = now.saturating_add(lookahead);
467		let assignments_exist = core_descriptor.has_assignments_until(claim_queue_end);
468		// Maintain invariant of stable claim queue (existing visible assignments not getting
469		// replaced):
470		if assignments_exist && begin < claim_queue_end {
471			log::debug!(
472				target: "runtime::parachains::assigner-coretime",
473				"Claim queue needs to be stable, schedule change within claim queue length is not supported. Adjusting begin from {:?} to {:?}",
474				begin,
475				claim_queue_end
476			);
477			begin = claim_queue_end;
478		}
479
480		let new_queue = match core_descriptor.queue {
481			Some(queue) => {
482				ensure!(begin >= queue.last, Error::DisallowedInsert);
483
484				// Update queue if we are appending:
485				if begin > queue.last {
486					super::CoreSchedules::<T>::mutate((queue.last, core_idx), |schedule| {
487						if let Some(schedule) = schedule.as_mut() {
488							debug_assert!(schedule.next_schedule.is_none(), "queue.end was supposed to be the end, so the next item must be `None`!");
489							schedule.next_schedule = Some(begin);
490						} else {
491							defensive!("Queue end entry does not exist?");
492						}
493					});
494				}
495
496				super::CoreSchedules::<T>::mutate((begin, core_idx), |schedule| {
497					let assignments = if let Some(mut old_schedule) = schedule.take() {
498						old_schedule.assignments.append(&mut assignments);
499						old_schedule.assignments
500					} else {
501						assignments
502					};
503					*schedule = Some(Schedule { assignments, end_hint, next_schedule: None });
504				});
505
506				QueueDescriptor { first: queue.first, last: begin }
507			},
508			None => {
509				// Queue empty, just insert:
510				super::CoreSchedules::<T>::insert(
511					(begin, core_idx),
512					Schedule { assignments, end_hint, next_schedule: None },
513				);
514				QueueDescriptor { first: begin, last: begin }
515			},
516		};
517		core_descriptor.queue = Some(new_queue);
518		Ok(())
519	})
520}
521
522fn num_coretime_cores<T: Config>() -> u32 {
523	configuration::ActiveConfig::<T>::get().scheduler_params.num_cores
524}
525
526fn peek_impl<T: Config>(
527	mut now: BlockNumberFor<T>,
528	num_entries: u32,
529) -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
530	let mut core_states = super::CoreDescriptors::<T>::get();
531	let mut result = BTreeMap::new();
532	let mut on_demand_orders = on_demand::Pallet::<T>::peek_order_queue();
533	for i in 0..num_entries {
534		let assignments = advance_assignments_single_impl(
535			now,
536			&mut core_states,
537			AccessMode::<T>::peek(&mut on_demand_orders),
538		)
539		.into_iter();
540		for (core_idx, para_id) in assignments {
541			let claim_queue: &mut VecDeque<ParaId> = result.entry(core_idx).or_default();
542			// Stop filling on holes, otherwise we get claims at the wrong positions.
543			if claim_queue.len() == i as usize {
544				claim_queue.push_back(para_id)
545			} else if claim_queue.len() == 0 && i == 1 {
546				// Except for position 1: Claim queue was empty before. We now have an incoming
547				// assignment on position 1: Duplicate it to position 0 so the chain will
548				// get a full asynchronous backing opportunity (and a bonus synchronous
549				// backing opportunity).
550				claim_queue.push_back(para_id);
551				// And fill position 1:
552				claim_queue.push_back(para_id);
553			}
554		}
555		now.saturating_inc();
556	}
557	result
558}
559
560/// Pop assignments for `now`.
561fn advance_assignments_single_impl<T: Config>(
562	now: BlockNumberFor<T>,
563	core_states: &mut BTreeMap<CoreIndex, CoreDescriptor<BlockNumberFor<T>>>,
564	mut mode: AccessMode<T>,
565) -> AdvancedAssignments {
566	let mut bulk_assignments = Vec::with_capacity(num_coretime_cores::<T>() as _);
567	let mut pool_cores = Vec::with_capacity(num_coretime_cores::<T>() as _);
568	for (core_idx, core_state) in core_states.iter_mut() {
569		ensure_workload::<T>(now, *core_idx, core_state, &mode);
570
571		let Some(work_state) = core_state.current_work.as_mut() else { continue };
572
573		// Wrap around:
574		work_state.pos = work_state.pos % work_state.assignments.len() as u16;
575		let (a_type, a_state) = &mut work_state
576			.assignments
577			.get_mut(work_state.pos as usize)
578			.expect("We limited pos to the size of the vec one line above. qed");
579
580		// advance for next pop:
581		a_state.remaining = a_state.remaining.saturating_sub(work_state.step);
582		if a_state.remaining < work_state.step {
583			// Assignment exhausted, need to move to the next and credit remaining for
584			// next round.
585			work_state.pos += 1;
586			// Reset to ratio + still remaining "credits":
587			a_state.remaining = a_state.remaining.saturating_add(a_state.ratio);
588		}
589		match *a_type {
590			CoreAssignment::Pool => pool_cores.push(*core_idx),
591			CoreAssignment::Task(para_id) => bulk_assignments.push((*core_idx, para_id.into())),
592			CoreAssignment::Idle => {},
593		}
594	}
595
596	let pool_assignments = mode.pop_assignment_for_ondemand_cores(now, pool_cores.len() as _);
597	let pool_assignments = pool_cores.into_iter().zip(pool_assignments).collect();
598
599	AdvancedAssignments { bulk_assignments, pool_assignments }
600}
601
602/// Ensure given workload for core is up to date.
603fn ensure_workload<T: Config>(
604	now: BlockNumberFor<T>,
605	core_idx: CoreIndex,
606	descriptor: &mut CoreDescriptor<BlockNumberFor<T>>,
607	mode: &AccessMode<T>,
608) {
609	// Workload expired?
610	if descriptor
611		.current_work
612		.as_ref()
613		.and_then(|w| w.end_hint)
614		.map_or(false, |e| e <= now)
615	{
616		descriptor.current_work = None;
617	}
618
619	let Some(queue) = descriptor.queue else {
620		// No queue.
621		return;
622	};
623
624	let mut next_scheduled = queue.first;
625
626	if next_scheduled > now {
627		// Not yet ready.
628		return;
629	}
630
631	// Update is needed:
632	let update = loop {
633		let Some(update) = mode.get_core_schedule(next_scheduled, core_idx) else { break None };
634
635		// Still good?
636		if update.end_hint.map_or(true, |e| e > now) {
637			break Some(update);
638		}
639		// Move on if possible:
640		if let Some(n) = update.next_schedule {
641			next_scheduled = n;
642		} else {
643			break None;
644		}
645	};
646
647	let new_first = update.as_ref().and_then(|u| u.next_schedule);
648	descriptor.current_work = update.map(Into::into);
649
650	descriptor.queue = new_first.map(|new_first| {
651		QueueDescriptor {
652			first: new_first,
653			// `last` stays unaffected, if not empty:
654			last: queue.last,
655		}
656	});
657}