polkadot_runtime_parachains/assigner_coretime/mod.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The parachain coretime assignment module.
//!
//! Handles scheduling of assignments coming from the coretime/broker chain. For on-demand
//! assignments it relies on the separate on-demand assignment provider, to which it forwards
//! requests.
//!
//! `CoreDescriptor` contains pointers to the beginning and the end of a list of schedules,
//! together with the currently active assignments.

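//! An illustrative sketch of how the pieces defined below fit together:
//!
//! ```text
//! broker chain ──(assign_core)──▶ CoreSchedules: queue of `Schedule`s per core
//!                                      │ `begin` block reached (ensure_workload)
//!                                      ▼
//!                           CoreDescriptor::current_work (`WorkState`)
//!                                      │ pop_assignment_for_core
//!                                      ▼
//!                    CoreAssignment::Task(para) ──▶ `Assignment::Bulk`
//!                    CoreAssignment::Pool       ──▶ on_demand provider
//! ```
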
mod mock_helpers;
#[cfg(test)]
mod tests;

use crate::{
	configuration, on_demand,
	paras::AssignCoretime,
	scheduler::common::{Assignment, AssignmentProvider},
	ParaId,
};

use alloc::{vec, vec::Vec};
use frame_support::{defensive, pallet_prelude::*};
use frame_system::pallet_prelude::*;
use pallet_broker::CoreAssignment;
use polkadot_primitives::CoreIndex;
use sp_runtime::traits::{One, Saturating};

pub use pallet::*;

/// Fraction expressed as a numerator with an assumed denominator of 57,600.
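///
/// A quick illustration of the saturating/checked arithmetic (a sketch, not a doc test):
///
/// ```ignore
/// let half = PartsOf57600::new_saturating(28_800);
/// assert!(half.saturating_add(half).is_full());
/// // `saturating_add` caps at FULL instead of overflowing:
/// assert_eq!(half.saturating_add(PartsOf57600::FULL), PartsOf57600::FULL);
/// // `checked_add` reports the overflow instead:
/// assert_eq!(half.checked_add(PartsOf57600::FULL), None);
/// ```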
#[derive(
	RuntimeDebug,
	Clone,
	Copy,
	PartialEq,
	Eq,
	PartialOrd,
	Ord,
	Encode,
	Decode,
	DecodeWithMemTracking,
	TypeInfo,
)]
pub struct PartsOf57600(u16);

impl PartsOf57600 {
	pub const ZERO: Self = Self(0);
	pub const FULL: Self = Self(57600);

	pub fn new_saturating(v: u16) -> Self {
		Self::ZERO.saturating_add(Self(v))
	}

	pub fn is_full(&self) -> bool {
		*self == Self::FULL
	}

	pub fn saturating_add(self, rhs: Self) -> Self {
		let inner = self.0.saturating_add(rhs.0);
		if inner > 57600 {
			Self(57600)
		} else {
			Self(inner)
		}
	}

	pub fn saturating_sub(self, rhs: Self) -> Self {
		Self(self.0.saturating_sub(rhs.0))
	}

	pub fn checked_add(self, rhs: Self) -> Option<Self> {
		let inner = self.0.saturating_add(rhs.0);
		if inner > 57600 {
			None
		} else {
			Some(Self(inner))
		}
	}
}

/// Assignments as they are scheduled by block number for a particular core.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
struct Schedule<N> {
	/// Original assignments.
	assignments: Vec<(CoreAssignment, PartsOf57600)>,
	/// When do our assignments become invalid, if at all?
	///
	/// If this is `Some`, then the resulting `WorkState` will be dropped at that block number. If
	/// this is `None`, then we will keep serving our core assignments in a circle until a new set
	/// of assignments is scheduled.
	end_hint: Option<N>,

	/// The next queued schedule for this core.
	///
	/// Schedules form a queue.
	next_schedule: Option<N>,
}

/// Descriptor for a core.
///
/// Contains pointers to the first and last schedule in `CoreSchedules` for that core and keeps
/// track of the currently active work as well.
#[derive(Encode, Decode, TypeInfo, Default)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone))]
struct CoreDescriptor<N> {
	/// Metadata about the queued schedules for this core.
	queue: Option<QueueDescriptor<N>>,
	/// Currently performed work.
	current_work: Option<WorkState<N>>,
}

/// Pointers into `CoreSchedules` for a particular core.
///
/// Schedules in `CoreSchedules` form a queue, with `Schedule::next_schedule` always pointing to
/// the next item.
#[derive(Encode, Decode, TypeInfo, Copy, Clone)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
struct QueueDescriptor<N> {
	/// First scheduled item that is not yet active.
	first: N,
	/// Last scheduled item.
	last: N,
}

#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone))]
struct WorkState<N> {
	/// Assignments with current state.
	///
	/// Assignments and bookkeeping on how much has been served already. We keep track of serviced
	/// assignments in order to adhere to the specified ratios.
	assignments: Vec<(CoreAssignment, AssignmentState)>,
	/// When do our assignments become invalid, if at all?
	///
	/// If this is `Some`, then this `WorkState` will be dropped at that block number. If this is
	/// `None`, then we will keep serving our core assignments in a circle until a new set of
	/// assignments is scheduled.
	end_hint: Option<N>,
	/// Position within the assignments we are currently serving.
	///
	/// That is, which core assignment will be popped next on
	/// `AssignmentProvider::pop_assignment_for_core`.
	pos: u16,
	/// Step width.
	///
	/// How much we subtract from `AssignmentState::remaining` each time an assignment is served.
	step: PartsOf57600,
}

#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone, Copy))]
struct AssignmentState {
	/// Ratio of the core this assignment has.
	///
	/// As initially received via `assign_core`.
	ratio: PartsOf57600,
	/// How many parts are remaining in this round?
	///
	/// At the end of each round (in preparation for the next), ratio will be added to remaining.
	/// Then every time we get scheduled we subtract a core worth of points. Once we reach 0 or a
	/// number lower than what a core is worth (`WorkState::step` size), we move on to the next
	/// item in the `Vec`.
	///
	/// The first round starts with remaining = ratio.
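	///
	/// # Worked example (illustrative)
	///
	/// With assignments `[(A, 38_400), (B, 19_200)]` (a 2/3 : 1/3 split), `WorkState::step` is
	/// 19_200, the smallest ratio. Each pop subtracts `step` from the current entry's
	/// `remaining`, moving on once less than `step` is left: the serving order per round is
	/// A, A, B, which preserves the requested ratio.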
	remaining: PartsOf57600,
}

impl<N> From<Schedule<N>> for WorkState<N> {
	fn from(schedule: Schedule<N>) -> Self {
		let Schedule { assignments, end_hint, next_schedule: _ } = schedule;
		let step =
			if let Some(min_step_assignment) = assignments.iter().min_by(|a, b| a.1.cmp(&b.1)) {
				min_step_assignment.1
			} else {
				// Assignments empty, should not exist. In any case step size does not matter here:
				log::debug!("assignments of a `Schedule` should never be empty.");
				PartsOf57600(1)
			};
		let assignments = assignments
			.into_iter()
			.map(|(a, ratio)| (a, AssignmentState { ratio, remaining: ratio }))
			.collect();

		Self { assignments, end_hint, pos: 0, step }
	}
}

#[frame_support::pallet]
pub mod pallet {
	use super::*;

	#[pallet::pallet]
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);

	#[pallet::config]
	pub trait Config: frame_system::Config + configuration::Config + on_demand::Config {}

	/// Scheduled assignment sets.
	///
	/// Assignments as of the given block number. They will go into state once the block number is
	/// reached (and replace whatever was in there before).
	#[pallet::storage]
	pub(super) type CoreSchedules<T: Config> = StorageMap<
		_,
		Twox256,
		(BlockNumberFor<T>, CoreIndex),
		Schedule<BlockNumberFor<T>>,
		OptionQuery,
	>;

	/// Assignments which are currently active.
	///
	/// They will be picked from `CoreSchedules` once we reach the scheduled block number recorded
	/// there.
	#[pallet::storage]
	pub(super) type CoreDescriptors<T: Config> = StorageMap<
		_,
		Twox256,
		CoreIndex,
		CoreDescriptor<BlockNumberFor<T>>,
		ValueQuery,
		GetDefault,
	>;

	#[pallet::hooks]
	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {}

	#[pallet::error]
	pub enum Error<T> {
		/// Tried to schedule an empty set of assignments.
		AssignmentsEmpty,
		/// `assign_core` is only allowed to append new assignments at the end of already existing
		/// ones or to update the last entry.
		DisallowedInsert,
	}
}

impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
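	/// Pop the next assignment for the given core.
	///
	/// First activates any due schedule via `ensure_workload`, then serves the active
	/// assignments in the weighted round-robin described on [`AssignmentState`].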
	fn pop_assignment_for_core(core_idx: CoreIndex) -> Option<Assignment> {
		let now = frame_system::Pallet::<T>::block_number();

		CoreDescriptors::<T>::mutate(core_idx, |core_state| {
			Self::ensure_workload(now, core_idx, core_state);

			let work_state = core_state.current_work.as_mut()?;

			// Wrap around:
			work_state.pos = work_state.pos % work_state.assignments.len() as u16;
			let (a_type, a_state) = &mut work_state
				.assignments
				.get_mut(work_state.pos as usize)
				.expect("We limited pos to the size of the vec one line above. qed");

			// advance for next pop:
			a_state.remaining = a_state.remaining.saturating_sub(work_state.step);
			if a_state.remaining < work_state.step {
				// Assignment exhausted, need to move to the next and credit remaining for
				// next round.
				work_state.pos += 1;
				// Reset to ratio + still remaining "credits":
				a_state.remaining = a_state.remaining.saturating_add(a_state.ratio);
			}

			match a_type {
				CoreAssignment::Idle => None,
				CoreAssignment::Pool => on_demand::Pallet::<T>::pop_assignment_for_core(core_idx),
				CoreAssignment::Task(para_id) => Some(Assignment::Bulk((*para_id).into())),
			}
		})
	}

	fn report_processed(assignment: Assignment) {
		match assignment {
			Assignment::Pool { para_id, core_index } =>
				on_demand::Pallet::<T>::report_processed(para_id, core_index),
			Assignment::Bulk(_) => {},
		}
	}

	/// Push an assignment back to the front of the queue.
	///
	/// The assignment has not been processed yet. Typically used on session boundaries.
	///
	/// Parameters:
	/// - `assignment`: The on-demand assignment.
	fn push_back_assignment(assignment: Assignment) {
		match assignment {
			Assignment::Pool { para_id, core_index } =>
				on_demand::Pallet::<T>::push_back_assignment(para_id, core_index),
			Assignment::Bulk(_) => {
				// Session changes are rough. We just drop assignments that did not make it on a
				// session boundary. This seems sensible as bulk is region based. Meaning, even if
				// we made the effort catching up on those dropped assignments, this would very
				// likely lead to other assignments not getting served at the "end" (when our
				// assignment set gets replaced).
			},
		}
	}

	#[cfg(any(feature = "runtime-benchmarks", test))]
	fn get_mock_assignment(_: CoreIndex, para_id: polkadot_primitives::Id) -> Assignment {
		// Given that we are not tracking anything in `Bulk` assignments, it is safe to always
		// return a bulk assignment.
		Assignment::Bulk(para_id)
	}

	fn assignment_duplicated(assignment: &Assignment) {
		match assignment {
			Assignment::Pool { para_id, core_index } =>
				on_demand::Pallet::<T>::assignment_duplicated(*para_id, *core_index),
			Assignment::Bulk(_) => {},
		}
	}
}

impl<T: Config> Pallet<T> {
	/// Ensure the given workload for a core is up to date.
	fn ensure_workload(
		now: BlockNumberFor<T>,
		core_idx: CoreIndex,
		descriptor: &mut CoreDescriptor<BlockNumberFor<T>>,
	) {
		// Workload expired?
		if descriptor
			.current_work
			.as_ref()
			.and_then(|w| w.end_hint)
			.map_or(false, |e| e <= now)
		{
			descriptor.current_work = None;
		}

		let Some(queue) = descriptor.queue else {
			// No queue.
			return
		};

		let mut next_scheduled = queue.first;

		if next_scheduled > now {
			// Not yet ready.
			return
		}

		// Update is needed:
		let update = loop {
			let Some(update) = CoreSchedules::<T>::take((next_scheduled, core_idx)) else {
				break None
			};
			// Still good?
			if update.end_hint.map_or(true, |e| e > now) {
				break Some(update)
			}
			// Move on if possible:
			if let Some(n) = update.next_schedule {
				next_scheduled = n;
			} else {
				break None
			}
		};

		let new_first = update.as_ref().and_then(|u| u.next_schedule);
		descriptor.current_work = update.map(Into::into);

		descriptor.queue = new_first.map(|new_first| {
			QueueDescriptor {
				first: new_first,
				// `last` stays unaffected, if not empty:
				last: queue.last,
			}
		});
	}

	/// Append another assignment for a core.
	///
	/// Important: Only appending, or updating the last entry, is allowed. Meaning, all already
	/// existing assignments must have a `begin` smaller than or equal to the one passed here.
	/// Updating the last entry is supported so that a single core assignment can be built up via
	/// multiple calls to `assign_core`: if an assignment has too much interlacing to fit into
	/// e.g. a single UMP message, it can be split up into multiple messages, each triggering a
	/// call to `assign_core`, together forming the total assignment.
	///
	/// Inserting arbitrarily causes a `DispatchError::DisallowedInsert` error.
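	///
	/// # Example (illustrative sketch, not a doc test)
	///
	/// ```ignore
	/// // Starting at block 100, split core 0 evenly between para 1000 and the on-demand pool,
	/// // until a later schedule replaces this one:
	/// Pallet::<T>::assign_core(
	/// 	CoreIndex(0),
	/// 	100u32.into(),
	/// 	vec![
	/// 		(CoreAssignment::Task(1000), PartsOf57600::new_saturating(28_800)),
	/// 		(CoreAssignment::Pool, PartsOf57600::new_saturating(28_800)),
	/// 	],
	/// 	None,
	/// )?;
	/// ```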
	// With this restriction this function allows for O(1) complexity. It could easily be lifted, if
	// need be and in fact an implementation is available
	// [here](https://github.com/paritytech/polkadot-sdk/pull/1694/commits/c0c23b01fd2830910cde92c11960dad12cdff398#diff-0c85a46e448de79a5452395829986ee8747e17a857c27ab624304987d2dde8baR386).
	// The problem is that insertion complexity then depends on the size of the existing queue,
	// which makes determining weights hard and could lead to issues like overweight blocks (at
	// least in theory).
	pub fn assign_core(
		core_idx: CoreIndex,
		begin: BlockNumberFor<T>,
		mut assignments: Vec<(CoreAssignment, PartsOf57600)>,
		end_hint: Option<BlockNumberFor<T>>,
	) -> Result<(), DispatchError> {
		// There should be at least one assignment.
		ensure!(!assignments.is_empty(), Error::<T>::AssignmentsEmpty);

		CoreDescriptors::<T>::mutate(core_idx, |core_descriptor| {
			let new_queue = match core_descriptor.queue {
				Some(queue) => {
					ensure!(begin >= queue.last, Error::<T>::DisallowedInsert);

					// Update queue if we are appending:
					if begin > queue.last {
						CoreSchedules::<T>::mutate((queue.last, core_idx), |schedule| {
							if let Some(schedule) = schedule.as_mut() {
								debug_assert!(schedule.next_schedule.is_none(), "queue.end was supposed to be the end, so the next item must be `None`!");
								schedule.next_schedule = Some(begin);
							} else {
								defensive!("Queue end entry does not exist?");
							}
						});
					}

					CoreSchedules::<T>::mutate((begin, core_idx), |schedule| {
						let assignments = if let Some(mut old_schedule) = schedule.take() {
							old_schedule.assignments.append(&mut assignments);
							old_schedule.assignments
						} else {
							assignments
						};
						*schedule = Some(Schedule { assignments, end_hint, next_schedule: None });
					});

					QueueDescriptor { first: queue.first, last: begin }
				},
				None => {
					// Queue empty, just insert:
					CoreSchedules::<T>::insert(
						(begin, core_idx),
						Schedule { assignments, end_hint, next_schedule: None },
					);
					QueueDescriptor { first: begin, last: begin }
				},
			};
			core_descriptor.queue = Some(new_queue);
			Ok(())
		})
	}
}

impl<T: Config> AssignCoretime for Pallet<T> {
	fn assign_coretime(id: ParaId) -> DispatchResult {
		let current_block = frame_system::Pallet::<T>::block_number();

		// Add a new core and assign the para to it.
		let mut config = configuration::ActiveConfig::<T>::get();
		let core = config.scheduler_params.num_cores;
		config.scheduler_params.num_cores.saturating_inc();

		// `assign_coretime` is only called at genesis or by root, so setting the active
		// config here is fine.
		configuration::Pallet::<T>::force_set_active_config(config);

		let begin = current_block + One::one();
		let assignment = vec![(pallet_broker::CoreAssignment::Task(id.into()), PartsOf57600::FULL)];
		Pallet::<T>::assign_core(CoreIndex(core), begin, assignment, None)
	}
}